From 724d8cc3f41473f3af1d7af1388a3c2102354453 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Sun, 16 Apr 2017 12:02:29 -0700 Subject: [PATCH] Refactor ipc behind a proxy type so we can (eventually) bypass all serialization when running fully in-process. --- src/command_line.cc | 521 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 436 insertions(+), 85 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 2e32d154..e2694ea7 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -33,11 +33,197 @@ namespace { -const char* kIpcLanguageClientName = "language_client"; - const int kNumIndexers = 8 - 1; -const int kQueueSizeBytes = 1024 * 8; const int kMaxWorkspaceSearchResults = 1000; +const bool kUseMultipleProcesses = true; + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +struct IpcManager { + // TODO: Rename TypedBidiMessageQueue to IpcTransport? + using IpcMessageQueue = TypedBidiMessageQueue; + + static constexpr const char* kIpcLanguageClientName = "language_client"; + static constexpr const int kQueueSizeBytes = 1024 * 8; + + std::unique_ptr>> threaded_queue_; + std::unique_ptr ipc_queue_; + + IpcManager() { + ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); + } + + + + template + void SendOutMessageToClient(T& response) { + std::ostringstream sstream; + response.Write(sstream); + + Ipc_Cout out; + out.content = sstream.str(); + ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out); + } + + enum class Destination { + Client, Server + }; + template + void SendMessageWithId(Destination destination, TId id, TMessage& message) { + ipc_queue_->SendMessage( + destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server, + id, + message); + } + template + void SendMessage(Destination destination, TMessage& message) { + SendMessageWithId(destination, TMessage::kIpcId, message); + } + + template + std::vector> GetMessages(Destination destination) { + return ipc_queue_->GetMessages(destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server); + } + + private: + template + void RegisterId(IpcMessageQueue* t) { + t->RegisterId(T::kIpcId, + [](Writer& visitor, BaseIpcMessage& message) { + T& m = static_cast(message); + Reflect(visitor, m); + }, [](Reader& visitor) { + auto m = MakeUnique(); + Reflect(visitor, *m); + return m; + }); + } + + std::unique_ptr BuildIpcMessageQueue(const std::string& name, size_t buffer_size) { + auto ipc = MakeUnique(name, buffer_size); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + RegisterId(ipc.get()); + return ipc; + } +}; + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + void PushBack(NonElidedVector* result, optional location) { if (location) @@ -501,6 +687,18 @@ std::vector FindSymbolsAtLocation(QueryFile* file, lsPosition positio + + + + + + + + + + + + @@ -549,67 +747,11 @@ struct Index_OnIndexed { explicit Index_OnIndexed(IndexUpdate& update) : update(update) {} }; -// TODO: Rename TypedBidiMessageQueue to IpcTransport? -using IpcMessageQueue = TypedBidiMessageQueue; using Index_DoIndexQueue = ThreadedQueue; using Index_DoIdMapQueue = ThreadedQueue; using Index_OnIdMappedQueue = ThreadedQueue; using Index_OnIndexedQueue = ThreadedQueue; -template -void SendMessage(IpcMessageQueue& t, MessageQueue* destination, TMessage& message) { - t.SendMessage(destination, TMessage::kIpcId, message); -} - -template -void SendOutMessageToClient(IpcMessageQueue* queue, T& response) { - std::ostringstream sstream; - response.Write(sstream); - - Ipc_Cout out; - out.content = sstream.str(); - queue->SendMessage(&queue->for_client, Ipc_Cout::kIpcId, out); -} - -template -void RegisterId(IpcMessageQueue* t) { - t->RegisterId(T::kIpcId, - [](Writer& visitor, BaseIpcMessage& message) { - T& m = static_cast(message); - Reflect(visitor, m); - }, [](Reader& visitor) { - auto m = MakeUnique(); - Reflect(visitor, *m); - return m; - }); -} - -std::unique_ptr BuildIpcMessageQueue(const std::string& name, size_t buffer_size) { - auto ipc = MakeUnique(name, buffer_size); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - RegisterId(ipc.get()); - return ipc; -} - void RegisterMessageTypes() { MessageRegistry::instance()->Register(); MessageRegistry::instance()->Register(); @@ -630,6 +772,39 @@ void RegisterMessageTypes() { MessageRegistry::instance()->Register(); } + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + bool IndexMain_DoIndex(FileConsumer::SharedState* file_consumer_shared, Index_DoIndexQueue* queue_do_index, Index_DoIdMapQueue* queue_do_id_map) { @@ -780,6 +955,22 @@ void IndexMain( + + + + + + + + + + + + + + + + @@ -799,7 +990,7 @@ void IndexMain( void QueryDbMainLoop( QueryDatabase* db, - IpcMessageQueue* language_client, + IpcManager* language_client, Index_DoIndexQueue* queue_do_index, Index_DoIdMapQueue* queue_do_id_map, Index_OnIdMappedQueue* queue_on_id_mapped, @@ -808,7 +999,7 @@ void QueryDbMainLoop( WorkingFiles* working_files, CompletionManager* completion_manager) { - std::vector> messages = language_client->GetMessages(&language_client->for_server); + std::vector> messages = language_client->GetMessages(IpcManager::Destination::Server); for (auto& message : messages) { //std::cerr << "[querydb] Processing message " << static_cast(message->method_id) << std::endl; @@ -821,7 +1012,7 @@ void QueryDbMainLoop( case IpcId::IsAlive: { Ipc_IsAlive response; - language_client->SendMessage(&language_client->for_client, response.method_id, response); + language_client->SendMessageWithId(IpcManager::Destination::Client, response.method_id, response); break; } @@ -908,7 +1099,7 @@ void QueryDbMainLoop( } response.Write(std::cerr); - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -986,7 +1177,7 @@ void QueryDbMainLoop( break; } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1022,7 +1213,7 @@ void QueryDbMainLoop( break; } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1048,7 +1239,7 @@ void QueryDbMainLoop( break; } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1085,7 +1276,7 @@ void QueryDbMainLoop( break; } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1111,7 +1302,7 @@ void QueryDbMainLoop( response.result.push_back(info); } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1205,7 +1396,7 @@ void QueryDbMainLoop( }; } - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1245,7 +1436,7 @@ void QueryDbMainLoop( } std::cerr << "- Found " << response.result.size() << " results for query " << query << std::endl; - SendOutMessageToClient(language_client, response); + language_client->SendOutMessageToClient(response); break; } @@ -1297,7 +1488,7 @@ void QueryDbMain() { //std::cerr << "Running QueryDb" << std::endl; // Create queues. - std::unique_ptr ipc = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); + IpcManager ipc; Index_DoIndexQueue queue_do_index; Index_DoIdMapQueue queue_do_id_map; Index_OnIdMappedQueue queue_on_id_mapped; @@ -1318,11 +1509,73 @@ void QueryDbMain() { // Run query db main loop. QueryDatabase db; while (true) { - QueryDbMainLoop(&db, ipc.get(), &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager); + QueryDbMainLoop(&db, &ipc, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + // TODO: global lock on stderr output. // Separate thread whose only job is to read from stdin and @@ -1331,7 +1584,7 @@ void QueryDbMain() { // blocks. // // |ipc| is connected to a server. -void LanguageServerStdinLoop(IpcMessageQueue* ipc) { +void LanguageServerStdinLoop(IpcManager* ipc) { while (true) { std::unique_ptr message = MessageRegistry::instance()->ReadMessageFromStdin(); @@ -1353,7 +1606,7 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) { << std::endl; Ipc_OpenProject open_project; open_project.project_path = project_path; - ipc->SendMessage(&ipc->for_server, Ipc_OpenProject::kIpcId, open_project); + ipc->SendMessageWithId(IpcManager::Destination::Server, Ipc_OpenProject::kIpcId, open_project); } // TODO: query request->params.capabilities.textDocument and support only things @@ -1415,7 +1668,7 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) { case IpcId::TextDocumentCodeLens: case IpcId::WorkspaceSymbol: { //std::cerr << "Sending message " << (int)message->method_id << std::endl; - ipc->SendMessage(&ipc->for_server, message->method_id, *message.get()); + ipc->SendMessageWithId(IpcManager::Destination::Server, message->method_id, *message.get()); break; } @@ -1428,8 +1681,8 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) { } } -void LanguageServerMainLoop(IpcMessageQueue* ipc) { - std::vector> messages = ipc->GetMessages(&ipc->for_client); +void LanguageServerMainLoop(IpcManager* ipc) { + std::vector> messages = ipc->GetMessages(IpcManager::Destination::Client); for (auto& message : messages) { switch (message->method_id) { case IpcId::Quit: { @@ -1454,16 +1707,63 @@ void LanguageServerMainLoop(IpcMessageQueue* ipc) { } } -bool IsQueryDbProcessRunning(IpcMessageQueue* ipc) { + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +bool IsQueryDbProcessRunning(IpcManager* ipc) { // Emit an alive check. Sleep so the server has time to respond. Ipc_IsAlive check_alive; - SendMessage(*ipc, &ipc->for_server, check_alive); + ipc->SendMessage(IpcManager::Destination::Server, check_alive); // TODO: Tune this value or make it configurable. std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Check if we got an IsAlive message back. - std::vector> messages = ipc->GetMessages(&ipc->for_client); + std::vector> messages = ipc->GetMessages(IpcManager::Destination::Client); for (auto& message : messages) { if (IpcId::IsAlive == message->method_id) return true; @@ -1473,12 +1773,12 @@ bool IsQueryDbProcessRunning(IpcMessageQueue* ipc) { } void LanguageServerMain(std::string process_name) { - std::unique_ptr ipc = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); + IpcManager ipc; // Discard any left-over messages from previous runs. - ipc->GetMessages(&ipc->for_client); + ipc.GetMessages(IpcManager::Destination::Client); - bool has_server = IsQueryDbProcessRunning(ipc.get()); + bool has_server = IsQueryDbProcessRunning(&ipc); // No server is running. Start it in-process. If the user wants to run the // server out of process they have to start it themselves. @@ -1487,14 +1787,65 @@ void LanguageServerMain(std::string process_name) { } // Run language client. - new std::thread(&LanguageServerStdinLoop, ipc.get()); + new std::thread(&LanguageServerStdinLoop, &ipc); while (true) { - LanguageServerMainLoop(ipc.get()); + LanguageServerMainLoop(&ipc); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + int main(int argc, char** argv) { + // TODO: Move to IndexInit(), remove clang-c include. clang_enableStackTraces(); clang_toggleCrashRecovery(1);