diff --git a/src/command_line.cc b/src/command_line.cc
index f4519337..0fbd2a3a 100644
--- a/src/command_line.cc
+++ b/src/command_line.cc
@@ -20,6 +20,7 @@
 #include "test.h"
 #include "timer.h"
 #include "threaded_queue.h"
+#include "work_thread.h"
 #include "working_files.h"
 #include
@@ -189,7 +190,7 @@ bool FindFileOrFail(QueryDatabase* db, lsRequestId id, const std::string& absolu
   if (out_file_id)
     *out_file_id = QueryFileId((size_t)-1);
-  LOG_S(INFO) << "Unable to find file " << absolute_path;
+  LOG_S(INFO) << "Unable to find file \"" << absolute_path << "\"";
   Out_Error out;
   out.id = id;
@@ -541,9 +542,10 @@ struct Index_Request {
   std::string path;
   std::vector<std::string> args;  // TODO: make this a string that is parsed lazily.
   bool is_interactive;
+  optional<std::string> contents;  // Preloaded contents. Useful for tests.
-  Index_Request(const std::string& path, const std::vector<std::string>& args, bool is_interactive)
-      : path(path), args(args), is_interactive(is_interactive) {}
+  Index_Request(const std::string& path, const std::vector<std::string>& args, bool is_interactive, optional<std::string> contents)
+      : path(path), args(args), is_interactive(is_interactive), contents(contents) {}
 };
 struct Index_DoIdMap {
@@ -652,6 +654,15 @@ struct QueueManager {
   Index_OnIndexedQueue on_indexed;
   QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), do_id_map(waiter), load_previous_index(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
+
+  bool HasWork() {
+    return
+        !index_request.IsEmpty() ||
+        !do_id_map.IsEmpty() ||
+        !load_previous_index.IsEmpty() ||
+        !on_id_mapped.IsEmpty() ||
+        !on_indexed.IsEmpty();
+  }
 };
 void RegisterMessageTypes() {
@@ -684,6 +695,9 @@ void RegisterMessageTypes() {
   MessageRegistry::instance()->Register();
   MessageRegistry::instance()->Register();
   MessageRegistry::instance()->Register();
+  MessageRegistry::instance()->Register<Ipc_CqueryIndexFile>();
+  MessageRegistry::instance()->Register<Ipc_CqueryQueryDbWaitForIdleIndexer>();
+  MessageRegistry::instance()->Register<Ipc_CqueryExitWhenIdle>();
 }
@@ -726,6 +740,12 @@ struct ImportManager {
     import_.erase(path);
   }
+  // Returns true if there are any files currently being imported.
+  bool HasActiveImports() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    return !import_.empty();
+  }
+
   std::mutex mutex_;
   std::unordered_set<std::string> import_;
 };
@@ -832,7 +852,8 @@ std::vector DoParseFile(
     CacheLoader* cache_loader,
     bool is_interactive,
     const std::string& path,
-    const std::vector<std::string>& args) {
+    const std::vector<std::string>& args,
+    const optional<FileContents>& contents) {
   std::vector<Index_DoIdMap> result;
   IndexFile* previous_index = cache_loader->TryLoad(path);
@@ -904,6 +925,10 @@ std::vector DoParseFile(
     // well. We then default to a fast file-copy if not in working set.
     bool loaded_primary = false;
     std::vector<FileContents> file_contents;
+    if (contents) {
+      loaded_primary = loaded_primary || contents->path == path;
+      file_contents.push_back(*contents);
+    }
     for (const auto& it : cache_loader->caches) {
       const std::unique_ptr<IndexFile>& index = it.second;
       assert(index);
@@ -930,6 +955,7 @@ std::vector DoParseFile(
       config, file_consumer_shared, path, args, file_contents, &perf, index);
+  // LOG_S(INFO) << "Parsing " << path << " gave " << indexes.size() << " indexes";
   for (std::unique_ptr<IndexFile>& new_index : indexes) {
     Timer time;
@@ -964,7 +990,12 @@ std::vector ParseFile(
     FileConsumer::SharedState* file_consumer_shared,
     TimestampManager* timestamp_manager,
     bool is_interactive,
-    const Project::Entry& entry) {
+    const Project::Entry& entry,
+    const optional<std::string>& contents) {
+
+  optional<FileContents> file_contents;
+  if (contents)
+    file_contents = FileContents(entry.filename, *contents);
   CacheLoader cache_loader(config);
@@ -973,7 +1004,7 @@ std::vector ParseFile(
   // complain about if indexed by itself.
   IndexFile* entry_cache = cache_loader.TryLoad(entry.filename);
   std::string tu_path = entry_cache ? entry_cache->import_file : entry.filename;
-  return DoParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, &cache_loader, is_interactive, tu_path, entry.args);
+  return DoParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, &cache_loader, is_interactive, tu_path, entry.args, file_contents);
 }
 bool IndexMain_DoParse(
@@ -985,18 +1016,28 @@ bool IndexMain_DoParse(
     TimestampManager* timestamp_manager,
     clang::Index* index) {
-  optional<Index_Request> request = queue->index_request.TryDequeue();
-  if (!request)
-    return false;
-
-  if (!import_manager->StartImport(request->path))
+  bool can_import = false;
+  optional<Index_Request> request = queue->index_request.TryDequeuePlusAction([&](const Index_Request& request) {
+    can_import = import_manager->StartImport(request.path);
+  });
+  if (!request || !can_import)
     return false;
   Project::Entry entry;
   entry.filename = request->path;
   entry.args = request->args;
-  std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry);
+  std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry, request->contents);
+
+  // Unmark file as imported if indexing failed.
+  bool found_import = false;
+  for (auto& response : responses) {
+    if (response.current->path == request->path)
+      found_import = true;
+  }
+  if (!found_import)
+    import_manager->DoneImport(request->path);
+
+  // Don't bother sending an IdMap request if there are no responses.
   if (responses.empty())
     return false;
@@ -1100,48 +1141,48 @@ bool IndexMergeIndexUpdates(QueueManager* queue) {
   }
 }
-void IndexMain(Config* config,
-               FileConsumer::SharedState* file_consumer_shared,
-               ImportManager* import_manager,
-               TimestampManager* timestamp_manager,
-               Project* project,
-               WorkingFiles* working_files,
-               MultiQueueWaiter* waiter,
-               QueueManager* queue) {
-  SetCurrentThreadName("indexer");
+WorkThread::Result IndexMain(
+    Config* config,
+    FileConsumer::SharedState* file_consumer_shared,
+    ImportManager* import_manager,
+    TimestampManager* timestamp_manager,
+    Project* project,
+    WorkingFiles* working_files,
+    MultiQueueWaiter* waiter,
+    QueueManager* queue) {
   // TODO: dispose of index after it is not used for a while.
   clang::Index index(1, 0);
-  while (true) {
-    // TODO: process all off IndexMain_DoIndex before calling
-    // IndexMain_DoCreateIndexUpdate for
-    // better icache behavior. We need to have some threads spinning on
-    // both though
-    // otherwise memory usage will get bad.
+  // TODO: process all of IndexMain_DoIndex before calling
+  // IndexMain_DoCreateIndexUpdate for
+  // better icache behavior. We need to have some threads spinning on
+  // both though
+  // otherwise memory usage will get bad.
-    // We need to make sure to run both IndexMain_DoParse and
-    // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
-    // work. Running both also lets the user query the partially constructed
-    // index.
-    bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, import_manager, timestamp_manager, &index);
+  // We need to make sure to run both IndexMain_DoParse and
+  // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
+  // work. Running both also lets the user query the partially constructed
+  // index.
+  bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, import_manager, timestamp_manager, &index);
-    bool did_create_update =
-        IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
+  bool did_create_update =
+      IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
-    bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
+  bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
-    // Nothing to index and no index updates to create, so join some already
-    // created index updates to reduce work on querydb thread.
-    bool did_merge = false;
-    if (!did_parse && !did_create_update && !did_load_previous)
-      did_merge = IndexMergeIndexUpdates(queue);
+  // Nothing to index and no index updates to create, so join some already
+  // created index updates to reduce work on querydb thread.
+  bool did_merge = false;
+  if (!did_parse && !did_create_update && !did_load_previous)
+    did_merge = IndexMergeIndexUpdates(queue);
-    // We didn't do any work, so wait for a notification.
-    if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
-      waiter->Wait(
-          {&queue->index_request, &queue->on_id_mapped, &queue->load_previous_index, &queue->on_indexed});
-    }
+  // We didn't do any work, so wait for a notification.
+  if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
+    waiter->Wait(
+        {&queue->index_request, &queue->on_id_mapped, &queue->load_previous_index, &queue->on_indexed});
   }
+
+  return queue->HasWork() ? WorkThread::Result::MoreWork : WorkThread::Result::NoWork;
 }
 bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import_manager, QueueManager* queue, WorkingFiles* working_files) {
@@ -1157,8 +1198,8 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
   // it, load the previous state from disk and rerun IdMap logic later. Do not
   // do this if we have already attempted in the past.
   if (!request->load_previous &&
-      !request->previous &&
-      db->usr_to_file.find(LowerPathIfCaseInsensitive(request->current->path)) != db->usr_to_file.end()) {
+      !request->previous &&
+      db->usr_to_file.find(LowerPathIfCaseInsensitive(request->current->path)) != db->usr_to_file.end()) {
     assert(!request->load_previous);
     request->load_previous = true;
     queue->load_previous_index.Enqueue(std::move(*request));
@@ -1299,6 +1340,7 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
 bool QueryDbMainLoop(
     Config* config,
     QueryDatabase* db,
+    bool* exit_when_idle,
     MultiQueueWaiter* waiter,
     QueueManager* queue,
     Project* project,
@@ -1336,14 +1378,15 @@ bool QueryDbMainLoop(
                   << " with uri " << request->params.rootUri->raw_uri;
       if (!request->params.initializationOptions) {
-        LOG_S(INFO) << "Initialization parameters (particularily cacheDirectory) are required";
+        LOG_S(FATAL) << "Initialization parameters (particularly cacheDirectory) are required";
         exit(1);
       }
       *config = *request->params.initializationOptions;
       // Check client version.
-      if (config->clientVersion != kExpectedClientVersion) {
+      if (config->clientVersion != kExpectedClientVersion &&
+          config->clientVersion != -1 /*disable check*/) {
         Out_ShowLogMessage out;
         out.display_type = Out_ShowLogMessage::DisplayType::Show;
         out.params.type = lsMessageType::Error;
@@ -1375,8 +1418,8 @@ bool QueryDbMainLoop(
       }
       std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl;
       for (int i = 0; i < config->indexerCount; ++i) {
-        new std::thread([&]() {
-          IndexMain(config, file_consumer_shared, import_manager, timestamp_manager, project, working_files, waiter, queue);
+        WorkThread::StartThread("indexer" + std::to_string(i), [&]() {
+          return IndexMain(config, file_consumer_shared, import_manager, timestamp_manager, project, working_files, waiter, queue);
         });
       }
@@ -1395,7 +1438,7 @@ bool QueryDbMainLoop(
         //          << "] Dispatching index request for file " << entry.filename
         //          << std::endl;
         bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr;
-        queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive));
+        queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive, nullopt));
       });
       // We need to support multiple concurrent index processes.
@@ -1459,7 +1502,7 @@ bool QueryDbMainLoop(
         LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1) << "] Dispatching index request for file " << entry.filename;
         bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr;
-        queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive));
+        queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, is_interactive, nullopt));
       });
       break;
     }
@@ -1674,7 +1717,7 @@ bool QueryDbMainLoop(
       // Submit new index request.
       const Project::Entry& entry = project->FindCompilationEntryForFile(path);
-      queue->index_request.PriorityEnqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/));
+      queue->index_request.PriorityEnqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/, nullopt));
       break;
     }
@@ -1719,7 +1762,7 @@ bool QueryDbMainLoop(
       // if so, ignore that index response.
       // TODO: send as priority request
       Project::Entry entry = project->FindCompilationEntryForFile(path);
-      queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/));
+      queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, true /*is_interactive*/, nullopt));
       clang_complete->NotifySave(path);
@@ -2572,6 +2615,45 @@ bool QueryDbMainLoop(
       break;
     }
+    case IpcId::CqueryIndexFile: {
+      auto msg = static_cast<Ipc_CqueryIndexFile*>(message.get());
+      queue->index_request.Enqueue(Index_Request(
+          NormalizePath(msg->params.path),
+          msg->params.args,
+          msg->params.is_interactive,
+          msg->params.contents));
+      break;
+    }
+
+    case IpcId::CqueryQueryDbWaitForIdleIndexer: {
+      auto msg = static_cast<Ipc_CqueryQueryDbWaitForIdleIndexer*>(message.get());
+      LOG_S(INFO) << "Waiting for idle";
+      int idle_count = 0;
+      while (true) {
+        bool has_work = false;
+        has_work |= import_manager->HasActiveImports();
+        has_work |= queue->HasWork();
+        has_work |= QueryDb_ImportMain(config, db, import_manager, queue, working_files);
+        if (!has_work)
+          ++idle_count;
+        else
+          idle_count = 0;
+
+        // There are race conditions between each of the three checks above,
+        // so we retry a bunch of times to try to avoid any.
+        if (idle_count > 10)
+          break;
+      }
+      LOG_S(INFO) << "Done waiting for idle";
+      break;
+    }
+
+    case IpcId::CqueryExitWhenIdle: {
+      *exit_when_idle = true;
+      WorkThread::request_exit_on_idle = true;
+      break;
+    }
+
     default: {
      LOG_S(INFO) << "[querydb] Unhandled IPC message " << IpcIdToString(message->method_id);
      exit(1);
@@ -2588,10 +2670,8 @@ bool QueryDbMainLoop(
   return did_work;
 }
-void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) {
-  // Create queues.
-  QueueManager queue(waiter);
-
+void RunQueryDbThread(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter, QueueManager* queue) {
+  bool exit_when_idle = false;
   Project project;
   WorkingFiles working_files;
   ClangCompleteManager clang_complete(
@@ -2610,18 +2690,22 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
   QueryDatabase db;
   while (true) {
     bool did_work = QueryDbMainLoop(
-        config, &db, waiter, &queue,
+        config, &db, &exit_when_idle, waiter, queue,
        &project, &file_consumer_shared, &import_manager, &timestamp_manager, &working_files,
        &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get());
+    // No more work left and exit request. Exit.
+    if (!did_work && exit_when_idle && WorkThread::num_active_threads == 0)
+      exit(0);
+
     // Cleanup and free any unused memory.
     FreeUnusedMemory();
     if (!did_work) {
       waiter->Wait({
         IpcManager::instance()->threaded_queue_for_server_.get(),
-        &queue.do_id_map,
-        &queue.on_indexed
+        &queue->do_id_map,
+        &queue->on_indexed
       });
     }
   }
@@ -2688,8 +2772,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
-
-// TODO: global lock on stderr output.
 // Separate thread whose only job is to read from stdin and
 // dispatch read commands to the actual indexer program. This
 // blocks.
 //
 // |ipc| is connected to a server.
-void LanguageServerStdinLoop(Config* config, std::unordered_map* request_times) {
-  IpcManager* ipc = IpcManager::instance();
+void LaunchStdinLoop(Config* config, std::unordered_map* request_times) {
+
+  WorkThread::StartThread("stdin", [config, request_times]() {
+    IpcManager* ipc = IpcManager::instance();
-  SetCurrentThreadName("stdin");
-  while (true) {
     std::unique_ptr message = MessageRegistry::instance()->ReadMessageFromStdin();
     // Message parsing can fail if we don't recognize the method.
     if (!message)
-      continue;
+      return WorkThread::Result::MoreWork;
     (*request_times)[message->method_id] = Timer();
@@ -2722,8 +2804,21 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map* r
         break;
       }
+      case IpcId::Exit: {
+        exit(0);
+        break;
+      }
+
+      case IpcId::CqueryExitWhenIdle: {
+        // querydb needs to know to exit when idle. We return out of the stdin
+        // loop to exit the thread. If we kept parsing input, stdin is likely
+        // closed, so cquery would exit anyway.
+        LOG_S(INFO) << "cquery will exit when all threads are idle";
+        ipc->SendMessage(IpcManager::Destination::Server, std::move(message));
+        return WorkThread::Result::ExitThread;
+      }
+
       case IpcId::Initialize:
-      case IpcId::Exit:
       case IpcId::TextDocumentDidOpen:
       case IpcId::TextDocumentDidChange:
      case IpcId::TextDocumentDidClose:
@@ -2747,7 +2842,9 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map* r
       case IpcId::CqueryVars:
       case IpcId::CqueryCallers:
       case IpcId::CqueryBase:
-      case IpcId::CqueryDerived: {
+      case IpcId::CqueryDerived:
+      case IpcId::CqueryIndexFile:
+      case IpcId::CqueryQueryDbWaitForIdleIndexer: {
         ipc->SendMessage(IpcManager::Destination::Server, std::move(message));
         break;
       }
@@ -2757,7 +2854,9 @@ void LanguageServerStdinLoop(Config* config, std::unordered_map* r
         exit(1);
       }
     }
-  }
+
+    return WorkThread::Result::MoreWork;
+  });
 }
@@ -2805,15 +2904,14 @@
-void StdoutMain(std::unordered_map* request_times, MultiQueueWaiter* waiter) {
-  SetCurrentThreadName("stdout");
-  IpcManager* ipc = IpcManager::instance();
+void LaunchStdoutThread(std::unordered_map* request_times, MultiQueueWaiter* waiter, QueueManager* queue) {
+  WorkThread::StartThread("stdout", [=]() {
+    IpcManager* ipc = IpcManager::instance();
-  while (true) {
     std::vector> messages = ipc->GetMessages(IpcManager::Destination::Client);
     if (messages.empty()) {
-      waiter->Wait({ipc->threaded_queue_for_client_.get()});
-      continue;
+      waiter->Wait({ ipc->threaded_queue_for_client_.get() });
+      return queue->HasWork() ? WorkThread::Result::MoreWork : WorkThread::Result::NoWork;
     }
     for (auto& message : messages) {
@@ -2839,26 +2937,24 @@ void StdoutMain(std::unordered_map* request_times, MultiQueueWaite
        }
      }
    }
-  }
+
+    return WorkThread::Result::MoreWork;
+  });
 }
 void LanguageServerMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) {
+  QueueManager queue(waiter);
   std::unordered_map request_times;
-  // Start stdin reader. Reading from stdin is a blocking operation so this
-  // needs a dedicated thread.
-  new std::thread([&]() {
-    LanguageServerStdinLoop(config, &request_times);
-  });
-
-  // Start querydb thread. querydb will start indexer threads as needed.
-  new std::thread([&]() {
-    QueryDbMain(bin_name, config, waiter);
-  });
+  LaunchStdinLoop(config, &request_times);
   // We run a dedicated thread for writing to stdout because there can be an
   // unknown number of delays when output information.
-  StdoutMain(&request_times, waiter);
+  LaunchStdoutThread(&request_times, waiter, &queue);
+
+  // Start querydb which takes over this thread. The querydb will launch
+  // indexer threads as needed.
+  RunQueryDbThread(bin_name, config, waiter, &queue);
 }
diff --git a/src/include_complete.cc b/src/include_complete.cc
index cf120747..e3362d43 100644
--- a/src/include_complete.cc
+++ b/src/include_complete.cc
@@ -5,6 +5,7 @@
 #include "project.h"
 #include "standard_includes.h"
 #include "timer.h"
+#include "work_thread.h"
 #include
@@ -114,8 +115,7 @@ void IncludeComplete::Rescan() {
   match_ = MakeUnique(config_->includeCompletionWhitelist, config_->includeCompletionBlacklist);
   is_scanning = true;
-  new std::thread([this]() {
-    SetCurrentThreadName("include_scanner");
+  WorkThread::StartThread("include_scanner", [this]() {
     Timer timer;
     InsertStlIncludes();
@@ -127,6 +127,8 @@ void IncludeComplete::Rescan() {
     timer.ResetAndPrint("[perf] Scanning for includes");
     is_scanning = false;
+
+    return WorkThread::Result::ExitThread;
   });
 }
diff --git a/src/ipc.cc b/src/ipc.cc
index ba03cc1c..60db889a 100644
--- a/src/ipc.cc
+++ b/src/ipc.cc
@@ -71,6 +71,14 @@ const char* IpcIdToString(IpcId id) {
     case IpcId::Cout:
       return "$cout";
+
+    case IpcId::CqueryIndexFile:
+      return "$cquery/indexFile";
+    case IpcId::CqueryQueryDbWaitForIdleIndexer:
+      return "$cquery/queryDbWaitForIdleIndexer";
+    case IpcId::CqueryExitWhenIdle:
+      return "$cquery/exitWhenIdle";
+
     default:
       assert(false && "missing IpcId string name");
       exit(1);
diff --git a/src/ipc.h b/src/ipc.h
index 2a89ee7d..a0811fd3 100644
--- a/src/ipc.h
+++ b/src/ipc.h
@@ -46,7 +46,14 @@ enum class IpcId : int {
   CqueryDerived,  // Show all derived types/methods.
   // Internal implementation detail.
-  Cout
+  Cout,
+
+  // Index the given file contents. Used in tests.
+  CqueryIndexFile,
+  // Make querydb wait for the indexer to be idle. Used in tests.
+  CqueryQueryDbWaitForIdleIndexer,
+  // Exit after all messages have been read/processed. Used in tests.
+  CqueryExitWhenIdle
 };
 MAKE_ENUM_HASHABLE(IpcId)
 MAKE_REFLECT_TYPE_PROXY(IpcId, int)
@@ -68,4 +75,28 @@ struct Ipc_Cout : public IpcMessage {
   std::string content;
   IpcId original_ipc_id;
 };
-MAKE_REFLECT_STRUCT(Ipc_Cout, content);
\ No newline at end of file
+MAKE_REFLECT_STRUCT(Ipc_Cout, content);
+
+struct Ipc_CqueryIndexFile : public IpcMessage {
+  static constexpr IpcId kIpcId = IpcId::CqueryIndexFile;
+
+  struct Params {
+    std::string path;
+    std::vector<std::string> args;
+    bool is_interactive = false;
+    std::string contents;
+  };
+  Params params;
+};
+MAKE_REFLECT_STRUCT(Ipc_CqueryIndexFile::Params, path, args, is_interactive, contents);
+MAKE_REFLECT_STRUCT(Ipc_CqueryIndexFile, params);
+
+struct Ipc_CqueryQueryDbWaitForIdleIndexer : public IpcMessage {
+  static constexpr IpcId kIpcId = IpcId::CqueryQueryDbWaitForIdleIndexer;
+};
+MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryQueryDbWaitForIdleIndexer);
+
+struct Ipc_CqueryExitWhenIdle : public IpcMessage {
+  static constexpr IpcId kIpcId = IpcId::CqueryExitWhenIdle;
+};
+MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryExitWhenIdle);
\ No newline at end of file
diff --git a/src/language_server_api.cc b/src/language_server_api.cc
index 08d3c062..9fa9dd7c 100644
--- a/src/language_server_api.cc
+++ b/src/language_server_api.cc
@@ -1,5 +1,8 @@
 #include "language_server_api.h"
+#include
+#include
+
 void Reflect(Writer& visitor, lsRequestId& value) {
   assert(value.id0.has_value() || value.id1.has_value());
@@ -22,55 +25,113 @@ void Reflect(Reader& visitor, lsRequestId& id) {
 MessageRegistry* MessageRegistry::instance_ = nullptr;
-std::unique_ptr MessageRegistry::ReadMessageFromStdin() {
-  int content_length = -1;
-  int iteration = 0;
+// Reads a JsonRpc message. |read| returns the next input character.
+optional<std::string> ReadJsonRpcContentFrom(std::function<optional<char>()> read) {
+  // Read the content length. It is terminated by the "\r\n" sequence.
+  int exit_seq = 0;
+  std::string stringified_content_length;
   while (true) {
-    if (++iteration > 10) {
-      assert(false && "bad parser state");
-      exit(1);
+    optional<char> opt_c = read();
+    if (!opt_c) {
+      LOG_S(INFO) << "No more input when reading content length header";
+      return nullopt;
     }
+    char c = *opt_c;
-    std::string line;
-    std::getline(std::cin, line);
+    if (exit_seq == 0 && c == '\r') ++exit_seq;
+    if (exit_seq == 1 && c == '\n') break;
+
+    stringified_content_length += c;
+  }
+  constexpr const char* kContentLengthStart = "Content-Length: ";
+  assert(StartsWith(stringified_content_length, kContentLengthStart));
+  int content_length = atoi(stringified_content_length.c_str() + strlen(kContentLengthStart));
-    // No content; end of stdin.
-    if (line.empty()) {
-      std::cerr << "stdin closed; exiting" << std::endl;
-      exit(0);
-    }
-
-    // std::cin >> line;
-    // std::cerr << "Read line " << line;
-
-    if (line.compare(0, 14, "Content-Length") == 0) {
-      content_length = atoi(line.c_str() + 16);
-    }
-
-    if (line == "\r")
-      break;
+  // There is always a "\r\n" sequence before the actual content.
+  auto expect_char = [&](char expected) {
+    optional<char> opt_c = read();
+    return opt_c && *opt_c == expected;
+  };
+  if (!expect_char('\r') || !expect_char('\n')) {
+    LOG_S(INFO) << "Unexpected token (expected \r\n sequence)";
+    return nullopt;
   }
-  // bad input that is not a message.
-  if (content_length < 0) {
-    std::cerr << "parsing command failed (no Content-Length header)" << std::endl;
-    return nullptr;
-  }
-
-  // TODO: maybe use std::cin.read(c, content_length)
+  // Read content.
   std::string content;
   content.reserve(content_length);
-  for (int i = 0; i < content_length; ++i) {
-    char c;
-    std::cin.read(&c, 1);
-    content += c;
+  for (size_t i = 0; i < content_length; ++i) {
+    optional<char> c = read();
+    if (!c) {
+      LOG_S(INFO) << "No more input when reading content body";
+      return nullopt;
+    }
+    content += *c;
+  }
+
+  return content;
+}
+
+TEST_SUITE("FindIncludeLine");
+
+auto MakeContentReader(std::string* content, bool can_be_empty) {
+  return [=]() -> optional<char> {
+    if (!can_be_empty)
+      REQUIRE(!content->empty());
+    if (content->empty())
+      return nullopt;
+    char c = (*content)[0];
+    content->erase(content->begin());
+    return c;
+  };
+}
+
+TEST_CASE("ReadContentFromSource") {
+  auto parse_correct = [](std::string content) {
+    auto reader = MakeContentReader(&content, false /*can_be_empty*/);
+    auto got = ReadJsonRpcContentFrom(reader);
+    REQUIRE(got);
+    return got.value();
+  };
+
+  auto parse_incorrect = [](std::string content) {
+    auto reader = MakeContentReader(&content, true /*can_be_empty*/);
+    return ReadJsonRpcContentFrom(reader);
+  };
+
+  REQUIRE(parse_correct("Content-Length: 0\r\n\r\n") == "");
+  REQUIRE(parse_correct("Content-Length: 1\r\n\r\na") == "a");
+  REQUIRE(parse_correct("Content-Length: 4\r\n\r\nabcd") == "abcd");
+
+  REQUIRE(parse_incorrect("ggg") == optional<std::string>());
+  REQUIRE(parse_incorrect("Content-Length: 0\r\n") == optional<std::string>());
+  REQUIRE(parse_incorrect("Content-Length: 5\r\n\r\nab") == optional<std::string>());
+}
+
+TEST_SUITE_END();
+
+optional<char> ReadCharFromStdinBlocking() {
+  // Bad stdin means parent process has probably exited. Either way, cquery
+  // can no longer be communicated with so just exit.
+  if (!std::cin.good()) {
+    LOG_S(FATAL) << "std::cin.good() is false; exiting";
+    exit(1);
   }
-  //std::cerr << content.c_str() << std::endl;
+  char c = 0;
+  std::cin.read(&c, 1);
+  return c;
+}
+
+std::unique_ptr MessageRegistry::ReadMessageFromStdin() {
+  optional<std::string> content = ReadJsonRpcContentFrom(&ReadCharFromStdinBlocking);
+  if (!content) {
+    LOG_S(FATAL) << "Failed to read JsonRpc input; exiting";
+    exit(1);
+  }
   rapidjson::Document document;
-  document.Parse(content.c_str(), content_length);
+  document.Parse(content->c_str(), content->length());
   assert(!document.HasParseError());
   return Parse(document);
@@ -86,7 +147,7 @@ std::unique_ptr MessageRegistry::Parse(Reader& visitor) {
   ReflectMember(visitor, "method", method);
   if (allocators.find(method) == allocators.end()) {
-    std::cerr << "Unable to find registered handler for method \"" << method << "\"" << std::endl;
+    LOG_S(ERROR) << "Unable to find registered handler for method \"" << method << "\"" << std::endl;
     return nullptr;
   }
diff --git a/src/threaded_queue.h b/src/threaded_queue.h
index 93f7a1e3..8df109f7 100644
--- a/src/threaded_queue.h
+++ b/src/threaded_queue.h
@@ -1,5 +1,6 @@
 #pragma once
+#include "work_thread.h"
 #include
 #include
@@ -38,8 +39,11 @@ struct MultiQueueWaiter {
     // HasState() is called data gets posted but before we begin waiting for
     // the condition variable, we will miss the notification. The timeout of 5
     // means that if this happens we will delay operation for 5 seconds.
+    //
+    // If we're trying to exit (WorkThread::request_exit_on_idle), do not
+    // bother waiting.
-    while (!HasState(queues)) {
+    while (!HasState(queues) && !WorkThread::request_exit_on_idle) {
       std::unique_lock l(m);
       cv.wait_for(l, std::chrono::seconds(5));
     }
@@ -132,7 +136,8 @@ public:
   // Get the first element from the queue without blocking. Returns a null
   // value if the queue is empty.
-  optional<T> TryDequeue() {
+  template <typename TAction>
+  optional<T> TryDequeuePlusAction(TAction action) {
     std::lock_guard lock(mutex_);
     if (priority_.empty() && queue_.empty())
       return nullopt;
@@ -145,9 +150,16 @@ public:
     auto val = std::move(queue_.front());
     queue_.pop();
+
+    action(val);
+
     return std::move(val);
   }
+  optional<T> TryDequeue() {
+    return TryDequeuePlusAction([](const T&) {});
+  }
+
 private:
   std::queue priority_;
   mutable std::mutex mutex_;
diff --git a/src/work_thread.cc b/src/work_thread.cc
new file mode 100644
index 00000000..f6b905b4
--- /dev/null
+++ b/src/work_thread.cc
@@ -0,0 +1,28 @@
+#include "work_thread.h"
+
+#include "platform.h"
+
+std::atomic<int> WorkThread::num_active_threads;
+std::atomic<bool> WorkThread::request_exit_on_idle;
+
+// static
+void WorkThread::StartThread(
+    const std::string& thread_name,
+    const std::function<Result()>& entry_point) {
+  new std::thread([thread_name, entry_point]() {
+    SetCurrentThreadName(thread_name);
+
+    ++num_active_threads;
+
+    // Main loop.
+    while (true) {
+      Result result = entry_point();
+      if (result == Result::ExitThread)
+        break;
+      if (request_exit_on_idle && result == Result::NoWork)
+        break;
+    }
+
+    --num_active_threads;
+  });
+}
\ No newline at end of file
diff --git a/src/work_thread.h b/src/work_thread.h
new file mode 100644
index 00000000..6a80c547
--- /dev/null
+++ b/src/work_thread.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+// Helper methods for starting threads that do some work. Enables test code to
+// wait for all work to complete.
+struct WorkThread {
+  enum class Result {
+    MoreWork,
+    NoWork,
+    ExitThread
+  };
+
+  // The number of active worker threads.
+  static std::atomic<int> num_active_threads;
+  // Set to true to request all work thread instances to exit.
+  static std::atomic<bool> request_exit_on_idle;
+
+  // Launch a new thread. |entry_point| will be called continuously. It should
+  // return Result::MoreWork while there is still known work to be done.
+  static void StartThread(
+      const std::string& thread_name,
+      const std::function<Result()>& entry_point);
+
+  // Static-only class.
+  WorkThread() = delete;
+};
\ No newline at end of file
diff --git a/test_runner_e2e.py b/test_runner_e2e.py
index b1c350d0..617a3397 100644
--- a/test_runner_e2e.py
+++ b/test_runner_e2e.py
@@ -1,7 +1,12 @@
 import json
+import re
 import shlex
 from subprocess import Popen, PIPE
+
+CQUERY_PATH = 'x64/Debug/cquery.exe'
+CACHE_DIR = 'e2e_CACHE'
+
 # Content-Length: ...\r\n
 # \r\n
 # {
@@ -22,15 +27,33 @@ from subprocess import Popen, PIPE
 class TestBuilder:
   def __init__(self):
-    self.files = []
     self.sent = []
-    self.received = []
+    self.expected = []
-  def WithFile(self, filename, contents):
+  def IndexFile(self, path, contents):
     """
     Writes the file contents to disk so that the language server can access it.
     """
-    self.files.append((filename, contents))
+    self.Send({
+      'method': '$cquery/indexFile',
+      'params': {
+        'path': path,
+        'contents': contents,
+        'args': [
+          '-xc++',
+          '-std=c++11',
+          '-isystemC:/Program Files (x86)/Microsoft Visual Studio/2017/Community/VC/Tools/MSVC/14.10.25017/include',
+          '-isystemC:/Program Files (x86)/Windows Kits/10/Include/10.0.15063.0/ucrt'
+        ]
+      }
+    })
+    return self
+
+  def WaitForIdle(self):
+    """
+    Blocks the querydb thread until any active imports are complete.
+    """
+    self.Send({'method': '$cquery/queryDbWaitForIdleIndexer'})
     return self
   def Send(self, stdin):
@@ -41,11 +64,12 @@ class TestBuilder:
     self.sent.append(stdin)
     return self
-  def Expect(self, stdout):
+  def Expect(self, expected):
     """
     Expect a message from the language server.
     """
-    self.received.append(stdout)
+    expected['jsonrpc'] = '2.0'
+    self.expected.append(expected)
     return self
   def SetupCommonInit(self):
     self.Send({
       'id': 0,
       'method': 'initialize',
       'params': {
         'processId': 123,
-        'rootPath': 'cquery',
+        'rootUri': 'cquery',
         'capabilities': {},
-        'trace': 'off'
+        'trace': 'off',
+        'initializationOptions': {
+          'cacheDirectory': CACHE_DIR,
+          'clientVersion': -1  # Disables the check
+        }
       }
     })
     self.Expect({
       'id': 0,
-      'method': 'initialized',
-      'result': {}
+      'result': {
+        'capabilities': {
+          'textDocumentSync': 2,
+          'hoverProvider': True,
+          'completionProvider': {
+            'resolveProvider': False,
+            'triggerCharacters': [ '.', ':', '>', '#' ]
+          },
+          'signatureHelpProvider': {
+            'triggerCharacters': [ '(', ',' ]
+          },
+          'definitionProvider': True,
+          'referencesProvider': True,
+          'documentHighlightProvider': True,
+          'documentSymbolProvider': True,
+          'workspaceSymbolProvider': True,
+          'codeActionProvider': True,
+          'codeLensProvider': {
+            'resolveProvider': False
+          },
+          'documentFormattingProvider': False,
+          'documentRangeFormattingProvider': False,
+          'renameProvider': True,
+          'documentLinkProvider': {
+            'resolveProvider': False
+          }
+        }
+      }
     })
     return self
@@ -79,32 +133,76 @@ def _ExecuteTest(name, func):
   if not isinstance(test_builder, TestBuilder):
     raise Exception('%s does not return a TestBuilder instance' % name)
-  test_builder.Send({ 'method': 'exit' })
-
-  # Possible test runner implementation
-  cmd = "x64/Debug/indexer.exe --language-server"
-  process = Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
+  # Add a final exit message.
+  test_builder.Send({ 'method': '$cquery/exitWhenIdle' })
+  # Convert messages to a stdin byte array.
   stdin = ''
   for message in test_builder.sent:
     payload = json.dumps(message)
     wrapped = 'Content-Length: %s\r\n\r\n%s' % (len(payload), payload)
     stdin += wrapped
+  stdin_bytes = stdin.encode(encoding='UTF-8')
-  print('## %s ##' % name)
-  print('== STDIN ==')
-  print(stdin)
-  (stdout, stderr) = process.communicate(stdin)
-  if stdout:
-    print('== STDOUT ==')
-    print(stdout)
-  if stderr:
-    print('== STDERR ==')
-    print(stderr)
+  # Finds all messages in |string| by parsing Content-Length headers.
+  def GetMessages(string):
+    messages = []
+    for match in re.finditer('Content-Length: (\d+)\r\n\r\n', string):
+      start = match.span()[1]
+      length = int(match.groups()[0])
+      message = string[start:start + length]
+      messages.append(json.loads(message))
+    return messages
-  # TODO: Actually verify stdout.
+  # Utility method to print a byte array.
+  def PrintByteArray(bytes):
+    for line in bytes.split(b'\r\n'):
+      print(line.decode('utf8'))
+
+  # Execute program.
+  cmd = "%s --language-server" % CQUERY_PATH
+  process = Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+  (stdout, stderr) = process.communicate(stdin_bytes)
+  exit_code = process.wait()
+
+  # Check if test succeeded.
+  actual = GetMessages(stdout.decode('utf8'))
+  success = actual == test_builder.expected
+
+  # Print failure messages.
+  if success:
+    print('== Passed %s with exit_code=%s ==' % (name, exit_code))
+  else:
+    print('== FAILED %s with exit_code=%s ==' % (name, exit_code))
+    print('## STDIN:')
+    for message in GetMessages(stdin):
+      print(json.dumps(message, indent=True))
+    if stdout:
+      print('## STDOUT:')
+      for message in GetMessages(stdout.decode('utf8')):
+        print(json.dumps(message, indent=True))
+    if stderr:
+      print('## STDERR:')
+      PrintByteArray(stderr)
+
+    print('## Expected output')
+    for message in test_builder.expected:
+      print(message)
+    print('## Actual output')
+    for message in actual:
+      print(message)
+    print('## Difference')
+    common_end = min(len(test_builder.expected), len(actual))
+    for i in range(0, common_end):
+      if test_builder.expected[i] != actual[i]:
+        print('i=%s' % i)
+        print('- Expected %s' % str(test_builder.expected[i]))
+        print('- Actual %s' % str(actual[i]))
+    for i in range(common_end, len(test_builder.expected)):
+      print('Extra expected: %s' % str(test_builder.expected[i]))
+    for i in range(common_end, len(actual)):
+      print('Extra actual: %s' % str(actual[i]))
-  exit_code = process.wait()
 def _DiscoverTests():
   """
@@ -122,12 +220,16 @@ def _RunTests():
   Executes all tests.
   """
   for name, func in _DiscoverTests():
-    print('Running test function %s' % name)
     _ExecuteTest(name, func)
+
+
+#### EXAMPLE TESTS ####
+
+
 class lsSymbolKind:
   Function = 1
@@ -138,24 +240,32 @@ def lsSymbolInfo(name, position, kind):
     'kind': kind
   }
-def Test_Init():
+def DISABLED_Test_Init():
   return (TestBuilder()
    .SetupCommonInit()
  )
-def _Test_Outline():
+def Test_Outline():
   return (TestBuilder()
     .SetupCommonInit()
-    .WithFile("foo.cc",
-      """
-      void main() {}
-      """
-    )
+    # .IndexFile("file:///C%3A/Users/jacob/Desktop/cquery/foo.cc",
+    .IndexFile("foo.cc",
+      """void foobar();""")
+    .WaitForIdle()
     .Send({
       'id': 1,
       'method': 'textDocument/documentSymbol',
-      'params': {}
+      'params': {
+        'textDocument': {
+          'uri': 'C:/Users/jacob/Desktop/cquery/foo.cc'
+        }
+      }
     })
+    # .Expect({
+    #   'jsonrpc': '2.0',
+    #   'id': 1,
+    #   'error': {'code': -32603, 'message': 'Unable to find file '}
+    # }))
    .Expect({
      'id': 1,
      'result': [
@@ -165,4 +275,4 @@
 if __name__ == '__main__':
-  _RunTests()
\ No newline at end of file
+  _RunTests()
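
A minimal sketch (not part of the patch) of one more end-to-end test built from the TestBuilder helpers above, combining $cquery/indexFile, $cquery/queryDbWaitForIdleIndexer, and a follow-up LSP request; the test name, the query method choice, and the empty expected result are illustrative assumptions:

def DISABLED_Test_WorkspaceSymbol():
  # Index an in-memory file, wait for the indexer to drain, then issue a
  # workspace/symbol query. The expected payload below is a placeholder and
  # would need to be replaced with the real lsSymbolInfo response.
  return (TestBuilder()
    .SetupCommonInit()
    .IndexFile("foo.cc",
      """void foobar();""")
    .WaitForIdle()
    .Send({
      'id': 1,
      'method': 'workspace/symbol',
      'params': {'query': 'foobar'}
    })
    .Expect({
      'id': 1,
      'result': []
    }))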