Remove WorkThread::Result.

WorkThread::StartThread is now a more typical main function.
This commit is contained in:
Jacob Dufault 2017-12-28 09:18:54 -08:00
parent 8468ef09c3
commit 7939aec743
8 changed files with 143 additions and 160 deletions

View File

@ -187,9 +187,9 @@ void RunQueryDbThread(const std::string& bin_name,
SetCurrentThreadName("querydb"); SetCurrentThreadName("querydb");
while (true) { while (true) {
bool did_work = QueryDbMainLoop( bool did_work = QueryDbMainLoop(
config, &db, waiter, &project, &file_consumer_shared, config, &db, waiter, &project, &file_consumer_shared, &import_manager,
&import_manager, &timestamp_manager, &semantic_cache, &working_files, &timestamp_manager, &semantic_cache, &working_files, &clang_complete,
&clang_complete, &include_complete, global_code_complete_cache.get(), &include_complete, global_code_complete_cache.get(),
non_global_code_complete_cache.get(), signature_cache.get()); non_global_code_complete_cache.get(), signature_cache.get());
// Cleanup and free any unused memory. // Cleanup and free any unused memory.
@ -234,76 +234,75 @@ void LaunchStdinLoop(Config* config,
WorkThread::StartThread("stdin", [request_times]() { WorkThread::StartThread("stdin", [request_times]() {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
std::unique_ptr<BaseIpcMessage> message = while (true) {
MessageRegistry::instance()->ReadMessageFromStdin( std::unique_ptr<BaseIpcMessage> message =
g_log_stdin_stdout_to_stderr); MessageRegistry::instance()->ReadMessageFromStdin(
g_log_stdin_stdout_to_stderr);
// Message parsing can fail if we don't recognize the method. // Message parsing can fail if we don't recognize the method.
if (!message) if (!message)
return WorkThread::Result::MoreWork; continue;
// Cache |method_id| so we can access it after moving |message|. // Cache |method_id| so we can access it after moving |message|.
IpcId method_id = message->method_id; IpcId method_id = message->method_id;
(*request_times)[message->method_id] = Timer(); (*request_times)[message->method_id] = Timer();
switch (method_id) { switch (method_id) {
case IpcId::Initialized: { case IpcId::Initialized: {
// TODO: don't send output until we get this notification // TODO: don't send output until we get this notification
break;
}
case IpcId::CancelRequest: {
// TODO: support cancellation
break;
}
case IpcId::Exit:
case IpcId::Initialize:
case IpcId::TextDocumentDidOpen:
case IpcId::CqueryTextDocumentDidView:
case IpcId::TextDocumentDidChange:
case IpcId::TextDocumentDidClose:
case IpcId::TextDocumentDidSave:
case IpcId::TextDocumentRename:
case IpcId::TextDocumentCompletion:
case IpcId::TextDocumentSignatureHelp:
case IpcId::TextDocumentDefinition:
case IpcId::TextDocumentDocumentHighlight:
case IpcId::TextDocumentHover:
case IpcId::TextDocumentReferences:
case IpcId::TextDocumentDocumentSymbol:
case IpcId::TextDocumentDocumentLink:
case IpcId::TextDocumentCodeAction:
case IpcId::TextDocumentCodeLens:
case IpcId::WorkspaceSymbol:
case IpcId::CqueryFreshenIndex:
case IpcId::CqueryTypeHierarchyTree:
case IpcId::CqueryCallTreeInitial:
case IpcId::CqueryCallTreeExpand:
case IpcId::CqueryVars:
case IpcId::CqueryCallers:
case IpcId::CqueryBase:
case IpcId::CqueryDerived:
case IpcId::CqueryIndexFile:
case IpcId::CqueryWait: {
queue->for_querydb.Enqueue(std::move(message));
break;
}
default: {
LOG_S(ERROR) << "Unhandled IPC message " << IpcIdToString(method_id);
exit(1);
}
}
// If the message was to exit then querydb will take care of the actual
// exit. Stop reading from stdin since it might be detached.
if (method_id == IpcId::Exit)
break; break;
}
case IpcId::CancelRequest: {
// TODO: support cancellation
break;
}
case IpcId::Exit:
case IpcId::Initialize:
case IpcId::TextDocumentDidOpen:
case IpcId::CqueryTextDocumentDidView:
case IpcId::TextDocumentDidChange:
case IpcId::TextDocumentDidClose:
case IpcId::TextDocumentDidSave:
case IpcId::TextDocumentRename:
case IpcId::TextDocumentCompletion:
case IpcId::TextDocumentSignatureHelp:
case IpcId::TextDocumentDefinition:
case IpcId::TextDocumentDocumentHighlight:
case IpcId::TextDocumentHover:
case IpcId::TextDocumentReferences:
case IpcId::TextDocumentDocumentSymbol:
case IpcId::TextDocumentDocumentLink:
case IpcId::TextDocumentCodeAction:
case IpcId::TextDocumentCodeLens:
case IpcId::WorkspaceSymbol:
case IpcId::CqueryFreshenIndex:
case IpcId::CqueryTypeHierarchyTree:
case IpcId::CqueryCallTreeInitial:
case IpcId::CqueryCallTreeExpand:
case IpcId::CqueryVars:
case IpcId::CqueryCallers:
case IpcId::CqueryBase:
case IpcId::CqueryDerived:
case IpcId::CqueryIndexFile:
case IpcId::CqueryWait: {
queue->for_querydb.Enqueue(std::move(message));
break;
}
default: {
LOG_S(ERROR) << "Unhandled IPC message "
<< IpcIdToString(method_id);
exit(1);
}
} }
// If the message was to exit then querydb will take care of the actual
// exit. Stop reading from stdin since it might be detached.
if (method_id == IpcId::Exit)
return WorkThread::Result::ExitThread;
return WorkThread::Result::MoreWork;
}); });
} }
@ -312,34 +311,33 @@ void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times,
WorkThread::StartThread("stdout", [=]() { WorkThread::StartThread("stdout", [=]() {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
std::vector<Stdout_Request> messages = queue->for_stdout.DequeueAll(); while (true) {
if (messages.empty()) { std::vector<Stdout_Request> messages = queue->for_stdout.DequeueAll();
waiter->Wait({&queue->for_stdout}); if (messages.empty()) {
return queue->HasWork() ? WorkThread::Result::MoreWork waiter->Wait({&queue->for_stdout});
: WorkThread::Result::NoWork; continue;
}
for (auto& message : messages) {
if (ShouldDisplayIpcTiming(message.id)) {
Timer time = (*request_times)[message.id];
time.ResetAndPrint("[e2e] Running " +
std::string(IpcIdToString(message.id)));
} }
if (g_log_stdin_stdout_to_stderr) { for (auto& message : messages) {
std::ostringstream sstream; if (ShouldDisplayIpcTiming(message.id)) {
sstream << "[COUT] |"; Timer time = (*request_times)[message.id];
sstream << message.content; time.ResetAndPrint("[e2e] Running " +
sstream << "|\n"; std::string(IpcIdToString(message.id)));
std::cerr << sstream.str(); }
std::cerr.flush();
if (g_log_stdin_stdout_to_stderr) {
std::ostringstream sstream;
sstream << "[COUT] |";
sstream << message.content;
sstream << "|\n";
std::cerr << sstream.str();
std::cerr.flush();
}
std::cout << message.content;
std::cout.flush();
} }
std::cout << message.content;
std::cout.flush();
} }
return WorkThread::Result::MoreWork;
}); });
} }

