Make some good progress on e2e tests.

This commit is contained in:
Jacob Dufault 2017-09-12 20:35:27 -07:00
parent 6cdb7c66e1
commit 17565f9a14
9 changed files with 550 additions and 171 deletions

View File

@ -20,6 +20,7 @@
#include "test.h"
#include "timer.h"
#include "threaded_queue.h"
#include "work_thread.h"
#include "working_files.h"
#include <loguru.hpp>
@ -189,7 +190,7 @@ bool FindFileOrFail(QueryDatabase* db, lsRequestId id, const std::string& absolu
if (out_file_id)
*out_file_id = QueryFileId((size_t)-1);
LOG_S(INFO) << "Unable to find file " << absolute_path;
LOG_S(INFO) << "Unable to find file \"" << absolute_path << "\"";
Out_Error out;
out.id = id;
@ -541,9 +542,10 @@ struct Index_Request {
std::string path;
std::vector<std::string> args; // TODO: make this a string that is parsed lazily.
bool is_interactive;
optional<std::string> contents; // Preloaded contents. Useful for tests.
Index_Request(const std::string& path, const std::vector<std::string>& args, bool is_interactive)
: path(path), args(args), is_interactive(is_interactive) {}
Index_Request(const std::string& path, const std::vector<std::string>& args, bool is_interactive, optional<std::string> contents)
: path(path), args(args), is_interactive(is_interactive), contents(contents) {}
};
struct Index_DoIdMap {
@ -652,6 +654,15 @@ struct QueueManager {
Index_OnIndexedQueue on_indexed;
QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), do_id_map(waiter), load_previous_index(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
bool HasWork() {
return
!index_request.IsEmpty() ||
!do_id_map.IsEmpty() ||
!load_previous_index.IsEmpty() ||
!on_id_mapped.IsEmpty() ||
!on_indexed.IsEmpty();
}
};
void RegisterMessageTypes() {
@ -684,6 +695,9 @@ void RegisterMessageTypes() {
MessageRegistry::instance()->Register<Ipc_CqueryCallers>();
MessageRegistry::instance()->Register<Ipc_CqueryBase>();
MessageRegistry::instance()->Register<Ipc_CqueryDerived>();
MessageRegistry::instance()->Register<Ipc_CqueryIndexFile>();
MessageRegistry::instance()->Register<Ipc_CqueryQueryDbWaitForIdleIndexer>();
MessageRegistry::instance()->Register<Ipc_CqueryExitWhenIdle>();
}
@ -726,6 +740,12 @@ struct ImportManager {
import_.erase(path);
}
// Returns true if there any any files currently being imported.
bool HasActiveImports() {
std::lock_guard<std::mutex> guard(mutex_);
return !import_.empty();
}
std::mutex mutex_;
std::unordered_set<std::string> import_;
};
@ -832,7 +852,8 @@ std::vector<Index_DoIdMap> DoParseFile(
CacheLoader* cache_loader,
bool is_interactive,
const std::string& path,
const std::vector<std::string>& args) {
const std::vector<std::string>& args,
const optional<FileContents>& contents) {
std::vector<Index_DoIdMap> result;
IndexFile* previous_index = cache_loader->TryLoad(path);
@ -904,6 +925,10 @@ std::vector<Index_DoIdMap> DoParseFile(
// well. We then default to a fast file-copy if not in working set.
bool loaded_primary = false;
std::vector<FileContents> file_contents;
if (contents) {
loaded_primary = loaded_primary || contents->path == path;
file_contents.push_back(*contents);
}
for (const auto& it : cache_loader->caches) {
const std::unique_ptr<IndexFile>& index = it.second;
assert(index);
@ -930,6 +955,7 @@ std::vector<Index_DoIdMap> DoParseFile(
config, file_consumer_shared,
path, args, file_contents,
&perf, index);
// LOG_S(INFO) << "Parsing " << path << " gave " << indexes.size() << " indexes";
for (std::unique_ptr<IndexFile>& new_index : indexes) {
Timer time;
@ -964,7 +990,12 @@ std::vector<Index_DoIdMap> ParseFile(
FileConsumer::SharedState* file_consumer_shared,
TimestampManager* timestamp_manager,
bool is_interactive,
const Project::Entry& entry) {
const Project::Entry& entry,
const optional<std::string>& contents) {
optional<FileContents> file_contents;
if (contents)
file_contents = FileContents(entry.filename, *contents);
CacheLoader cache_loader(config);
@ -973,7 +1004,7 @@ std::vector<Index_DoIdMap> ParseFile(
// complain about if indexed by itself.
IndexFile* entry_cache = cache_loader.TryLoad(entry.filename);
std::string tu_path = entry_cache ? entry_cache->import_file : entry.filename;
return DoParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, &cache_loader, is_interactive, tu_path, entry.args);
return DoParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, &cache_loader, is_interactive, tu_path, entry.args, file_contents);
}
bool IndexMain_DoParse(
@ -985,18 +1016,28 @@ bool IndexMain_DoParse(
TimestampManager* timestamp_manager,
clang::Index* index) {
optional<Index_Request> request = queue->index_request.TryDequeue();
if (!request)
return false;
if (!import_manager->StartImport(request->path))
bool can_import = false;
optional<Index_Request> request = queue->index_request.TryDequeuePlusAction([&](const Index_Request& request) {
can_import = import_manager->StartImport(request.path);
});
if (!request || !can_import)
return false;
Project::Entry entry;
entry.filename = request->path;
entry.args = request->args;
std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry);
std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry, request->contents);
// Unmark file as imported if indexing failed.
bool found_import = false;
for (auto& response : responses) {
if (response.current->path == request->path)
found_import = true;
}
if (!found_import)
import_manager->DoneImport(request->path);
// Don't bother sending an IdMap request if there are no responses.
if (responses.empty())
return false;
@ -1100,48 +1141,48 @@ bool IndexMergeIndexUpdates(QueueManager* queue) {
}
}
void IndexMain(Config* config,
FileConsumer::SharedState* file_consumer_shared,
ImportManager* import_manager,
TimestampManager* timestamp_manager,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue) {
SetCurrentThreadName("indexer");
WorkThread::Result IndexMain(
Config* config,
FileConsumer::SharedState* file_consumer_shared,
ImportManager* import_manager,
TimestampManager* timestamp_manager,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue) {
// TODO: dispose of index after it is not used for a while.
clang::Index index(1, 0);
while (true) {
// TODO: process all off IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for
// better icache behavior. We need to have some threads spinning on
// both though
// otherwise memory usage will get bad.
// TODO: process all off IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for
// better icache behavior. We need to have some threads spinning on
// both though
// otherwise memory usage will get bad.
// We need to make sure to run both IndexMain_DoParse and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, import_manager, timestamp_manager, &index);
// We need to make sure to run both IndexMain_DoParse and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, import_manager, timestamp_manager, &index);
bool did_create_update =
IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
bool did_create_update =
IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates(queue);
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates(queue);
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait(
{&queue->index_request, &queue->on_id_mapped, &queue->load_previous_index, &queue->on_indexed});
}
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait(
{&queue->index_request, &queue->on_id_mapped, &queue->load_previous_index, &queue->on_indexed});
}
return queue->HasWork() ? WorkThread::Result::MoreWork : WorkThread::Result::NoWork;
}
bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import_manager, QueueManager* queue, WorkingFiles* working_files) {
@ -1157,8 +1198,8 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
// it, load the previous state from disk and rerun IdMap logic later. Do not
// do this if we have already attempted in the past.
if (!request->load_previous &&
!request->previous &&
db->usr_to_file.find(LowerPathIfCaseInsensitive(request->current->path)) != db->usr_to_file.end()) {
!request->previous &&
db->usr_to_file.find(LowerPathIfCaseInsensitive(request->current->path)) != db->usr_to_file.end()) {
assert(!request->load_previous);
request->load_previous = true;
queue->load_previous_index.Enqueue(std::move(*request));
@ -1299,6 +1340,7 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
bool QueryDbMainLoop(
Config* config,
QueryDatabase* db,
bool* exit_when_idle,
MultiQueueWaiter* waiter,
QueueManager* queue,
Project* project,
@ -1336,14 +1378,15 @@ bool QueryDbMainLoop(
<< " with uri " << request->params.rootUri->raw_uri;
if (!request->params.initializationOptions) {
LOG_S(INFO) << "Initialization parameters (particularily cacheDirectory) are required";
LOG_S(FATAL) << "Initialization parameters (particularily cacheDirectory) are required";
exit(1);
}
*config = *request->params.initializationOptions;
// Check client version.
if (config->clientVersion != kExpectedClientVersion) {
if (config->clientVersion != kExpectedClientVersion &&
config->clientVersion != -1 /*disable check*/) {
Out_ShowLogMessage out;
out.display_type = Out_ShowLogMessage::DisplayType::Show;
out.params.type = lsMessageType::Error;
@ -1375,8 +1418,8 @@ bool QueryDbMainLoop(
}
std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl;
for (int i = 0; i < config->indexerCount; ++i) {
new std::thread([&]() {
IndexMain(config, file_consumer_shared, import_manager, timestamp_manager, project, working_files, waiter, queue);
WorkThread::StartThread("indexer" + std::to_string(i), [&]() {
return IndexMain(config, file_consumer_shared, import_manager, timestamp_manager, project, working_files, waiter, queue);
});
}
@ -1395,7 +1438,7 @@ bool QueryDbMainLoop(
// << "] Dispatching index request for file " << entry.filename
// << std::endl;
bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr;
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive));
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive, nullopt));
});
// We need to support multiple concurrent index processes.
@ -1459,7 +1502,7 @@ bool QueryDbMainLoop(
LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1)
<< "] Dispatching index request for file " << entry.filename;
bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr;
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive));
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive, nullopt));
});
break;
}
@ -1674,7 +1717,7 @@ bool QueryDbMainLoop(
// Submit new index request.
const Project::Entry& entry = project->FindCompilationEntryForFile(path);
queue->index_request.PriorityEnqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/));
queue->index_request.PriorityEnqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/, nullopt));
break;
}
@ -1719,7 +1762,7 @@ bool QueryDbMainLoop(
// if so, ignore that index response.
// TODO: send as priority request
Project::Entry entry = project->FindCompilationEntryForFile(path);
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/));
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/, nullopt));
clang_complete->NotifySave(path);
@ -2572,6 +2615,45 @@ bool QueryDbMainLoop(
break;
}
case IpcId::CqueryIndexFile: {
auto msg = static_cast<Ipc_CqueryIndexFile*>(message.get());
queue->index_request.Enqueue(Index_Request(
NormalizePath(msg->params.path),
msg->params.args,
msg->params.is_interactive,
msg->params.contents));
break;
}
case IpcId::CqueryQueryDbWaitForIdleIndexer: {
auto msg = static_cast<Ipc_CqueryQueryDbWaitForIdleIndexer*>(message.get());
LOG_S(INFO) << "Waiting for idle";
int idle_count = 0;
while (true) {
bool has_work = false;
has_work |= import_manager->HasActiveImports();
has_work |= queue->HasWork();
has_work |= QueryDb_ImportMain(config, db, import_manager, queue, working_files);
if (!has_work)
++idle_count;
else
idle_count = 0;
// There are race conditions between each of the three checks above,
// so we retry a bunch of times to try to avoid any.
if (idle_count > 10)
break;
}
LOG_S(INFO) << "Done waiting for idle";
break;
}
case IpcId::CqueryExitWhenIdle: {
*exit_when_idle = true;
WorkThread::request_exit_on_idle = true;
break;
}
default: {
LOG_S(INFO) << "[querydb] Unhandled IPC message " << IpcIdToString(message->method_id);
exit(1);
@ -2588,10 +2670,8 @@ bool QueryDbMainLoop(
return did_work;
}
void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) {
// Create queues.
QueueManager queue(waiter);
void RunQueryDbThread(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter, QueueManager* queue) {
bool exit_when_idle = false;
Project project;
WorkingFiles working_files;
ClangCompleteManager clang_complete(
@ -2610,18 +2690,22 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
QueryDatabase db;
while (true) {
bool did_work = QueryDbMainLoop(
config, &db, waiter, &queue,
config, &db, &exit_when_idle, waiter, queue,
&project, &file_consumer_shared, &import_manager, &timestamp_manager, &working_files,
&clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get());
// No more work left and exit request. Exit.
if (!did_work && exit_when_idle && WorkThread::num_active_threads == 0)
exit(0);
// Cleanup and free any unused memory.
FreeUnusedMemory();
if (!did_work) {
waiter->Wait({
IpcManager::instance()->threaded_queue_for_server_.get(),
&queue.do_id_map,
&queue.on_indexed
&queue->do_id_map,
&queue->on_indexed
});
}
}
@ -2688,8 +2772,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
// TODO: global lock on stderr output.
// Separate thread whose only job is to read from stdin and
// dispatch read commands to the actual indexer program. This
@ -2697,16 +2779,16 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
// blocks.
//
// |ipc| is connected to a server.
void LanguageServerStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* request_times) {
IpcManager* ipc = IpcManager::instance();
void LaunchStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* request_times) {
WorkThread::StartThread("stdin", [config, request_times]() {
IpcManager* ipc = IpcManager::instance();
SetCurrentThreadName("stdin");
while (true) {
std::unique_ptr<BaseIpcMessage> message = MessageRegistry::instance()->ReadMessageFromStdin();
// Message parsing can fail if we don't recognize the method.
if (!message)
continue;
return WorkThread::Result::MoreWork;
(*request_times)[message->method_id] = Timer();
@ -2722,8 +2804,21 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* r
break;
}
case IpcId::Exit: {
exit(0);
break;
}
case IpcId::CqueryExitWhenIdle: {
// querydb needs to know to exit when idle. We return out of the stdin
// loop to exit the thread. If we keep parsing input stdin is likely
// closed so cquery will exit.
LOG_S(INFO) << "cquery will exit when all threads are idle";
ipc->SendMessage(IpcManager::Destination::Server, std::move(message));
return WorkThread::Result::ExitThread;
}
case IpcId::Initialize:
case IpcId::Exit:
case IpcId::TextDocumentDidOpen:
case IpcId::TextDocumentDidChange:
case IpcId::TextDocumentDidClose:
@ -2747,7 +2842,9 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* r
case IpcId::CqueryVars:
case IpcId::CqueryCallers:
case IpcId::CqueryBase:
case IpcId::CqueryDerived: {
case IpcId::CqueryDerived:
case IpcId::CqueryIndexFile:
case IpcId::CqueryQueryDbWaitForIdleIndexer: {
ipc->SendMessage(IpcManager::Destination::Server, std::move(message));
break;
}
@ -2757,7 +2854,9 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* r
exit(1);
}
}
}
return WorkThread::Result::MoreWork;
});
}
@ -2805,15 +2904,14 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map<IpcId, Timer>* r
void StdoutMain(std::unordered_map<IpcId, Timer>* request_times, MultiQueueWaiter* waiter) {
SetCurrentThreadName("stdout");
IpcManager* ipc = IpcManager::instance();
void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times, MultiQueueWaiter* waiter, QueueManager* queue) {
WorkThread::StartThread("stdout", [=]() {
IpcManager* ipc = IpcManager::instance();
while (true) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Client);
if (messages.empty()) {
waiter->Wait({ipc->threaded_queue_for_client_.get()});
continue;
waiter->Wait({ ipc->threaded_queue_for_client_.get() });
return queue->HasWork() ? WorkThread::Result::MoreWork : WorkThread::Result::NoWork;
}
for (auto& message : messages) {
@ -2839,26 +2937,24 @@ void StdoutMain(std::unordered_map<IpcId, Timer>* request_times, MultiQueueWaite
}
}
}
}
return WorkThread::Result::MoreWork;
});
}
void LanguageServerMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) {
QueueManager queue(waiter);
std::unordered_map<IpcId, Timer> request_times;
// Start stdin reader. Reading from stdin is a blocking operation so this
// needs a dedicated thread.
new std::thread([&]() {
LanguageServerStdinLoop(config, &request_times);
});
// Start querydb thread. querydb will start indexer threads as needed.
new std::thread([&]() {
QueryDbMain(bin_name, config, waiter);
});
LaunchStdinLoop(config, &request_times);
// We run a dedicated thread for writing to stdout because there can be an
// unknown number of delays when output information.
StdoutMain(&request_times, waiter);
LaunchStdoutThread(&request_times, waiter, &queue);
// Start querydb which takes over this thread. The querydb will launch
// indexer threads as needed.
RunQueryDbThread(bin_name, config, waiter, &queue);
}

