Merge IpcManager and QueueManager

This commit is contained in:
Jacob Dufault 2017-12-23 16:25:18 -08:00
parent 27b5816a7f
commit f6d84cd68c
32 changed files with 133 additions and 149 deletions

View File

@ -5,7 +5,6 @@
#include "file_consumer.h"
#include "include_complete.h"
#include "indexer.h"
#include "ipc_manager.h"
#include "language_server_api.h"
#include "lex_utils.h"
#include "lru_cache.h"
@ -16,6 +15,7 @@
#include "project.h"
#include "query.h"
#include "query_utils.h"
#include "queue_manager.h"
#include "serializer.h"
#include "standard_includes.h"
#include "test.h"
@ -76,7 +76,7 @@ void EmitDiagnostics(WorkingFiles* working_files,
Out_TextDocumentPublishDiagnostics out;
out.params.uri = lsDocumentUri::FromPath(path);
out.params.diagnostics = diagnostics;
IpcManager::WriteStdout(IpcId::TextDocumentPublishDiagnostics, out);
QueueManager::WriteStdout(IpcId::TextDocumentPublishDiagnostics, out);
// Cache diagnostics so we can show fixits.
working_files->DoActionOnFile(path, [&](WorkingFile* working_file) {
@ -88,8 +88,9 @@ void EmitDiagnostics(WorkingFiles* working_files,
REGISTER_IPC_MESSAGE(Ipc_CancelRequest);
// Send indexing progress to client if reporting is enabled.
void EmitProgress(Config* config, QueueManager* queue) {
void EmitProgress(Config* config) {
if (config->enableProgressReports) {
auto* queue = QueueManager::instance();
Out_Progress out;
out.params.indexRequestCount = queue->index_request.Size();
out.params.doIdMapCount = queue->do_id_map.Size();
@ -97,7 +98,7 @@ void EmitProgress(Config* config, QueueManager* queue) {
out.params.onIdMappedCount = queue->on_id_mapped.Size();
out.params.onIndexedCount = queue->on_indexed.Size();
IpcManager::WriteStdout(IpcId::Unknown, out);
QueueManager::WriteStdout(IpcId::Unknown, out);
}
}
@ -307,7 +308,6 @@ std::vector<Index_DoIdMap> DoParseFile(
// real-time indexing.
// TODO: add option to disable this.
void IndexWithTuFromCodeCompletion(
QueueManager* queue,
FileConsumer::SharedState* file_consumer_shared,
ClangTranslationUnit* tu,
const std::vector<CXUnsavedFile>& file_contents,
@ -335,7 +335,7 @@ void IndexWithTuFromCodeCompletion(
LOG_IF_S(WARNING, result.size() > 1)
<< "Code completion index update generated more than one index";
queue->do_id_map.EnqueueAll(std::move(result));
QueueManager::instance()->do_id_map.EnqueueAll(std::move(result));
}
std::vector<Index_DoIdMap> ParseFile(
@ -366,11 +366,11 @@ std::vector<Index_DoIdMap> ParseFile(
bool IndexMain_DoParse(Config* config,
WorkingFiles* working_files,
QueueManager* queue,
FileConsumer::SharedState* file_consumer_shared,
TimestampManager* timestamp_manager,
ImportManager* import_manager,
ClangIndex* index) {
auto* queue = QueueManager::instance();
optional<Index_Request> request = queue->index_request.TryDequeue();
if (!request)
return false;
@ -392,8 +392,8 @@ bool IndexMain_DoParse(Config* config,
}
bool IndexMain_DoCreateIndexUpdate(Config* config,
QueueManager* queue,
TimestampManager* timestamp_manager) {
auto* queue = QueueManager::instance();
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue();
if (!response)
return false;
@ -457,7 +457,8 @@ bool IndexMain_DoCreateIndexUpdate(Config* config,
return true;
}
bool IndexMain_LoadPreviousIndex(Config* config, QueueManager* queue) {
bool IndexMain_LoadPreviousIndex(Config* config) {
auto* queue = QueueManager::instance();
optional<Index_DoIdMap> response = queue->load_previous_index.TryDequeue();
if (!response)
return false;
@ -471,7 +472,8 @@ bool IndexMain_LoadPreviousIndex(Config* config, QueueManager* queue) {
return true;
}
bool IndexMergeIndexUpdates(QueueManager* queue) {
bool IndexMergeIndexUpdates() {
auto* queue = QueueManager::instance();
optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue();
if (!root)
return false;
@ -501,9 +503,8 @@ WorkThread::Result IndexMain(Config* config,
ImportManager* import_manager,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue) {
EmitProgress(config, queue);
MultiQueueWaiter* waiter) {
EmitProgress(config);
// Build one index per-indexer, as building the index acquires a global lock.
ClangIndex index;
@ -517,19 +518,21 @@ WorkThread::Result IndexMain(Config* config,
// work. Running both also lets the user query the partially constructed
// index.
bool did_parse =
IndexMain_DoParse(config, working_files, queue, file_consumer_shared,
IndexMain_DoParse(config, working_files, file_consumer_shared,
timestamp_manager, import_manager, &index);
bool did_create_update =
IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
IndexMain_DoCreateIndexUpdate(config, timestamp_manager);
bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
bool did_load_previous = IndexMain_LoadPreviousIndex(config);
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates(queue);
did_merge = IndexMergeIndexUpdates();
auto* queue = QueueManager::instance();
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
@ -544,10 +547,10 @@ WorkThread::Result IndexMain(Config* config,
bool QueryDb_ImportMain(Config* config,
QueryDatabase* db,
ImportManager* import_manager,
QueueManager* queue,
SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files) {
EmitProgress(config, queue);
auto* queue = QueueManager::instance();
EmitProgress(config);
bool did_work = false;
@ -685,7 +688,6 @@ bool QueryDbMainLoop(Config* config,
QueryDatabase* db,
bool* exit_when_idle,
MultiQueueWaiter* waiter,
QueueManager* queue,
Project* project,
FileConsumer::SharedState* file_consumer_shared,
ImportManager* import_manager,
@ -697,12 +699,11 @@ bool QueryDbMainLoop(Config* config,
CodeCompleteCache* global_code_complete_cache,
CodeCompleteCache* non_global_code_complete_cache,
CodeCompleteCache* signature_cache) {
IpcManager* ipc = IpcManager::instance();
auto* queue = QueueManager::instance();
bool did_work = false;
std::vector<std::unique_ptr<BaseIpcMessage>> messages =
ipc->for_querydb.DequeueAll();
queue->for_querydb.DequeueAll();
for (auto& message : messages) {
did_work = true;
@ -722,7 +723,7 @@ bool QueryDbMainLoop(Config* config,
// TODO: consider rate-limiting and checking for IPC messages so we don't
// block requests / we can serve partial requests.
if (QueryDb_ImportMain(config, db, import_manager, queue, semantic_cache,
if (QueryDb_ImportMain(config, db, import_manager, semantic_cache,
working_files)) {
did_work = true;
}
@ -732,8 +733,7 @@ bool QueryDbMainLoop(Config* config,
void RunQueryDbThread(const std::string& bin_name,
Config* config,
MultiQueueWaiter* waiter,
QueueManager* queue) {
MultiQueueWaiter* waiter) {
bool exit_when_idle = false;
Project project;
SemanticHighlightSymbolCache semantic_cache;
@ -744,7 +744,7 @@ void RunQueryDbThread(const std::string& bin_name,
config, &project, &working_files,
std::bind(&EmitDiagnostics, &working_files, std::placeholders::_1,
std::placeholders::_2),
std::bind(&IndexWithTuFromCodeCompletion, queue, &file_consumer_shared,
std::bind(&IndexWithTuFromCodeCompletion, &file_consumer_shared,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4));
@ -762,7 +762,6 @@ void RunQueryDbThread(const std::string& bin_name,
handler->db = &db;
handler->exit_when_idle = &exit_when_idle;
handler->waiter = waiter;
handler->queue = queue;
handler->project = &project;
handler->file_consumer_shared = &file_consumer_shared;
handler->import_manager = &import_manager;
@ -781,11 +780,10 @@ void RunQueryDbThread(const std::string& bin_name,
SetCurrentThreadName("querydb");
while (true) {
bool did_work = QueryDbMainLoop(
config, &db, &exit_when_idle, waiter, queue, &project,
&file_consumer_shared, &import_manager, &timestamp_manager,
&semantic_cache, &working_files, &clang_complete, &include_complete,
global_code_complete_cache.get(), non_global_code_complete_cache.get(),
signature_cache.get());
config, &db, &exit_when_idle, waiter, &project, &file_consumer_shared,
&import_manager, &timestamp_manager, &semantic_cache, &working_files,
&clang_complete, &include_complete, global_code_complete_cache.get(),
non_global_code_complete_cache.get(), signature_cache.get());
// No more work left and exit request. Exit.
if (!did_work && exit_when_idle && WorkThread::num_active_threads == 0) {
@ -797,7 +795,8 @@ void RunQueryDbThread(const std::string& bin_name,
FreeUnusedMemory();
if (!did_work) {
waiter->Wait({&IpcManager::instance()->for_querydb, &queue->do_id_map,
auto* queue = QueueManager::instance();
waiter->Wait({&QueueManager::instance()->for_querydb, &queue->do_id_map,
&queue->on_indexed});
}
}
@ -833,8 +832,7 @@ void LaunchStdinLoop(Config* config,
std::cin.tie(nullptr);
WorkThread::StartThread("stdin", [request_times]() {
IpcManager* ipc = IpcManager::instance();
auto* queue = QueueManager::instance();
std::unique_ptr<BaseIpcMessage> message =
MessageRegistry::instance()->ReadMessageFromStdin(
g_log_stdin_stdout_to_stderr);
@ -867,7 +865,7 @@ void LaunchStdinLoop(Config* config,
// loop to exit the thread. If we keep parsing input stdin is likely
// closed so cquery will exit.
LOG_S(INFO) << "cquery will exit when all threads are idle";
ipc->for_querydb.Enqueue(std::move(message));
queue->for_querydb.Enqueue(std::move(message));
return WorkThread::Result::ExitThread;
}
@ -899,7 +897,7 @@ void LaunchStdinLoop(Config* config,
case IpcId::CqueryDerived:
case IpcId::CqueryIndexFile:
case IpcId::CqueryQueryDbWaitForIdleIndexer: {
ipc->for_querydb.Enqueue(std::move(message));
queue->for_querydb.Enqueue(std::move(message));
break;
}
@ -915,15 +913,13 @@ void LaunchStdinLoop(Config* config,
}
void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times,
MultiQueueWaiter* waiter,
QueueManager* queue) {
MultiQueueWaiter* waiter) {
WorkThread::StartThread("stdout", [=]() {
IpcManager* ipc = IpcManager::instance();
auto* queue = QueueManager::instance();
std::vector<IpcManager::StdoutMessage> messages =
ipc->for_stdout.DequeueAll();
std::vector<Stdout_Request> messages = queue->for_stdout.DequeueAll();
if (messages.empty()) {
waiter->Wait({&ipc->for_stdout});
waiter->Wait({&queue->for_stdout});
return queue->HasWork() ? WorkThread::Result::MoreWork
: WorkThread::Result::NoWork;
}
@ -955,18 +951,17 @@ void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times,
void LanguageServerMain(const std::string& bin_name,
Config* config,
MultiQueueWaiter* waiter) {
QueueManager queue(waiter);
std::unordered_map<IpcId, Timer> request_times;
LaunchStdinLoop(config, &request_times);
// We run a dedicated thread for writing to stdout because there can be an
// unknown number of delays when output information.
LaunchStdoutThread(&request_times, waiter, &queue);
LaunchStdoutThread(&request_times, waiter);
// Start querydb which takes over this thread. The querydb will launch
// indexer threads as needed.
RunQueryDbThread(bin_name, config, waiter, &queue);
RunQueryDbThread(bin_name, config, waiter);
}
////////////////////////////////////////////////////////////////////////////////
@ -997,7 +992,7 @@ int main(int argc, char** argv) {
loguru::init(argc, argv);
MultiQueueWaiter waiter;
IpcManager::CreateInstance(&waiter);
QueueManager::CreateInstance(&waiter);
// bool loop = true;
// while (loop)

View File

@ -3,7 +3,7 @@
#include "config.h"
#include "file_consumer.h"
#include "import_manager.h"
#include "ipc_manager.h"
#include "queue_manager.h"
#include "project.h"
#include "semantic_highlight_symbol_cache.h"
#include "threaded_queue.h"
@ -16,7 +16,6 @@
bool QueryDb_ImportMain(Config* config,
QueryDatabase* db,
ImportManager* import_manager,
QueueManager* queue,
SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files);
@ -26,5 +25,4 @@ WorkThread::Result IndexMain(Config* config,
ImportManager* import_manager,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue);
MultiQueueWaiter* waiter);

View File

@ -1,6 +1,6 @@
#include "match.h"
#include "ipc_manager.h"
#include "queue_manager.h"
#include "language_server_api.h"
#include <doctest/doctest.h>
@ -34,7 +34,7 @@ optional<Matcher> Matcher::Create(const std::string& search) {
out.params.type = lsMessageType::Error;
out.params.message = "cquery: Parsing EMCAScript regex \"" + search +
"\" failed; " + e.what();
IpcManager::WriteStdout(IpcId::Unknown, out);
QueueManager::WriteStdout(IpcId::Unknown, out);
return nullopt;
}
}

View File

@ -44,7 +44,7 @@ bool FindFileOrFail(QueryDatabase* db,
out.id = *id;
out.error.code = lsErrorCodes::InternalError;
out.error.message = "Unable to find file " + absolute_path;
IpcManager::WriteStdout(IpcId::Unknown, out);
QueueManager::WriteStdout(IpcId::Unknown, out);
}
return false;
@ -59,7 +59,7 @@ void EmitInactiveLines(WorkingFile* working_file,
if (ls_skipped)
out.params.inactiveRegions.push_back(*ls_skipped);
}
IpcManager::WriteStdout(IpcId::CqueryPublishInactiveRegions, out);
QueueManager::WriteStdout(IpcId::CqueryPublishInactiveRegions, out);
}
void EmitSemanticHighlighting(QueryDatabase* db,
@ -145,7 +145,7 @@ void EmitSemanticHighlighting(QueryDatabase* db,
out.params.uri = lsDocumentUri::FromPath(working_file->filename);
for (auto& entry : grouped_symbols)
out.params.symbols.push_back(entry.second);
IpcManager::WriteStdout(IpcId::CqueryPublishSemanticHighlighting, out);
QueueManager::WriteStdout(IpcId::CqueryPublishSemanticHighlighting, out);
}
bool ShouldIgnoreFileForIndexing(const std::string& path) {

View File

@ -6,7 +6,7 @@
#include "config.h"
#include "import_manager.h"
#include "include_complete.h"
#include "ipc_manager.h"
#include "queue_manager.h"
#include "project.h"
#include "query.h"
#include "semantic_highlight_symbol_cache.h"
@ -32,7 +32,6 @@ struct MessageHandler {
QueryDatabase* db = nullptr;
bool* exit_when_idle = nullptr;
MultiQueueWaiter* waiter = nullptr;
QueueManager* queue = nullptr;
Project* project = nullptr;
FileConsumer::SharedState* file_consumer_shared = nullptr;
ImportManager* import_manager = nullptr;

View File

@ -39,7 +39,7 @@ struct CqueryBaseHandler : BaseMessageHandler<Ipc_CqueryBase> {
out.result = GetLsLocations(db, working_files, locations);
}
}
IpcManager::WriteStdout(IpcId::CqueryBase, out);
QueueManager::WriteStdout(IpcId::CqueryBase, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryBaseHandler);

View File

@ -197,7 +197,7 @@ struct CqueryCallTreeInitialHandler
}
}
IpcManager::WriteStdout(IpcId::CqueryCallTreeInitial, out);
QueueManager::WriteStdout(IpcId::CqueryCallTreeInitial, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryCallTreeInitialHandler);
@ -212,7 +212,7 @@ struct CqueryCallTreeExpandHandler
if (func_id != db->usr_to_func.end())
out.result = BuildExpandCallTree(db, working_files, func_id->second);
IpcManager::WriteStdout(IpcId::CqueryCallTreeExpand, out);
QueueManager::WriteStdout(IpcId::CqueryCallTreeExpand, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryCallTreeExpandHandler);

View File

@ -37,7 +37,7 @@ struct CqueryCallersHandler : BaseMessageHandler<Ipc_CqueryCallers> {
out.result = GetLsLocations(db, working_files, locations);
}
}
IpcManager::WriteStdout(IpcId::CqueryCallers, out);
QueueManager::WriteStdout(IpcId::CqueryCallers, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryCallersHandler);

View File

@ -37,7 +37,7 @@ struct CqueryDerivedHandler : BaseMessageHandler<Ipc_CqueryDerived> {
out.result = GetLsLocations(db, working_files, locations);
}
}
IpcManager::WriteStdout(IpcId::CqueryDerived, out);
QueueManager::WriteStdout(IpcId::CqueryDerived, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryDerivedHandler);

View File

@ -37,6 +37,8 @@ struct CqueryFreshenIndexHandler : MessageHandler {
file_consumer_shared->Reset(file.def->path);
}
auto* queue = QueueManager::instance();
// Send index requests for every file.
project->ForAllFilteredFiles(config, [&](int i,
const Project::Entry& entry) {

View File

@ -23,7 +23,7 @@ REGISTER_IPC_MESSAGE(Ipc_CqueryIndexFile);
struct CqueryIndexFileHandler : BaseMessageHandler<Ipc_CqueryIndexFile> {
void Run(Ipc_CqueryIndexFile* request) override {
queue->index_request.Enqueue(Index_Request(
QueueManager::instance()->index_request.Enqueue(Index_Request(
NormalizePath(request->params.path), request->params.args,
request->params.is_interactive, request->params.contents));
}

View File

@ -21,9 +21,9 @@ struct CqueryQueryDbWaitForIdleIndexerHandler : MessageHandler {
while (true) {
bool has_work = false;
has_work |= import_manager->HasActiveQuerydbImports();
has_work |= queue->HasWork();
has_work |= QueryDb_ImportMain(config, db, import_manager, queue,
semantic_cache, working_files);
has_work |= QueueManager::instance()->HasWork();
has_work |= QueryDb_ImportMain(config, db, import_manager, semantic_cache,
working_files);
if (!has_work)
++idle_count;
else

View File

@ -190,7 +190,7 @@ struct CqueryTypeHierarchyTreeHandler
}
}
IpcManager::WriteStdout(IpcId::CqueryTypeHierarchyTree, out);
QueueManager::WriteStdout(IpcId::CqueryTypeHierarchyTree, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryTypeHierarchyTreeHandler);

View File

@ -32,7 +32,7 @@ struct CqueryVarsHandler : BaseMessageHandler<Ipc_CqueryVars> {
out.result = GetLsLocations(db, working_files, locations);
}
}
IpcManager::WriteStdout(IpcId::CqueryVars, out);
QueueManager::WriteStdout(IpcId::CqueryVars, out);
}
};
REGISTER_MESSAGE_HANDLER(CqueryVarsHandler);

View File

@ -141,7 +141,7 @@ struct InitializeHandler : BaseMessageHandler<Ipc_InitializeRequest> {
out.result.capabilities.documentLinkProvider = lsDocumentLinkOptions();
out.result.capabilities.documentLinkProvider->resolveProvider = false;
IpcManager::WriteStdout(IpcId::Initialize, out);
QueueManager::WriteStdout(IpcId::Initialize, out);
// Set project root.
config->projectRoot = NormalizePath(request->params.rootUri->GetPath());
@ -174,8 +174,7 @@ struct InitializeHandler : BaseMessageHandler<Ipc_InitializeRequest> {
for (int i = 0; i < config->indexerCount; ++i) {
WorkThread::StartThread("indexer" + std::to_string(i), [=]() {
return IndexMain(config, file_consumer_shared, timestamp_manager,
import_manager, project, working_files, waiter,
queue);
import_manager, project, working_files, waiter);
});
}
@ -183,6 +182,7 @@ struct InitializeHandler : BaseMessageHandler<Ipc_InitializeRequest> {
// files, because that takes a long time.
include_complete->Rescan();
auto* queue = QueueManager::instance();
time.Reset();
project->ForAllFilteredFiles(
config, [&](int i, const Project::Entry& entry) {

View File

@ -531,7 +531,7 @@ struct TextDocumentCodeActionHandler
}
}
IpcManager::WriteStdout(IpcId::TextDocumentCodeAction, out);
QueueManager::WriteStdout(IpcId::TextDocumentCodeAction, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentCodeActionHandler);

View File

@ -257,7 +257,7 @@ struct TextDocumentCodeLensHandler
};
}
IpcManager::WriteStdout(IpcId::TextDocumentCodeLens, out);
QueueManager::WriteStdout(IpcId::TextDocumentCodeLens, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentCodeLensHandler);

View File

@ -145,7 +145,7 @@ struct TextDocumentCompletionHandler : MessageHandler {
}
FilterCompletionResponse(&out, buffer_line);
IpcManager::WriteStdout(IpcId::TextDocumentCompletion, out);
QueueManager::WriteStdout(IpcId::TextDocumentCompletion, out);
} else {
bool is_global_completion = false;
std::string existing_completion;
@ -165,7 +165,7 @@ struct TextDocumentCompletionHandler : MessageHandler {
// Emit completion results.
FilterCompletionResponse(&out, existing_completion);
IpcManager::WriteStdout(IpcId::TextDocumentCompletion, out);
QueueManager::WriteStdout(IpcId::TextDocumentCompletion, out);
// Cache completion results.
if (!is_cached_result) {

View File

@ -128,7 +128,7 @@ struct TextDocumentDefinitionHandler
}
}
IpcManager::WriteStdout(IpcId::TextDocumentDefinition, out);
QueueManager::WriteStdout(IpcId::TextDocumentDefinition, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentDefinitionHandler);

View File

@ -21,7 +21,7 @@ struct TextDocumentDidCloseHandler
// Clear any diagnostics for the file.
Out_TextDocumentPublishDiagnostics out;
out.params.uri = request->params.textDocument.uri;
IpcManager::WriteStdout(IpcId::TextDocumentPublishDiagnostics, out);
QueueManager::WriteStdout(IpcId::TextDocumentPublishDiagnostics, out);
// Remove internal state.
working_files->OnClose(request->params.textDocument);

View File

@ -52,7 +52,7 @@ struct TextDocumentDidOpenHandler
// Submit new index request.
const Project::Entry& entry = project->FindCompilationEntryForFile(path);
queue->index_request.PriorityEnqueue(Index_Request(
QueueManager::instance()->index_request.PriorityEnqueue(Index_Request(
entry.filename, entry.args, true /*is_interactive*/, nullopt));
}
};

View File

@ -39,7 +39,7 @@ struct TextDocumentDidSaveHandler
// if so, ignore that index response.
// TODO: send as priority request
Project::Entry entry = project->FindCompilationEntryForFile(path);
queue->index_request.Enqueue(Index_Request(
QueueManager::instance()->index_request.Enqueue(Index_Request(
entry.filename, entry.args, true /*is_interactive*/, nullopt));
clang_complete->NotifySave(path);

View File

@ -80,7 +80,7 @@ struct TextDocumentDocumentLinkHandler
}
}
IpcManager::WriteStdout(IpcId::TextDocumentDocumentLink, out);
QueueManager::WriteStdout(IpcId::TextDocumentDocumentLink, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentDocumentLinkHandler);

View File

@ -49,7 +49,7 @@ struct TextDocumentDocumentSymbolHandler
out.result.push_back(*info);
}
IpcManager::WriteStdout(IpcId::TextDocumentDocumentSymbol, out);
QueueManager::WriteStdout(IpcId::TextDocumentDocumentSymbol, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentDocumentSymbolHandler);

View File

@ -58,7 +58,7 @@ struct TextDocumentDocumentHighlightHandler
break;
}
IpcManager::WriteStdout(IpcId::TextDocumentDocumentHighlight, out);
QueueManager::WriteStdout(IpcId::TextDocumentDocumentHighlight, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentDocumentHighlightHandler);

View File

@ -107,7 +107,7 @@ struct TextDocumentHoverHandler : BaseMessageHandler<Ipc_TextDocumentHover> {
}
}
IpcManager::WriteStdout(IpcId::TextDocumentHover, out);
QueueManager::WriteStdout(IpcId::TextDocumentHover, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentHoverHandler);

View File

@ -75,7 +75,7 @@ struct TextDocumentReferencesHandler
break;
}
IpcManager::WriteStdout(IpcId::TextDocumentReferences, out);
QueueManager::WriteStdout(IpcId::TextDocumentReferences, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentReferencesHandler);

View File

@ -104,7 +104,7 @@ struct TextDocumentRenameHandler : BaseMessageHandler<Ipc_TextDocumentRename> {
break;
}
IpcManager::WriteStdout(IpcId::TextDocumentRename, out);
QueueManager::WriteStdout(IpcId::TextDocumentRename, out);
}
};
REGISTER_MESSAGE_HANDLER(TextDocumentRenameHandler);

View File

@ -135,7 +135,7 @@ struct TextDocumentSignatureHelpHandler : MessageHandler {
out.result.activeParameter = active_param;
Timer timer;
IpcManager::WriteStdout(IpcId::TextDocumentSignatureHelp, out);
QueueManager::WriteStdout(IpcId::TextDocumentSignatureHelp, out);
if (!is_cached_result) {
signature_cache->WithLock([&]() {

View File

@ -97,7 +97,7 @@ struct WorkspaceSymbolHandler : BaseMessageHandler<Ipc_WorkspaceSymbol> {
LOG_S(INFO) << "[querydb] Found " << out.result.size()
<< " results for query " << query;
IpcManager::WriteStdout(IpcId::WorkspaceSymbol, out);
QueueManager::WriteStdout(IpcId::WorkspaceSymbol, out);
}
};
REGISTER_MESSAGE_HANDLER(WorkspaceSymbolHandler);

View File

@ -1,36 +1,10 @@
#include "ipc_manager.h"
#include "queue_manager.h"
#include "language_server_api.h"
#include "query.h"
#include <sstream>
IpcManager* IpcManager::instance_ = nullptr;
// static
IpcManager* IpcManager::instance() {
return instance_;
}
// static
void IpcManager::CreateInstance(MultiQueueWaiter* waiter) {
instance_ = new IpcManager(waiter);
}
// static
void IpcManager::WriteStdout(IpcId id, lsBaseOutMessage& response) {
std::ostringstream sstream;
response.Write(sstream);
StdoutMessage out;
out.content = sstream.str();
out.id = id;
instance()->for_stdout.Enqueue(std::move(out));
}
IpcManager::IpcManager(MultiQueueWaiter* waiter)
: for_stdout(waiter), for_querydb(waiter) {}
Index_Request::Index_Request(const std::string& path,
const std::vector<std::string>& args,
bool is_interactive,
@ -66,8 +40,33 @@ Index_OnIndexed::Index_OnIndexed(IndexUpdate& update,
PerformanceImportFile perf)
: update(update), perf(perf) {}
QueueManager* QueueManager::instance_ = nullptr;
// static
QueueManager* QueueManager::instance() {
return instance_;
}
// static
void QueueManager::CreateInstance(MultiQueueWaiter* waiter) {
instance_ = new QueueManager(waiter);
}
// static
void QueueManager::WriteStdout(IpcId id, lsBaseOutMessage& response) {
std::ostringstream sstream;
response.Write(sstream);
Stdout_Request out;
out.content = sstream.str();
out.id = id;
instance()->for_stdout.Enqueue(std::move(out));
}
QueueManager::QueueManager(MultiQueueWaiter* waiter)
: index_request(waiter),
: for_stdout(waiter),
for_querydb(waiter),
index_request(waiter),
do_id_map(waiter),
load_previous_index(waiter),
on_id_mapped(waiter),

View File

@ -7,28 +7,11 @@
#include <memory>
// TODO/FIXME: Merge IpcManager and QueueManager.
struct lsBaseOutMessage;
struct IpcManager {
struct StdoutMessage {
struct Stdout_Request {
IpcId id;
std::string content;
};
ThreadedQueue<StdoutMessage> for_stdout;
ThreadedQueue<std::unique_ptr<BaseIpcMessage>> for_querydb;
static IpcManager* instance();
static void CreateInstance(MultiQueueWaiter* waiter);
static void WriteStdout(IpcId id, lsBaseOutMessage& response);
private:
explicit IpcManager(MultiQueueWaiter* waiter);
static IpcManager* instance_;
};
struct Index_Request {
@ -87,18 +70,26 @@ struct Index_OnIndexed {
};
struct QueueManager {
using Index_RequestQueue = ThreadedQueue<Index_Request>;
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
Index_RequestQueue index_request;
Index_DoIdMapQueue do_id_map;
Index_DoIdMapQueue load_previous_index;
Index_OnIdMappedQueue on_id_mapped;
Index_OnIndexedQueue on_indexed;
QueueManager(MultiQueueWaiter* waiter);
static QueueManager* instance();
static void CreateInstance(MultiQueueWaiter* waiter);
static void WriteStdout(IpcId id, lsBaseOutMessage& response);
bool HasWork();
// Runs on stdout thread.
ThreadedQueue<Stdout_Request> for_stdout;
// Runs on querydb thread.
ThreadedQueue<std::unique_ptr<BaseIpcMessage>> for_querydb;
// Runs on indexer threads.
ThreadedQueue<Index_Request> index_request;
ThreadedQueue<Index_DoIdMap> do_id_map;
ThreadedQueue<Index_DoIdMap> load_previous_index;
ThreadedQueue<Index_OnIdMapped> on_id_mapped;
ThreadedQueue<Index_OnIndexed> on_indexed;
private:
explicit QueueManager(MultiQueueWaiter* waiter);
static QueueManager* instance_;
};