diff --git a/src/command_line.cc b/src/command_line.cc index 40d37708..beb72729 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -35,7 +35,7 @@ namespace { const int kNumIndexers = 8 - 1; const int kMaxWorkspaceSearchResults = 1000; -const bool kUseMultipleProcesses = true; +const bool kUseMultipleProcesses = false; @@ -87,43 +87,71 @@ struct IpcManager { static constexpr const char* kIpcLanguageClientName = "language_client"; static constexpr const int kQueueSizeBytes = 1024 * 8; - std::unique_ptr>> threaded_queue_; - std::unique_ptr ipc_queue_; - - IpcManager() { - ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); + static IpcManager* instance_; + static IpcManager* instance() { + if (!instance_) + instance_ = new IpcManager(); + return instance_; } + std::unique_ptr>> threaded_queue_for_client_; + std::unique_ptr>> threaded_queue_for_server_; + std::unique_ptr ipc_queue_; + enum class Destination { + Client, Server + }; + + MessageQueue* GetMessageQueue(Destination destination) { + assert(kUseMultipleProcesses); + return destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server; + } + ThreadedQueue>* GetThreadedQueue(Destination destination) { + assert(!kUseMultipleProcesses); + return destination == Destination::Client ? threaded_queue_for_client_.get() : threaded_queue_for_server_.get(); + } void SendOutMessageToClient(lsBaseOutMessage& response) { std::ostringstream sstream; response.Write(sstream); - Ipc_Cout out; - out.content = sstream.str(); - ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out); + if (kUseMultipleProcesses) { + Ipc_Cout out; + out.content = sstream.str(); + ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out); + } + else { + auto out = MakeUnique(); + out->content = sstream.str(); + GetThreadedQueue(Destination::Client)->Enqueue(std::move(out)); + } } - enum class Destination { - Client, Server - }; - void SendMessageWithId(Destination destination, IpcId id, BaseIpcMessage& message) { - ipc_queue_->SendMessage( - destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server, - id, - message); - } - template - void SendMessage(Destination destination, TMessage& message) { - SendMessageWithId(destination, TMessage::kIpcId, message); + void SendMessage(Destination destination, std::unique_ptr message) { + if (kUseMultipleProcesses) + ipc_queue_->SendMessage(GetMessageQueue(destination), message->method_id, *message); + else + GetThreadedQueue(destination)->Enqueue(std::move(message)); } std::vector> GetMessages(Destination destination) { - return ipc_queue_->GetMessages(destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server); + if (kUseMultipleProcesses) + return ipc_queue_->GetMessages(GetMessageQueue(destination)); + else + return GetThreadedQueue(destination)->DequeueAll(); } private: + IpcManager() { + if (kUseMultipleProcesses) { + ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); + } + else { + threaded_queue_for_client_ = MakeUnique>>(); + threaded_queue_for_server_ = MakeUnique>>(); + } + } + template void RegisterId(IpcMessageQueue* t) { t->RegisterId(T::kIpcId, @@ -164,7 +192,7 @@ struct IpcManager { } }; - +IpcManager* IpcManager::instance_ = nullptr; @@ -987,7 +1015,6 @@ void IndexMain( void QueryDbMainLoop( QueryDatabase* db, - IpcManager* language_client, Index_DoIndexQueue* queue_do_index, Index_DoIdMapQueue* queue_do_id_map, Index_OnIdMappedQueue* queue_on_id_mapped, @@ -995,10 +1022,11 @@ void QueryDbMainLoop( Project* project, WorkingFiles* working_files, CompletionManager* completion_manager) { + IpcManager* ipc = IpcManager::instance(); - std::vector> messages = language_client->GetMessages(IpcManager::Destination::Server); + std::vector> messages = ipc->GetMessages(IpcManager::Destination::Server); for (auto& message : messages) { - //std::cerr << "[querydb] Processing message " << static_cast(message->method_id) << std::endl; + std::cerr << "[querydb] Processing message " << IpcIdToString(message->method_id) << std::endl; switch (message->method_id) { case IpcId::Quit: { @@ -1008,8 +1036,8 @@ void QueryDbMainLoop( } case IpcId::IsAlive: { - Ipc_IsAlive response; - language_client->SendMessageWithId(IpcManager::Destination::Client, response.method_id, response); + std::cerr << "[querydb] Sending IsAlive response to client" << std::endl; + ipc->SendMessage(IpcManager::Destination::Client, MakeUnique()); break; } @@ -1051,7 +1079,7 @@ void QueryDbMainLoop( } case IpcId::TextDocumentDidClose: { auto msg = static_cast(message.get()); - std::cerr << "Closing " << msg->params.textDocument.uri.GetPath() << std::endl; + //std::cerr << "Closing " << msg->params.textDocument.uri.GetPath() << std::endl; working_files->OnClose(msg->params); break; } @@ -1096,7 +1124,7 @@ void QueryDbMainLoop( } response.Write(std::cerr); - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1174,7 +1202,7 @@ void QueryDbMainLoop( break; } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1210,7 +1238,7 @@ void QueryDbMainLoop( break; } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1236,7 +1264,7 @@ void QueryDbMainLoop( break; } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1273,7 +1301,7 @@ void QueryDbMainLoop( break; } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1299,7 +1327,7 @@ void QueryDbMainLoop( response.result.push_back(info); } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1393,7 +1421,7 @@ void QueryDbMainLoop( }; } - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } @@ -1433,13 +1461,12 @@ void QueryDbMainLoop( } std::cerr << "- Found " << response.result.size() << " results for query " << query << std::endl; - language_client->SendOutMessageToClient(response); + ipc->SendOutMessageToClient(response); break; } default: { - std::cerr << "1 Unhandled IPC message with kind " - << static_cast(message->method_id) << std::endl; + std::cerr << "[querydb] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl; exit(1); } } @@ -1485,7 +1512,6 @@ void QueryDbMain() { //std::cerr << "Running QueryDb" << std::endl; // Create queues. - IpcManager ipc; Index_DoIndexQueue queue_do_index; Index_DoIdMapQueue queue_do_id_map; Index_OnIdMappedQueue queue_on_id_mapped; @@ -1506,7 +1532,7 @@ void QueryDbMain() { // Run query db main loop. QueryDatabase db; while (true) { - QueryDbMainLoop(&db, &ipc, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager); + QueryDbMainLoop(&db, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } @@ -1581,7 +1607,9 @@ void QueryDbMain() { // blocks. // // |ipc| is connected to a server. -void LanguageServerStdinLoop(IpcManager* ipc) { +void LanguageServerStdinLoop() { + IpcManager* ipc = IpcManager::instance(); + while (true) { std::unique_ptr message = MessageRegistry::instance()->ReadMessageFromStdin(); @@ -1589,8 +1617,7 @@ void LanguageServerStdinLoop(IpcManager* ipc) { if (!message) continue; - //std::cerr << "[info]: Got message of type " - // << IpcIdToString(message->method_id) << std::endl; + std::cerr << "[stdin]: Got message \"" << IpcIdToString(message->method_id) << '"' << std::endl; switch (message->method_id) { // TODO: For simplicitly lets just proxy the initialize request like // all other requests so that stdin loop thread becomes super simple. @@ -1601,9 +1628,9 @@ void LanguageServerStdinLoop(IpcManager* ipc) { std::cerr << "Initialize in directory " << project_path << " with uri " << request->params.rootUri->raw_uri << std::endl; - Ipc_OpenProject open_project; - open_project.project_path = project_path; - ipc->SendMessageWithId(IpcManager::Destination::Server, Ipc_OpenProject::kIpcId, open_project); + auto open_project = MakeUnique(); + open_project->project_path = project_path; + ipc->SendMessage(IpcManager::Destination::Server, std::move(open_project)); } // TODO: query request->params.capabilities.textDocument and support only things @@ -1664,42 +1691,43 @@ void LanguageServerStdinLoop(IpcManager* ipc) { case IpcId::TextDocumentDocumentSymbol: case IpcId::TextDocumentCodeLens: case IpcId::WorkspaceSymbol: { - //std::cerr << "Sending message " << (int)message->method_id << std::endl; - ipc->SendMessageWithId(IpcManager::Destination::Server, message->method_id, *message.get()); + ipc->SendMessage(IpcManager::Destination::Server, std::move(message)); break; } default: { - std::cerr << "3 Unhandled IPC message with kind " - << static_cast(message->method_id) << std::endl; + std::cerr << "[stdin] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl; exit(1); } } } } -void LanguageServerMainLoop(IpcManager* ipc) { +void LanguageServerMainLoop() { + IpcManager* ipc = IpcManager::instance(); + std::vector> messages = ipc->GetMessages(IpcManager::Destination::Client); for (auto& message : messages) { + std::cerr << "[server] Processing message " << IpcIdToString(message->method_id) << std::endl; + switch (message->method_id) { - case IpcId::Quit: { - std::cerr << "Got quit message (exiting)" << std::endl; - exit(0); - break; - } + case IpcId::Quit: { + std::cerr << "[server] Got quit message (exiting)" << std::endl; + exit(0); + break; + } - case IpcId::Cout: { - auto msg = static_cast(message.get()); - std::cout << msg->content; - std::cout.flush(); - break; - } + case IpcId::Cout: { + auto msg = static_cast(message.get()); + std::cout << msg->content; + std::cout.flush(); + break; + } - default: { - std::cerr << "2 Unhandled IPC message with kind " - << static_cast(message->method_id) << std::endl; - exit(1); - } + default: { + std::cerr << "[server] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl; + exit(1); + } } } } @@ -1751,10 +1779,19 @@ void LanguageServerMainLoop(IpcManager* ipc) { -bool IsQueryDbProcessRunning(IpcManager* ipc) { +bool IsQueryDbProcessRunning() { + if (!kUseMultipleProcesses) + return false; + + IpcManager* ipc = IpcManager::instance(); + + // Discard any left-over messages from previous runs. + if (kUseMultipleProcesses) + ipc->GetMessages(IpcManager::Destination::Client); + // Emit an alive check. Sleep so the server has time to respond. - Ipc_IsAlive check_alive; - ipc->SendMessage(IpcManager::Destination::Server, check_alive); + std::cerr << "[setup] Sending IsAlive request to server" << std::endl; + ipc->SendMessage(IpcManager::Destination::Server, MakeUnique()); // TODO: Tune this value or make it configurable. std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -1766,16 +1803,14 @@ bool IsQueryDbProcessRunning(IpcManager* ipc) { return true; } + // No response back. Clear out server messages so server doesn't respond to stale request. + ipc->GetMessages(IpcManager::Destination::Server); + return false; } -void LanguageServerMain(std::string process_name) { - IpcManager ipc; - - // Discard any left-over messages from previous runs. - ipc.GetMessages(IpcManager::Destination::Client); - - bool has_server = IsQueryDbProcessRunning(&ipc); +void LanguageServerMain() { + bool has_server = IsQueryDbProcessRunning(); // No server is running. Start it in-process. If the user wants to run the // server out of process they have to start it themselves. @@ -1784,9 +1819,9 @@ void LanguageServerMain(std::string process_name) { } // Run language client. - new std::thread(&LanguageServerStdinLoop, &ipc); + new std::thread(&LanguageServerStdinLoop); while (true) { - LanguageServerMainLoop(&ipc); + LanguageServerMainLoop(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } @@ -1900,7 +1935,7 @@ int main(int argc, char** argv) { } else if (HasOption(options, "--language-server")) { //std::cerr << "Running language server" << std::endl; - LanguageServerMain(argv[0]); + LanguageServerMain(); return 0; } else if (HasOption(options, "--querydb")) { @@ -1910,7 +1945,7 @@ int main(int argc, char** argv) { } else { //std::cerr << "Running language server" << std::endl; - LanguageServerMain(argv[0]); + LanguageServerMain(); return 0; } diff --git a/src/ipc.cc b/src/ipc.cc index d4007193..5d455490 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -38,8 +38,18 @@ const char* IpcIdToString(IpcId id) { return "codeLens/resolve"; case IpcId::WorkspaceSymbol: return "workspace/symbol"; + + + case IpcId::Quit: + return "$quit"; + case IpcId::IsAlive: + return "$isAlive"; + case IpcId::OpenProject: + return "$openProject"; + case IpcId::Cout: + return "$cout"; default: - assert(false); + assert(false && "missing IpcId string name"); exit(1); } } diff --git a/src/platform_win.cc b/src/platform_win.cc index 9d5a28e4..be6ceea7 100644 --- a/src/platform_win.cc +++ b/src/platform_win.cc @@ -41,7 +41,7 @@ struct PlatformMutexWin : public PlatformMutex { HANDLE raw_mutex = INVALID_HANDLE_VALUE; PlatformMutexWin(const std::string& name) { - //std::cerr << "[win] Creating mutex with name " << name << std::endl; + std::cerr << "[win] Creating mutex with name " << name << std::endl; raw_mutex = CreateMutex(nullptr, false /*initial_owner*/, name.c_str()); CheckForError({ ERROR_ALREADY_EXISTS }); } @@ -72,8 +72,8 @@ struct PlatformSharedMemoryWin : public PlatformSharedMemory { HANDLE shmem_; PlatformSharedMemoryWin(const std::string& name, size_t capacity) { - //std::cerr << "[win] Creating shared memory with name " << name - // << " and capacity " << capacity << std::endl; + std::cerr << "[win] Creating shared memory with name " << name + << " and capacity " << capacity << std::endl; this->name = name; shmem_ = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, diff --git a/src/threaded_queue.h b/src/threaded_queue.h index c8f412fd..ca0985ea 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -21,6 +21,19 @@ public: cv_.notify_one(); } + // Return all elements in the queue. + std::vector DequeueAll() { + std::lock_guard lock(mutex_); + + std::vector result; + result.reserve(queue_.size()); + while (!queue_.empty()) { + result.emplace_back(std::move(queue_.front())); + queue_.pop(); + } + return result; + } + // Get the "front"-element. // If the queue is empty, wait untill an element is avaiable. T Dequeue() { @@ -38,7 +51,7 @@ public: // Get the first element from the queue without blocking. Returns a null // value if the queue is empty. optional TryDequeue() { - std::unique_lock lock(mutex_); + std::lock_guard lock(mutex_); if (queue_.empty()) return nullopt;