View File

@ -20,11 +20,11 @@ bool QueryDb_ImportMain(Config* config,
SemanticHighlightSymbolCache* semantic_cache, SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files); WorkingFiles* working_files);
WorkThread::Result IndexMain(Config* config, void IndexMain(Config* config,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
TimestampManager* timestamp_manager, TimestampManager* timestamp_manager,
ImportManager* import_manager, ImportManager* import_manager,
ImportPipelineStatus* status, ImportPipelineStatus* status,
Project* project, Project* project,
WorkingFiles* working_files, WorkingFiles* working_files,
MultiQueueWaiter* waiter); MultiQueueWaiter* waiter);

View File

@ -434,56 +434,55 @@ bool IndexMergeIndexUpdates() {
} }
} }
WorkThread::Result IndexMain(Config* config, void IndexMain(Config* config,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
TimestampManager* timestamp_manager, TimestampManager* timestamp_manager,
ImportManager* import_manager, ImportManager* import_manager,
ImportPipelineStatus* status, ImportPipelineStatus* status,
Project* project, Project* project,
WorkingFiles* working_files, WorkingFiles* working_files,
MultiQueueWaiter* waiter) { MultiQueueWaiter* waiter) {
status->num_active_threads++;
EmitProgress(config);
// Build one index per-indexer, as building the index acquires a global lock. // Build one index per-indexer, as building the index acquires a global lock.
ClangIndex index; ClangIndex index;
// TODO: process all off IndexMain_DoIndex before calling while (true) {
// IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have status->num_active_threads++;
// some threads spinning on both though otherwise memory usage will get bad.
// We need to make sure to run both IndexMain_DoParse and EmitProgress(config);
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_parse =
IndexMain_DoParse(config, working_files, file_consumer_shared,
timestamp_manager, import_manager, &index);
bool did_create_update = // TODO: process all off IndexMain_DoIndex before calling
IndexMain_DoCreateIndexUpdate(config, timestamp_manager); // IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have
// some threads spinning on both though otherwise memory usage will get bad.
bool did_load_previous = IndexMain_LoadPreviousIndex(config); // We need to make sure to run both IndexMain_DoParse and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_parse =
IndexMain_DoParse(config, working_files, file_consumer_shared,
timestamp_manager, import_manager, &index);
// Nothing to index and no index updates to create, so join some already bool did_create_update =
// created index updates to reduce work on querydb thread. IndexMain_DoCreateIndexUpdate(config, timestamp_manager);
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates();
status->num_active_threads--; bool did_load_previous = IndexMain_LoadPreviousIndex(config);
auto* queue = QueueManager::instance(); // Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates();
// We didn't do any work, so wait for a notification. status->num_active_threads--;
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait({&queue->index_request, &queue->on_id_mapped, auto* queue = QueueManager::instance();
&queue->load_previous_index, &queue->on_indexed});
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait({&queue->index_request, &queue->on_id_mapped,
&queue->load_previous_index, &queue->on_indexed});
}
} }
return queue->HasWork() ? WorkThread::Result::MoreWork
: WorkThread::Result::NoWork;
} }
bool QueryDb_ImportMain(Config* config, bool QueryDb_ImportMain(Config* config,

View File

@ -136,8 +136,6 @@ void IncludeComplete::Rescan() {
timer.ResetAndPrint("[perf] Scanning for includes"); timer.ResetAndPrint("[perf] Scanning for includes");
is_scanning = false; is_scanning = false;
return WorkThread::Result::ExitThread;
}); });
} }

