From 7939aec7439a862d82f08e6a4cf8d03f53863b5c Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Thu, 28 Dec 2017 09:18:54 -0800 Subject: [PATCH] Remove WorkThread::Result. WorkThread::StartThread is now a more typical main function. --- src/command_line.cc | 180 ++++++++++++++++++------------------ src/entry_points.h | 16 ++-- src/import_pipeline.cc | 77 ++++++++------- src/include_complete.cc | 2 - src/messages/cquery_wait.cc | 7 +- src/messages/initialize.cc | 6 +- src/work_thread.cc | 10 +- src/work_thread.h | 5 +- 8 files changed, 143 insertions(+), 160 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 03706dba..59383845 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -187,9 +187,9 @@ void RunQueryDbThread(const std::string& bin_name, SetCurrentThreadName("querydb"); while (true) { bool did_work = QueryDbMainLoop( - config, &db, waiter, &project, &file_consumer_shared, - &import_manager, ×tamp_manager, &semantic_cache, &working_files, - &clang_complete, &include_complete, global_code_complete_cache.get(), + config, &db, waiter, &project, &file_consumer_shared, &import_manager, + ×tamp_manager, &semantic_cache, &working_files, &clang_complete, + &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); // Cleanup and free any unused memory. @@ -234,76 +234,75 @@ void LaunchStdinLoop(Config* config, WorkThread::StartThread("stdin", [request_times]() { auto* queue = QueueManager::instance(); - std::unique_ptr message = - MessageRegistry::instance()->ReadMessageFromStdin( - g_log_stdin_stdout_to_stderr); + while (true) { + std::unique_ptr message = + MessageRegistry::instance()->ReadMessageFromStdin( + g_log_stdin_stdout_to_stderr); - // Message parsing can fail if we don't recognize the method. - if (!message) - return WorkThread::Result::MoreWork; + // Message parsing can fail if we don't recognize the method. + if (!message) + continue; - // Cache |method_id| so we can access it after moving |message|. - IpcId method_id = message->method_id; + // Cache |method_id| so we can access it after moving |message|. + IpcId method_id = message->method_id; - (*request_times)[message->method_id] = Timer(); + (*request_times)[message->method_id] = Timer(); - switch (method_id) { - case IpcId::Initialized: { - // TODO: don't send output until we get this notification + switch (method_id) { + case IpcId::Initialized: { + // TODO: don't send output until we get this notification + break; + } + + case IpcId::CancelRequest: { + // TODO: support cancellation + break; + } + + case IpcId::Exit: + case IpcId::Initialize: + case IpcId::TextDocumentDidOpen: + case IpcId::CqueryTextDocumentDidView: + case IpcId::TextDocumentDidChange: + case IpcId::TextDocumentDidClose: + case IpcId::TextDocumentDidSave: + case IpcId::TextDocumentRename: + case IpcId::TextDocumentCompletion: + case IpcId::TextDocumentSignatureHelp: + case IpcId::TextDocumentDefinition: + case IpcId::TextDocumentDocumentHighlight: + case IpcId::TextDocumentHover: + case IpcId::TextDocumentReferences: + case IpcId::TextDocumentDocumentSymbol: + case IpcId::TextDocumentDocumentLink: + case IpcId::TextDocumentCodeAction: + case IpcId::TextDocumentCodeLens: + case IpcId::WorkspaceSymbol: + case IpcId::CqueryFreshenIndex: + case IpcId::CqueryTypeHierarchyTree: + case IpcId::CqueryCallTreeInitial: + case IpcId::CqueryCallTreeExpand: + case IpcId::CqueryVars: + case IpcId::CqueryCallers: + case IpcId::CqueryBase: + case IpcId::CqueryDerived: + case IpcId::CqueryIndexFile: + case IpcId::CqueryWait: { + queue->for_querydb.Enqueue(std::move(message)); + break; + } + + default: { + LOG_S(ERROR) << "Unhandled IPC message " << IpcIdToString(method_id); + exit(1); + } + } + + // If the message was to exit then querydb will take care of the actual + // exit. Stop reading from stdin since it might be detached. + if (method_id == IpcId::Exit) break; - } - - case IpcId::CancelRequest: { - // TODO: support cancellation - break; - } - - case IpcId::Exit: - case IpcId::Initialize: - case IpcId::TextDocumentDidOpen: - case IpcId::CqueryTextDocumentDidView: - case IpcId::TextDocumentDidChange: - case IpcId::TextDocumentDidClose: - case IpcId::TextDocumentDidSave: - case IpcId::TextDocumentRename: - case IpcId::TextDocumentCompletion: - case IpcId::TextDocumentSignatureHelp: - case IpcId::TextDocumentDefinition: - case IpcId::TextDocumentDocumentHighlight: - case IpcId::TextDocumentHover: - case IpcId::TextDocumentReferences: - case IpcId::TextDocumentDocumentSymbol: - case IpcId::TextDocumentDocumentLink: - case IpcId::TextDocumentCodeAction: - case IpcId::TextDocumentCodeLens: - case IpcId::WorkspaceSymbol: - case IpcId::CqueryFreshenIndex: - case IpcId::CqueryTypeHierarchyTree: - case IpcId::CqueryCallTreeInitial: - case IpcId::CqueryCallTreeExpand: - case IpcId::CqueryVars: - case IpcId::CqueryCallers: - case IpcId::CqueryBase: - case IpcId::CqueryDerived: - case IpcId::CqueryIndexFile: - case IpcId::CqueryWait: { - queue->for_querydb.Enqueue(std::move(message)); - break; - } - - default: { - LOG_S(ERROR) << "Unhandled IPC message " - << IpcIdToString(method_id); - exit(1); - } } - - // If the message was to exit then querydb will take care of the actual - // exit. Stop reading from stdin since it might be detached. - if (method_id == IpcId::Exit) - return WorkThread::Result::ExitThread; - - return WorkThread::Result::MoreWork; }); } @@ -312,34 +311,33 @@ void LaunchStdoutThread(std::unordered_map* request_times, WorkThread::StartThread("stdout", [=]() { auto* queue = QueueManager::instance(); - std::vector messages = queue->for_stdout.DequeueAll(); - if (messages.empty()) { - waiter->Wait({&queue->for_stdout}); - return queue->HasWork() ? WorkThread::Result::MoreWork - : WorkThread::Result::NoWork; - } - - for (auto& message : messages) { - if (ShouldDisplayIpcTiming(message.id)) { - Timer time = (*request_times)[message.id]; - time.ResetAndPrint("[e2e] Running " + - std::string(IpcIdToString(message.id))); + while (true) { + std::vector messages = queue->for_stdout.DequeueAll(); + if (messages.empty()) { + waiter->Wait({&queue->for_stdout}); + continue; } - if (g_log_stdin_stdout_to_stderr) { - std::ostringstream sstream; - sstream << "[COUT] |"; - sstream << message.content; - sstream << "|\n"; - std::cerr << sstream.str(); - std::cerr.flush(); + for (auto& message : messages) { + if (ShouldDisplayIpcTiming(message.id)) { + Timer time = (*request_times)[message.id]; + time.ResetAndPrint("[e2e] Running " + + std::string(IpcIdToString(message.id))); + } + + if (g_log_stdin_stdout_to_stderr) { + std::ostringstream sstream; + sstream << "[COUT] |"; + sstream << message.content; + sstream << "|\n"; + std::cerr << sstream.str(); + std::cerr.flush(); + } + + std::cout << message.content; + std::cout.flush(); } - - std::cout << message.content; - std::cout.flush(); } - - return WorkThread::Result::MoreWork; }); } diff --git a/src/entry_points.h b/src/entry_points.h index f7ef0a10..615dd680 100644 --- a/src/entry_points.h +++ b/src/entry_points.h @@ -20,11 +20,11 @@ bool QueryDb_ImportMain(Config* config, SemanticHighlightSymbolCache* semantic_cache, WorkingFiles* working_files); -WorkThread::Result IndexMain(Config* config, - FileConsumer::SharedState* file_consumer_shared, - TimestampManager* timestamp_manager, - ImportManager* import_manager, - ImportPipelineStatus* status, - Project* project, - WorkingFiles* working_files, - MultiQueueWaiter* waiter); \ No newline at end of file +void IndexMain(Config* config, + FileConsumer::SharedState* file_consumer_shared, + TimestampManager* timestamp_manager, + ImportManager* import_manager, + ImportPipelineStatus* status, + Project* project, + WorkingFiles* working_files, + MultiQueueWaiter* waiter); \ No newline at end of file diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc index 58a7ab7f..15223a03 100644 --- a/src/import_pipeline.cc +++ b/src/import_pipeline.cc @@ -434,56 +434,55 @@ bool IndexMergeIndexUpdates() { } } -WorkThread::Result IndexMain(Config* config, - FileConsumer::SharedState* file_consumer_shared, - TimestampManager* timestamp_manager, - ImportManager* import_manager, - ImportPipelineStatus* status, - Project* project, - WorkingFiles* working_files, - MultiQueueWaiter* waiter) { - status->num_active_threads++; - - EmitProgress(config); - +void IndexMain(Config* config, + FileConsumer::SharedState* file_consumer_shared, + TimestampManager* timestamp_manager, + ImportManager* import_manager, + ImportPipelineStatus* status, + Project* project, + WorkingFiles* working_files, + MultiQueueWaiter* waiter) { // Build one index per-indexer, as building the index acquires a global lock. ClangIndex index; - // TODO: process all off IndexMain_DoIndex before calling - // IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have - // some threads spinning on both though otherwise memory usage will get bad. + while (true) { + status->num_active_threads++; - // We need to make sure to run both IndexMain_DoParse and - // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any - // work. Running both also lets the user query the partially constructed - // index. - bool did_parse = - IndexMain_DoParse(config, working_files, file_consumer_shared, - timestamp_manager, import_manager, &index); + EmitProgress(config); - bool did_create_update = - IndexMain_DoCreateIndexUpdate(config, timestamp_manager); + // TODO: process all off IndexMain_DoIndex before calling + // IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have + // some threads spinning on both though otherwise memory usage will get bad. - bool did_load_previous = IndexMain_LoadPreviousIndex(config); + // We need to make sure to run both IndexMain_DoParse and + // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any + // work. Running both also lets the user query the partially constructed + // index. + bool did_parse = + IndexMain_DoParse(config, working_files, file_consumer_shared, + timestamp_manager, import_manager, &index); - // Nothing to index and no index updates to create, so join some already - // created index updates to reduce work on querydb thread. - bool did_merge = false; - if (!did_parse && !did_create_update && !did_load_previous) - did_merge = IndexMergeIndexUpdates(); + bool did_create_update = + IndexMain_DoCreateIndexUpdate(config, timestamp_manager); - status->num_active_threads--; + bool did_load_previous = IndexMain_LoadPreviousIndex(config); - auto* queue = QueueManager::instance(); + // Nothing to index and no index updates to create, so join some already + // created index updates to reduce work on querydb thread. + bool did_merge = false; + if (!did_parse && !did_create_update && !did_load_previous) + did_merge = IndexMergeIndexUpdates(); - // We didn't do any work, so wait for a notification. - if (!did_parse && !did_create_update && !did_merge && !did_load_previous) { - waiter->Wait({&queue->index_request, &queue->on_id_mapped, - &queue->load_previous_index, &queue->on_indexed}); + status->num_active_threads--; + + auto* queue = QueueManager::instance(); + + // We didn't do any work, so wait for a notification. + if (!did_parse && !did_create_update && !did_merge && !did_load_previous) { + waiter->Wait({&queue->index_request, &queue->on_id_mapped, + &queue->load_previous_index, &queue->on_indexed}); + } } - - return queue->HasWork() ? WorkThread::Result::MoreWork - : WorkThread::Result::NoWork; } bool QueryDb_ImportMain(Config* config, diff --git a/src/include_complete.cc b/src/include_complete.cc index 6385524b..b304f069 100644 --- a/src/include_complete.cc +++ b/src/include_complete.cc @@ -136,8 +136,6 @@ void IncludeComplete::Rescan() { timer.ResetAndPrint("[perf] Scanning for includes"); is_scanning = false; - - return WorkThread::Result::ExitThread; }); } diff --git a/src/messages/cquery_wait.cc b/src/messages/cquery_wait.cc index e41bcd66..b0c542af 100644 --- a/src/messages/cquery_wait.cc +++ b/src/messages/cquery_wait.cc @@ -4,17 +4,14 @@ #include namespace { -struct Ipc_CqueryWait - : public IpcMessage { +struct Ipc_CqueryWait : public IpcMessage { static constexpr IpcId kIpcId = IpcId::CqueryWait; }; MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryWait); REGISTER_IPC_MESSAGE(Ipc_CqueryWait); struct CqueryWaitHandler : MessageHandler { - IpcId GetId() const override { - return IpcId::CqueryWait; - } + IpcId GetId() const override { return IpcId::CqueryWait; } void Run(std::unique_ptr request) override { // TODO: use status message system here, then run querydb as normal? Maybe // this cannot be a normal message, ie, it needs to be re-entrant. diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index 1250a4ec..af5ec661 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -173,9 +173,9 @@ struct InitializeHandler : BaseMessageHandler { LOG_S(INFO) << "Starting " << config->indexerCount << " indexers"; for (int i = 0; i < config->indexerCount; ++i) { WorkThread::StartThread("indexer" + std::to_string(i), [=]() { - return IndexMain(config, file_consumer_shared, timestamp_manager, - import_manager, import_pipeline_status, project, - working_files, waiter); + IndexMain(config, file_consumer_shared, timestamp_manager, + import_manager, import_pipeline_status, project, + working_files, waiter); }); } diff --git a/src/work_thread.cc b/src/work_thread.cc index 32df6a61..b627485f 100644 --- a/src/work_thread.cc +++ b/src/work_thread.cc @@ -4,15 +4,9 @@ // static void WorkThread::StartThread(const std::string& thread_name, - const std::function& entry_point) { + std::function entry_point) { new std::thread([thread_name, entry_point]() { SetCurrentThreadName(thread_name); - - // Main loop. - while (true) { - Result result = entry_point(); - if (result == Result::ExitThread) - break; - } + entry_point(); }); } \ No newline at end of file diff --git a/src/work_thread.h b/src/work_thread.h index 0dbb2160..d11c237a 100644 --- a/src/work_thread.h +++ b/src/work_thread.h @@ -9,13 +9,10 @@ // Helper methods for starting threads that do some work. Enables test code to // wait for all work to complete. struct WorkThread { - // FIXME: remove result, have entry_point run a while(true) loop. - enum class Result { MoreWork, NoWork, ExitThread }; - // Launch a new thread. |entry_point| will be called continously. It should // return true if it there is still known work to be done. static void StartThread(const std::string& thread_name, - const std::function& entry_point); + std::function entry_point); // Static-only class. WorkThread() = delete;