View File

@ -5,6 +5,7 @@
#include "project.h"
#include "standard_includes.h"
#include "timer.h"
#include "work_thread.h"
#include <thread>
@ -114,8 +115,7 @@ void IncludeComplete::Rescan() {
match_ = MakeUnique<GroupMatch>(config_->includeCompletionWhitelist, config_->includeCompletionBlacklist);
is_scanning = true;
new std::thread([this]() {
SetCurrentThreadName("include_scanner");
WorkThread::StartThread("include_scanner", [this]() {
Timer timer;
InsertStlIncludes();
@ -127,6 +127,8 @@ void IncludeComplete::Rescan() {
timer.ResetAndPrint("[perf] Scanning for includes");
is_scanning = false;
return WorkThread::Result::ExitThread;
});
}

View File

@ -71,6 +71,14 @@ const char* IpcIdToString(IpcId id) {
case IpcId::Cout:
return "$cout";
case IpcId::CqueryIndexFile:
return "$cquery/indexFile";
case IpcId::CqueryQueryDbWaitForIdleIndexer:
return "$cquery/queryDbWaitForIdleIndexer";
case IpcId::CqueryExitWhenIdle:
return "$cquery/exitWhenIdle";
default:
assert(false && "missing IpcId string name");
exit(1);

View File

@ -46,7 +46,14 @@ enum class IpcId : int {
CqueryDerived, // Show all derived types/methods.
// Internal implementation detail.
Cout
Cout,
// Index the given file contents. Used in tests.
CqueryIndexFile,
// Make querydb wait for the indexer to be idle. Used in tests.
CqueryQueryDbWaitForIdleIndexer,
// Exit after all messages have been read/processes. Used in tests.
CqueryExitWhenIdle
};
MAKE_ENUM_HASHABLE(IpcId)
MAKE_REFLECT_TYPE_PROXY(IpcId, int)
@ -68,4 +75,28 @@ struct Ipc_Cout : public IpcMessage<Ipc_Cout> {
std::string content;
IpcId original_ipc_id;
};
MAKE_REFLECT_STRUCT(Ipc_Cout, content);
MAKE_REFLECT_STRUCT(Ipc_Cout, content);
struct Ipc_CqueryIndexFile : public IpcMessage<Ipc_CqueryIndexFile> {
static constexpr IpcId kIpcId = IpcId::CqueryIndexFile;
struct Params {
std::string path;
std::vector<std::string> args;
bool is_interactive = false;
std::string contents;
};
Params params;
};
MAKE_REFLECT_STRUCT(Ipc_CqueryIndexFile::Params, path, args, is_interactive, contents);
MAKE_REFLECT_STRUCT(Ipc_CqueryIndexFile, params);
struct Ipc_CqueryQueryDbWaitForIdleIndexer : public IpcMessage<Ipc_CqueryQueryDbWaitForIdleIndexer> {
static constexpr IpcId kIpcId = IpcId::CqueryQueryDbWaitForIdleIndexer;
};
MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryQueryDbWaitForIdleIndexer);
struct Ipc_CqueryExitWhenIdle : public IpcMessage<Ipc_CqueryExitWhenIdle> {
static constexpr IpcId kIpcId = IpcId::CqueryExitWhenIdle;
};
MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryExitWhenIdle);

View File

@ -1,5 +1,8 @@
#include "language_server_api.h"
#include <doctest/doctest.h>
#include <loguru.hpp>
void Reflect(Writer& visitor, lsRequestId& value) {
assert(value.id0.has_value() || value.id1.has_value());
@ -22,55 +25,113 @@ void Reflect(Reader& visitor, lsRequestId& id) {
MessageRegistry* MessageRegistry::instance_ = nullptr;
std::unique_ptr<BaseIpcMessage> MessageRegistry::ReadMessageFromStdin() {
int content_length = -1;
int iteration = 0;
// Reads a JsonRpc message. |read| returns the next input character.
optional<std::string> ReadJsonRpcContentFrom(std::function<optional<char>()> read) {
// Read the content length. It is terminated by the "\r\n" sequence.
int exit_seq = 0;
std::string stringified_content_length;
while (true) {
if (++iteration > 10) {
assert(false && "bad parser state");
exit(1);
optional<char> opt_c = read();
if (!opt_c) {
LOG_S(INFO) << "No more input when reading content length header";
return nullopt;
}
char c = *opt_c;
std::string line;
std::getline(std::cin, line);
if (exit_seq == 0 && c == '\r') ++exit_seq;
if (exit_seq == 1 && c == '\n') break;
stringified_content_length += c;
}
constexpr char* kContentLengthStart = "Content-Length: ";
assert(StartsWith(stringified_content_length, kContentLengthStart));
int content_length = atoi(stringified_content_length.c_str() + strlen(kContentLengthStart));
// No content; end of stdin.
if (line.empty()) {
std::cerr << "stdin closed; exiting" << std::endl;
exit(0);
}
// std::cin >> line;
// std::cerr << "Read line " << line;
if (line.compare(0, 14, "Content-Length") == 0) {
content_length = atoi(line.c_str() + 16);
}
if (line == "\r")
break;
// There is always a "\r\n" sequence before the actual content.
auto expect_char = [&](char expected) {
optional<char> opt_c = read();
return opt_c && *opt_c == expected;
};
if (!expect_char('\r') || !expect_char('\n')) {
LOG_S(INFO) << "Unexpected token (expected \r\n sequence)";
return nullopt;
}
// bad input that is not a message.
if (content_length < 0) {
std::cerr << "parsing command failed (no Content-Length header)"
<< std::endl;
return nullptr;
}
// TODO: maybe use std::cin.read(c, content_length)
// Read content.
std::string content;
content.reserve(content_length);
for (int i = 0; i < content_length; ++i) {
char c;
std::cin.read(&c, 1);
content += c;
for (size_t i = 0; i < content_length; ++i) {
optional<char> c = read();
if (!c) {
LOG_S(INFO) << "No more input when reading content body";
return nullopt;
}
content += *c;
}
return content;
}
TEST_SUITE("FindIncludeLine");
auto MakeContentReader(std::string* content, bool can_be_empty) {
return [=]() -> optional<char> {
if (!can_be_empty)
REQUIRE(!content->empty());
if (content->empty())
return nullopt;
char c = (*content)[0];
content->erase(content->begin());
return c;
};
}
TEST_CASE("ReadContentFromSource") {
auto parse_correct = [](std::string content) {
auto reader = MakeContentReader(&content, false /*can_be_empty*/);
auto got = ReadJsonRpcContentFrom(reader);
REQUIRE(got);
return got.value();
};
auto parse_incorrect = [](std::string content) {
auto reader = MakeContentReader(&content, true /*can_be_empty*/);
return ReadJsonRpcContentFrom(reader);
};
REQUIRE(parse_correct("Content-Length: 0\r\n\r\n") == "");
REQUIRE(parse_correct("Content-Length: 1\r\n\r\na") == "a");
REQUIRE(parse_correct("Content-Length: 4\r\n\r\nabcd") == "abcd");
REQUIRE(parse_incorrect("ggg") == optional<std::string>());
REQUIRE(parse_incorrect("Content-Length: 0\r\n") == optional<std::string>());
REQUIRE(parse_incorrect("Content-Length: 5\r\n\r\nab") == optional<std::string>());
}
TEST_SUITE_END();
optional<char> ReadCharFromStdinBlocking() {
// Bad stdin means parent process has probably exited. Either way, cquery
// can no longer be communicated with so just exit.
if (!std::cin.good()) {
LOG_S(FATAL) << "std::cin.good() is false; exiting";
exit(1);
}
//std::cerr << content.c_str() << std::endl;
char c = 0;
std::cin.read(&c, 1);
return c;
}
std::unique_ptr<BaseIpcMessage> MessageRegistry::ReadMessageFromStdin() {
optional<std::string> content = ReadJsonRpcContentFrom(&ReadCharFromStdinBlocking);
if (!content) {
LOG_S(FATAL) << "Failed to read JsonRpc input; exiting";
exit(1);
}
rapidjson::Document document;
document.Parse(content.c_str(), content_length);
document.Parse(content->c_str(), content->length());
assert(!document.HasParseError());
return Parse(document);
@ -86,7 +147,7 @@ std::unique_ptr<BaseIpcMessage> MessageRegistry::Parse(Reader& visitor) {
ReflectMember(visitor, "method", method);
if (allocators.find(method) == allocators.end()) {
std::cerr << "Unable to find registered handler for method \"" << method << "\"" << std::endl;
LOG_S(ERROR) << "Unable to find registered handler for method \"" << method << "\"" << std::endl;
return nullptr;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "work_thread.h"
#include <optional.h>
#include <algorithm>
@ -38,8 +39,11 @@ struct MultiQueueWaiter {
// HasState() is called data gets posted but before we begin waiting for
// the condition variable, we will miss the notification. The timeout of 5
// means that if this happens we will delay operation for 5 seconds.
//
// If we're trying to exit (WorkThread::request_exit_on_idle), do not
// bother waiting.
while (!HasState(queues)) {
while (!HasState(queues) && !WorkThread::request_exit_on_idle) {
std::unique_lock<std::mutex> l(m);
cv.wait_for(l, std::chrono::seconds(5));
}
@ -132,7 +136,8 @@ public:
// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
optional<T> TryDequeue() {
template<typename TAction>
optional<T> TryDequeuePlusAction(TAction action) {
std::lock_guard<std::mutex> lock(mutex_);
if (priority_.empty() && queue_.empty())
return nullopt;
@ -145,9 +150,16 @@ public:
auto val = std::move(queue_.front());
queue_.pop();
action(val);
return std::move(val);
}
optional<T> TryDequeue() {
return TryDequeuePlusAction([](const T&) {});
}
private:
std::queue<T> priority_;
mutable std::mutex mutex_;

28
src/work_thread.cc Normal file
View File

@ -0,0 +1,28 @@
#include "work_thread.h"
#include "platform.h"
std::atomic<int> WorkThread::num_active_threads;
std::atomic<bool> WorkThread::request_exit_on_idle;
// static
void WorkThread::StartThread(
const std::string& thread_name,
const std::function<Result()>& entry_point) {
new std::thread([thread_name, entry_point]() {
SetCurrentThreadName(thread_name);
++num_active_threads;
// Main loop.
while (true) {
Result result = entry_point();
if (result == Result::ExitThread)
break;
if (request_exit_on_idle && result == Result::NoWork)
break;
}
--num_active_threads;
});
}

31
src/work_thread.h Normal file
View File

@ -0,0 +1,31 @@
#pragma once
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
// Helper methods for starting threads that do some work. Enables test code to
// wait for all work to complete.
struct WorkThread {
enum class Result {
MoreWork,
NoWork,
ExitThread
};
// The number of active worker threads.
static std::atomic<int> num_active_threads;
// Set to true to request all work thread instances to exit.
static std::atomic<bool> request_exit_on_idle;
// Launch a new thread. |entry_point| will be called continously. It should
// return true if it there is still known work to be done.
static void StartThread(
const std::string& thread_name,
const std::function<Result()>& entry_point);
// Static-only class.
WorkThread() = delete;
};

View File

@ -1,7 +1,12 @@
import json
import re
import shlex
from subprocess import Popen, PIPE
CQUERY_PATH = 'x64/Debug/cquery.exe'
CACHE_DIR = 'e2e_CACHE'
# Content-Length: ...\r\n
# \r\n
# {
@ -22,15 +27,33 @@ from subprocess import Popen, PIPE
class TestBuilder:
def __init__(self):
self.files = []
self.sent = []
self.received = []
self.expected = []
def WithFile(self, filename, contents):
def IndexFile(self, path, contents):
"""
Writes the file contents to disk so that the language server can access it.
"""
self.files.append((filename, contents))
self.Send({
'method': '$cquery/indexFile',
'params': {
'path': path,
'contents': contents,
'args': [
'-xc++',
'-std=c++11',
'-isystemC:/Program Files (x86)/Microsoft Visual Studio/2017/Community/VC/Tools/MSVC/14.10.25017/include',
'-isystemC:/Program Files (x86)/Windows Kits/10/Include/10.0.15063.0/ucrt'
]
}
})
return self
def WaitForIdle(self):
"""
Blocks the querydb thread until any active imports are complete.
"""
self.Send({'method': '$cquery/queryDbWaitForIdleIndexer'})
return self
def Send(self, stdin):
@ -41,11 +64,12 @@ class TestBuilder:
self.sent.append(stdin)
return self
def Expect(self, stdout):
def Expect(self, expected):
"""
Expect a message from the language server.
"""
self.received.append(stdout)
expected['jsonrpc'] = '2.0'
self.expected.append(expected)
return self
def SetupCommonInit(self):
@ -57,15 +81,45 @@ class TestBuilder:
'method': 'initialize',
'params': {
'processId': 123,
'rootPath': 'cquery',
'rootUri': 'cquery',
'capabilities': {},
'trace': 'off'
'trace': 'off',
'initializationOptions': {
'cacheDirectory': CACHE_DIR,
'clientVersion': -1 # Disables the check
}
}
})
self.Expect({
'id': 0,
'method': 'initialized',
'result': {}
'result': {
'capabilities': {
'textDocumentSync': 2,
'hoverProvider': True,
'completionProvider': {
'resolveProvider': False,
'triggerCharacters': [ '.', ':', '>', '#' ]
},
'signatureHelpProvider': {
'triggerCharacters': [ '(', ',' ]
},
'definitionProvider': True,
'referencesProvider': True,
'documentHighlightProvider': True,
'documentSymbolProvider': True,
'workspaceSymbolProvider': True,
'codeActionProvider': True,
'codeLensProvider': {
'resolveProvider': False
},
'documentFormattingProvider': False,
'documentRangeFormattingProvider': False,
'renameProvider': True,
'documentLinkProvider': {
'resolveProvider': False
}
}
}
})
return self
@ -79,32 +133,76 @@ def _ExecuteTest(name, func):
if not isinstance(test_builder, TestBuilder):
raise Exception('%s does not return a TestBuilder instance' % name)
test_builder.Send({ 'method': 'exit' })
# Possible test runner implementation
cmd = "x64/Debug/indexer.exe --language-server"
process = Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
# Add a final exit message.
test_builder.Send({ 'method': '$cquery/exitWhenIdle' })
# Convert messages to a stdin byte array.
stdin = ''
for message in test_builder.sent:
payload = json.dumps(message)
wrapped = 'Content-Length: %s\r\n\r\n%s' % (len(payload), payload)
stdin += wrapped
stdin_bytes = stdin.encode(encoding='UTF-8')
print('## %s ##' % name)
print('== STDIN ==')
print(stdin)
(stdout, stderr) = process.communicate(stdin)
if stdout:
print('== STDOUT ==')
print(stdout)
if stderr:
print('== STDERR ==')
print(stderr)
# Finds all messages in |string| by parsing Content-Length headers.
def GetMessages(string):
messages = []
for match in re.finditer('Content-Length: (\d+)\r\n\r\n', string):
start = match.span()[1]
length = int(match.groups()[0])
message = string[start:start + length]
messages.append(json.loads(message))
return messages
# TODO: Actually verify stdout.
# Utility method to print a byte array.
def PrintByteArray(bytes):
for line in bytes.split(b'\r\n'):
print(line.decode('utf8'))
# Execute program.
cmd = "%s --language-server" % CQUERY_PATH
process = Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, stderr) = process.communicate(stdin_bytes)
exit_code = process.wait();
# Check if test succeeded.
actual = GetMessages(stdout.decode('utf8'))
success = actual == test_builder.expected
# Print failure messages.
if success:
print('== Passed %s with exit_code=%s ==' % (name, exit_code))
else:
print('== FAILED %s with exit_code=%s ==' % (name, exit_code))
print('## STDIN:')
for message in GetMessages(stdin):
print(json.dumps(message, indent=True))
if stdout:
print('## STDOUT:')
for message in GetMessages(stdout.decode('utf8')):
print(json.dumps(message, indent=True))
if stderr:
print('## STDERR:')
PrintByteArray(stderr)
print('## Expected output')
for message in test_builder.expected:
print(message)
print('## Actual output')
for message in actual:
print(message)
print('## Difference')
common_end = min(len(test_builder.expected), len(actual))
for i in range(0, common_end):
if test_builder.expected[i] != actual[i]:
print('i=%s' % i)
print('- Expected %s' % str(test_builder.expected[i]))
print('- Actual %s' % str(actual[i]))
for i in range(common_end, len(test_builder.expected)):
print('Extra expected: %s' % str(test_builder.expected[i]))
for i in range(common_end, len(actual)):
print('Extra actual: %s' % str(actual[i]))
exit_code = process.wait()
def _DiscoverTests():
"""
@ -122,12 +220,16 @@ def _RunTests():
Executes all tests.
"""
for name, func in _DiscoverTests():
print('Running test function %s' % name)
_ExecuteTest(name, func)
#### EXAMPLE TESTS ####
class lsSymbolKind:
Function = 1
@ -138,24 +240,32 @@ def lsSymbolInfo(name, position, kind):
'kind': kind
}
def Test_Init():
def DISABLED_Test_Init():
return (TestBuilder()
.SetupCommonInit()
)
def _Test_Outline():
def Test_Outline():
return (TestBuilder()
.SetupCommonInit()
.WithFile("foo.cc",
"""
void main() {}
"""
)
# .IndexFile("file:///C%3A/Users/jacob/Desktop/cquery/foo.cc",
.IndexFile("foo.cc",
"""void foobar();""")
.WaitForIdle()
.Send({
'id': 1,
'method': 'textDocument/documentSymbol',
'params': {}
'params': {
'textDocument': {
'uri': 'C:/Users/jacob/Desktop/cquery/foo.cc'
}
}
})
# .Expect({
# 'jsonrpc': '2.0',
# 'id': 1,
# 'error': {'code': -32603, 'message': 'Unable to find file '}
# }))
.Expect({
'id': 1,
'result': [
@ -165,4 +275,4 @@ def _Test_Outline():
if __name__ == '__main__':
_RunTests()
_RunTests()