View File

@ -4,17 +4,14 @@
#include <loguru.hpp> #include <loguru.hpp>
namespace { namespace {
struct Ipc_CqueryWait struct Ipc_CqueryWait : public IpcMessage<Ipc_CqueryWait> {
: public IpcMessage<Ipc_CqueryWait> {
static constexpr IpcId kIpcId = IpcId::CqueryWait; static constexpr IpcId kIpcId = IpcId::CqueryWait;
}; };
MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryWait); MAKE_REFLECT_EMPTY_STRUCT(Ipc_CqueryWait);
REGISTER_IPC_MESSAGE(Ipc_CqueryWait); REGISTER_IPC_MESSAGE(Ipc_CqueryWait);
struct CqueryWaitHandler : MessageHandler { struct CqueryWaitHandler : MessageHandler {
IpcId GetId() const override { IpcId GetId() const override { return IpcId::CqueryWait; }
return IpcId::CqueryWait;
}
void Run(std::unique_ptr<BaseIpcMessage> request) override { void Run(std::unique_ptr<BaseIpcMessage> request) override {
// TODO: use status message system here, then run querydb as normal? Maybe // TODO: use status message system here, then run querydb as normal? Maybe
// this cannot be a normal message, ie, it needs to be re-entrant. // this cannot be a normal message, ie, it needs to be re-entrant.

View File

@ -173,9 +173,9 @@ struct InitializeHandler : BaseMessageHandler<Ipc_InitializeRequest> {
LOG_S(INFO) << "Starting " << config->indexerCount << " indexers"; LOG_S(INFO) << "Starting " << config->indexerCount << " indexers";
for (int i = 0; i < config->indexerCount; ++i) { for (int i = 0; i < config->indexerCount; ++i) {
WorkThread::StartThread("indexer" + std::to_string(i), [=]() { WorkThread::StartThread("indexer" + std::to_string(i), [=]() {
return IndexMain(config, file_consumer_shared, timestamp_manager, IndexMain(config, file_consumer_shared, timestamp_manager,
import_manager, import_pipeline_status, project, import_manager, import_pipeline_status, project,
working_files, waiter); working_files, waiter);
}); });
} }

View File

@ -4,15 +4,9 @@
// static // static
void WorkThread::StartThread(const std::string& thread_name, void WorkThread::StartThread(const std::string& thread_name,
const std::function<Result()>& entry_point) { std::function<void()> entry_point) {
new std::thread([thread_name, entry_point]() { new std::thread([thread_name, entry_point]() {
SetCurrentThreadName(thread_name); SetCurrentThreadName(thread_name);
entry_point();
// Main loop.
while (true) {
Result result = entry_point();
if (result == Result::ExitThread)
break;
}
}); });
} }

View File

@ -9,13 +9,10 @@
// Helper methods for starting threads that do some work. Enables test code to // Helper methods for starting threads that do some work. Enables test code to
// wait for all work to complete. // wait for all work to complete.
struct WorkThread { struct WorkThread {
// FIXME: remove result, have entry_point run a while(true) loop.
enum class Result { MoreWork, NoWork, ExitThread };
// Launch a new thread. |entry_point| will be called continously. It should // Launch a new thread. |entry_point| will be called continously. It should
// return true if it there is still known work to be done. // return true if it there is still known work to be done.
static void StartThread(const std::string& thread_name, static void StartThread(const std::string& thread_name,
const std::function<Result()>& entry_point); std::function<void()> entry_point);
// Static-only class. // Static-only class.
WorkThread() = delete; WorkThread() = delete;