Simplify threading model a bit.

This commit is contained in:
Jacob Dufault 2017-04-23 13:19:09 -07:00
parent 4f57b711bb
commit 7741991b72
5 changed files with 515 additions and 573 deletions

View File

@ -1102,8 +1102,6 @@ void IndexMain(
// better icache behavior. We need to have some threads spinning on both though
// otherwise memory usage will get bad.
int count = 0;
// We need to make sure to run both IndexMain_DoIndex and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
@ -1112,10 +1110,9 @@ void IndexMain(
bool did_create_update = IndexMain_DoCreateIndexUpdate(queue_on_id_mapped, queue_on_indexed);
if (!did_index && !did_create_update) {
//if (count++ > 2) {
// count = 0;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
IndexJoinIndexUpdates(queue_on_indexed);
//}
// TODO: use CV to wakeup?
std::this_thread::sleep_for(std::chrono::milliseconds(25));
@ -1170,7 +1167,6 @@ void IndexMain(
void QueryDbMainLoop(
@ -1181,6 +1177,7 @@ void QueryDbMainLoop(
Index_OnIdMappedQueue* queue_on_id_mapped,
Index_OnIndexedQueue* queue_on_indexed,
Project* project,
FileConsumer::SharedState* file_consumer_shared,
WorkingFiles* working_files,
CompletionManager* completion_manager) {
IpcManager* ipc = IpcManager::instance();
@ -1190,23 +1187,44 @@ void QueryDbMainLoop(
std::cerr << "[querydb] Processing message " << IpcIdToString(message->method_id) << std::endl;
switch (message->method_id) {
case IpcId::Quit: {
std::cerr << "[querydb] Got quit message (exiting)" << std::endl;
exit(0);
break;
case IpcId::Initialize: {
auto request = static_cast<Ipc_InitializeRequest*>(message.get());
if (request->params.rootUri) {
std::string project_path = request->params.rootUri->GetPath();
std::cerr << "[stdin] Initialize in directory " << project_path
<< " with uri " << request->params.rootUri->raw_uri
<< std::endl;
if (!request->params.initializationOptions) {
std::cerr << "Initialization parameters (particularily cacheDirectory) are required" << std::endl;
exit(1);
}
case IpcId::IsAlive: {
std::cerr << "[querydb] Sending IsAlive response to client" << std::endl;
ipc->SendMessage(IpcManager::Destination::Client, MakeUnique<Ipc_IsAlive>());
break;
*config = *request->params.initializationOptions;
// Make sure cache directory is valid.
if (config->cacheDirectory.empty()) {
std::cerr << "No cache directory" << std::endl;
exit(1);
}
config->cacheDirectory = NormalizePath(config->cacheDirectory);
if (config->cacheDirectory[config->cacheDirectory.size() - 1] != '/')
config->cacheDirectory += '/';
MakeDirectoryRecursive(config->cacheDirectory);
// Start indexer threads.
int indexer_count = std::max<int>(std::thread::hardware_concurrency(), 2) - 1;
if (config->indexerCount > 0)
indexer_count = config->indexerCount;
std::cerr << "[querydb] Starting " << indexer_count << " indexers" << std::endl;
for (int i = 0; i < indexer_count; ++i) {
new std::thread([&]() {
IndexMain(config, file_consumer_shared, project, queue_do_index, queue_do_id_map, queue_on_id_mapped, queue_on_indexed);
});
}
case IpcId::OpenProject: {
Ipc_OpenProject* msg = static_cast<Ipc_OpenProject*>(message.get());
std::string path = msg->project_path;
project->Load(path);
// Open up / load the project.
project->Load(project_path);
std::cerr << "Loaded compilation entries (" << project->entries.size() << " files)" << std::endl;
project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) {
@ -1216,7 +1234,39 @@ void QueryDbMainLoop(
queue_do_index->Enqueue(Index_DoIndex(Index_DoIndex::Type::ImportAndUpdate, entry.filename, entry.args));
});
}
// TODO: query request->params.capabilities.textDocument and support only things
// the client supports.
auto response = Out_InitializeResponse();
response.id = request->id;
//response.result.capabilities.textDocumentSync = lsTextDocumentSyncOptions();
//response.result.capabilities.textDocumentSync->openClose = true;
//response.result.capabilities.textDocumentSync->change = lsTextDocumentSyncKind::Full;
//response.result.capabilities.textDocumentSync->willSave = true;
//response.result.capabilities.textDocumentSync->willSaveWaitUntil = true;
response.result.capabilities.textDocumentSync = lsTextDocumentSyncKind::Incremental;
response.result.capabilities.renameProvider = true;
response.result.capabilities.completionProvider = lsCompletionOptions();
response.result.capabilities.completionProvider->resolveProvider = false;
response.result.capabilities.completionProvider->triggerCharacters = { ".", "::", "->" };
response.result.capabilities.codeLensProvider = lsCodeLensOptions();
response.result.capabilities.codeLensProvider->resolveProvider = false;
response.result.capabilities.definitionProvider = true;
response.result.capabilities.documentHighlightProvider = true;
response.result.capabilities.hoverProvider = true;
response.result.capabilities.referencesProvider = true;
response.result.capabilities.documentSymbolProvider = true;
response.result.capabilities.workspaceSymbolProvider = true;
ipc->SendOutMessageToClient(IpcId::Initialize, response);
break;
}
@ -1747,22 +1797,11 @@ void QueryDbMain(IndexerConfig* config) {
CompletionManager completion_manager(config, &project, &working_files);
FileConsumer::SharedState file_consumer_shared;
// Start indexer threads.
int indexer_count = std::max<int>(std::thread::hardware_concurrency(), 2) - 1;
if (config->indexerCount > 0)
indexer_count = config->indexerCount;
std::cerr << "[querydb] Starting " << indexer_count << " indexers" << std::endl;
for (int i = 0; i < indexer_count; ++i) {
new std::thread([&]() {
IndexMain(config, &file_consumer_shared, &project, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed);
});
}
// Run query db main loop.
SetCurrentThreadName("querydb");
QueryDatabase db;
while (true) {
QueryDbMainLoop(config, &db, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager);
QueryDbMainLoop(config, &db, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &file_consumer_shared, &working_files, &completion_manager);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
@ -1852,78 +1891,6 @@ void LanguageServerStdinLoop(IndexerConfig* config, std::unordered_map<IpcId, Ti
std::cerr << "[stdin] Got message \"" << IpcIdToString(message->method_id) << '"' << std::endl;
switch (message->method_id) {
// TODO: For simplicitly lets just proxy the initialize request like
// all other requests so that stdin loop thread becomes super simple.
case IpcId::Initialize: {
auto request = static_cast<Ipc_InitializeRequest*>(message.get());
if (request->params.rootUri) {
std::string project_path = request->params.rootUri->GetPath();
std::cerr << "[stdin] Initialize in directory " << project_path
<< " with uri " << request->params.rootUri->raw_uri
<< std::endl;
auto open_project = MakeUnique<Ipc_OpenProject>();
open_project->project_path = project_path;
if (!request->params.initializationOptions) {
std::cerr << "Initialization parameters (particularily cacheDirectory) are required" << std::endl;
exit(1);
}
*config = *request->params.initializationOptions;
// Make sure cache directory is valid.
if (config->cacheDirectory.empty()) {
std::cerr << "No cache directory" << std::endl;
exit(1);
}
config->cacheDirectory = NormalizePath(config->cacheDirectory);
if (config->cacheDirectory[config->cacheDirectory.size() - 1] != '/')
config->cacheDirectory += '/';
MakeDirectoryRecursive(config->cacheDirectory);
// Startup querydb now that we have initialization state.
new std::thread([&config]() {
QueryDbMain(config);
});
ipc->SendMessage(IpcManager::Destination::Server, std::move(open_project));
}
// TODO: query request->params.capabilities.textDocument and support only things
// the client supports.
auto response = Out_InitializeResponse();
response.id = request->id;
//response.result.capabilities.textDocumentSync = lsTextDocumentSyncOptions();
//response.result.capabilities.textDocumentSync->openClose = true;
//response.result.capabilities.textDocumentSync->change = lsTextDocumentSyncKind::Full;
//response.result.capabilities.textDocumentSync->willSave = true;
//response.result.capabilities.textDocumentSync->willSaveWaitUntil = true;
response.result.capabilities.textDocumentSync = lsTextDocumentSyncKind::Incremental;
response.result.capabilities.renameProvider = true;
response.result.capabilities.completionProvider = lsCompletionOptions();
response.result.capabilities.completionProvider->resolveProvider = false;
response.result.capabilities.completionProvider->triggerCharacters = { ".", "::", "->" };
response.result.capabilities.codeLensProvider = lsCodeLensOptions();
response.result.capabilities.codeLensProvider->resolveProvider = false;
response.result.capabilities.definitionProvider = true;
response.result.capabilities.documentHighlightProvider = true;
response.result.capabilities.hoverProvider = true;
response.result.capabilities.referencesProvider = true;
response.result.capabilities.documentSymbolProvider = true;
response.result.capabilities.workspaceSymbolProvider = true;
//response.Write(std::cerr);
response.Write(std::cout);
break;
}
case IpcId::Initialized: {
// TODO: don't send output until we get this notification
break;
@ -1934,6 +1901,7 @@ void LanguageServerStdinLoop(IndexerConfig* config, std::unordered_map<IpcId, Ti
break;
}
case IpcId::Initialize:
case IpcId::TextDocumentDidOpen:
case IpcId::TextDocumentDidChange:
case IpcId::TextDocumentDidClose:
@ -2011,15 +1979,9 @@ void LanguageServerMainLoop(std::unordered_map<IpcId, Timer>* request_times) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Client);
for (auto& message : messages) {
std::cerr << "[server] Processing message " << IpcIdToString(message->method_id) << std::endl;
std::cerr << "[stdout] Processing message " << IpcIdToString(message->method_id) << std::endl;
switch (message->method_id) {
case IpcId::Quit: {
std::cerr << "[server] Got quit message (exiting)" << std::endl;
exit(0);
break;
}
case IpcId::Cout: {
auto msg = static_cast<Ipc_Cout*>(message.get());
@ -2032,7 +1994,7 @@ void LanguageServerMainLoop(std::unordered_map<IpcId, Timer>* request_times) {
}
default: {
std::cerr << "[server] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
std::cerr << "[stdout] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
}
}
@ -2042,12 +2004,20 @@ void LanguageServerMainLoop(std::unordered_map<IpcId, Timer>* request_times) {
void LanguageServerMain(IndexerConfig* config) {
std::unordered_map<IpcId, Timer> request_times;
// Run language client.
// Start stdin reader. Reading from stdin is a blocking operation so this
// needs a dedicated thread.
new std::thread([&]() {
LanguageServerStdinLoop(config, &request_times);
});
SetCurrentThreadName("server");
// Start querydb thread. querydb will start indexer threads as needed.
new std::thread([&config]() {
QueryDbMain(config);
});
// We run a dedicated thread for writing to stdout because there can be an
// unknown number of delays when output information.
SetCurrentThreadName("stdout");
while (true) {
LanguageServerMainLoop(&request_times);
std::this_thread::sleep_for(std::chrono::milliseconds(2));

View File

@ -43,12 +43,6 @@ const char* IpcIdToString(IpcId id) {
return "$cquery/freshenIndex";
}
case IpcId::Quit:
return "$quit";
case IpcId::IsAlive:
return "$isAlive";
case IpcId::OpenProject:
return "$openProject";
case IpcId::Cout:
return "$cout";
default:

View File

@ -29,9 +29,6 @@ enum class IpcId : int {
CqueryFreshenIndex,
// Internal implementation detail.
Quit,
IsAlive,
OpenProject,
Cout
};
MAKE_ENUM_HASHABLE(IpcId)
@ -48,22 +45,6 @@ struct IpcMessage : public BaseIpcMessage {
IpcMessage() : BaseIpcMessage(T::kIpcId) {}
};
struct Ipc_Quit : public IpcMessage<Ipc_Quit> {
static constexpr IpcId kIpcId = IpcId::Quit;
};
MAKE_REFLECT_EMPTY_STRUCT(Ipc_Quit);
struct Ipc_IsAlive : public IpcMessage<Ipc_IsAlive> {
static constexpr IpcId kIpcId = IpcId::IsAlive;
};
MAKE_REFLECT_EMPTY_STRUCT(Ipc_IsAlive);
struct Ipc_OpenProject : public IpcMessage<Ipc_OpenProject> {
static constexpr IpcId kIpcId = IpcId::OpenProject;
std::string project_path;
};
MAKE_REFLECT_STRUCT(Ipc_OpenProject, project_path);
struct Ipc_Cout : public IpcMessage<Ipc_Cout> {
static constexpr IpcId kIpcId = IpcId::Cout;
std::string content;

View File

@ -149,8 +149,6 @@ std::string NormalizePath(const std::string& path) {
}
bool TryMakeDirectory(const std::string& absolute_path) {
std::cerr << "!! TryMakeDirectory " << absolute_path << std::endl;
const mode_t kMode = 0777; // UNIX style permissions
if (mkdir(absolute_path.c_str(), kMode) == -1) {
// Success if the directory exists.

View File

@ -150,7 +150,6 @@ std::string NormalizePath(const std::string& path) {
}
bool TryMakeDirectory(const std::string& absolute_path) {
std::cerr << "!! TryMakeDirectory " << absolute_path << std::endl;
if (_mkdir(absolute_path.c_str()) == -1) {
// Success if the directory exists.
return errno == EEXIST;