From 94383d589b3fcb5786290c50a9f2e08402e0e562 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Thu, 2 Mar 2017 22:16:28 -0800 Subject: [PATCH] make ipc easier to use --- .vscode/settings.json | 18 ++++++++++++++++ command_line.cc | 20 ++++++----------- ipc.cc | 45 ++++++++++++++++++++++++++++++++++---- ipc.h | 31 +++++++++++++++++++++++---- shared_memory_win.cpp | 50 ------------------------------------------- 5 files changed, 93 insertions(+), 71 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 shared_memory_win.cpp diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..7d005628 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,18 @@ +{ + "files.associations": { + "*.script": "pascal", + "functional": "cpp", + "queue": "cpp", + "stack": "cpp", + "tuple": "cpp", + "xutility": "cpp", + "xlocale": "cpp" + }, + "files.exclude": { + "**/.git": true, + "**/.svn": true, + "**/.hg": true, + "**/.DS_Store": true, + "third_party/": true + } +} \ No newline at end of file diff --git a/command_line.cc b/command_line.cc index 911ff409..da7ad40e 100644 --- a/command_line.cc +++ b/command_line.cc @@ -97,11 +97,10 @@ indexer.exe --index-file /work2/chrome/src/chrome/foo.cc #include "ipc.h" void IndexerServerMain() { - IpcMessageQueue to_server("indexer_to_server"); - IpcMessageQueue to_client("indexer_to_language_client"); + IpcServer ipc("language_server"); while (true) { - std::vector> messages = to_server.PopMessage(); + std::vector> messages = ipc.TakeMessages(); std::cout << "Server has " << messages.size() << " messages" << std::endl; for (auto& message : messages) { @@ -109,7 +108,7 @@ void IndexerServerMain() { case JsonMessage::Kind::IsAlive: { IpcMessage_IsAlive response; - to_client.PushMessage(&response); + ipc.SendToClient(0, &response); // todo: make non-blocking break; } default: @@ -125,24 +124,19 @@ void IndexerServerMain() { } void LanguageServerMain() { - // TODO: Encapsulate this pattern (ie, we generally want bi-directional channel/queue) - // TODO: Rename IpcMessageQueue to IpcDirectionalChannel - // - As per above, introduce wrapper IpcBidirectionalChannel that has two IpcDirectionalChannel instances - - IpcMessageQueue to_server("indexer_to_server"); - IpcMessageQueue to_client("indexer_to_language_client"); + IpcClient ipc("language_server", 0); // Discard any left-over messages from previous runs. - to_client.PopMessage(); + ipc.TakeMessages(); // Emit an alive check. Sleep so the server has time to respond. IpcMessage_IsAlive check_alive; - to_server.PushMessage(&check_alive); + ipc.SendToServer(&check_alive); using namespace std::chrono_literals; std::this_thread::sleep_for(50ms); // TODO: Tune this value or make it configurable. // Check if we got an IsAlive message back. - std::vector> messages = to_client.PopMessage(); + std::vector> messages = ipc.TakeMessages(); bool has_server = false; for (auto& message : messages) { if (message->kind == JsonMessage::Kind::IsAlive) { diff --git a/ipc.cc b/ipc.cc index a6d5f8fc..a61a82e3 100644 --- a/ipc.cc +++ b/ipc.cc @@ -4,6 +4,14 @@ namespace { JsonMessage* as_message(char* ptr) { return reinterpret_cast(ptr); } + + std::string NameToServerName(const std::string& name) { + return name + "_server"; + } + + std::string NameToClientName(const std::string& name, int client_id) { + return name + "_server_" + std::to_string(client_id); + } } const char* JsonMessage::payload() { @@ -52,17 +60,17 @@ void IpcMessage_CreateIndex::Deserialize(Reader& reader) { ::Deserialize(reader, "args", args); } -IpcMessageQueue::IpcMessageQueue(const std::string& name) { +IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { local_block = new char[shmem_size]; shared = CreatePlatformSharedMemory(name + "_memory"); mutex = CreatePlatformMutex(name + "_mutex"); } -IpcMessageQueue::~IpcMessageQueue() { +IpcDirectionalChannel::~IpcDirectionalChannel() { delete[] local_block; } -void IpcMessageQueue::PushMessage(BaseIpcMessage* message) { +void IpcDirectionalChannel::PushMessage(BaseIpcMessage* message) { rapidjson::StringBuffer output; rapidjson::PrettyWriter writer(output); writer.SetFormatOptions( @@ -103,7 +111,7 @@ void IpcMessageQueue::PushMessage(BaseIpcMessage* message) { } -std::vector> IpcMessageQueue::PopMessage() { +std::vector> IpcDirectionalChannel::TakeMessages() { size_t remaining_bytes = 0; // Move data from shared memory into a local buffer. Do this // before parsing the blocks so that other processes can begin @@ -150,4 +158,33 @@ std::vector> IpcMessageQueue::PopMessage() { } return result; +} + + + +IpcServer::IpcServer(const std::string& name) + : name_(name), server_(NameToServerName(name)) {} + +void IpcServer::SendToClient(int client_id, BaseIpcMessage* message) { + // Find or create the client. + auto it = clients_.find(client_id); + if (it == clients_.end()) + clients_[client_id] = std::make_unique(NameToClientName(name_, client_id)); + + clients_[client_id]->PushMessage(message); +} + +std::vector> IpcServer::TakeMessages() { + return server_.TakeMessages(); +} + +IpcClient::IpcClient(const std::string& name, int client_id) + : server_(NameToServerName(name)), client_(NameToClientName(name, client_id)) {} + +void IpcClient::SendToServer(BaseIpcMessage* message) { + server_.PushMessage(message); +} + +std::vector> IpcClient::TakeMessages() { + return client_.TakeMessages(); } \ No newline at end of file diff --git a/ipc.h b/ipc.h index f5d0787a..eb1cf802 100644 --- a/ipc.h +++ b/ipc.h @@ -72,15 +72,15 @@ struct IpcMessage_CreateIndex : public BaseIpcMessage { void Deserialize(Reader& reader) override; }; -struct IpcMessageQueue { +struct IpcDirectionalChannel { // NOTE: We keep all pointers in terms of char* so pointer arithmetic is // always relative to bytes. - explicit IpcMessageQueue(const std::string& name); - ~IpcMessageQueue(); + explicit IpcDirectionalChannel(const std::string& name); + ~IpcDirectionalChannel(); void PushMessage(BaseIpcMessage* message); - std::vector> PopMessage(); + std::vector> TakeMessages(); private: JsonMessage* get_free_message() { @@ -93,4 +93,27 @@ private: // Pointer to process-local memory. char* local_block; +}; + +struct IpcServer { + IpcServer(const std::string& name); + + void SendToClient(int client_id, BaseIpcMessage* message); + std::vector> TakeMessages(); + +private: + std::string name_; + IpcDirectionalChannel server_; + std::unordered_map> clients_; +}; + +struct IpcClient { + IpcClient(const std::string& name, int client_id); + + void SendToServer(BaseIpcMessage* message); + std::vector> TakeMessages(); + +private: + IpcDirectionalChannel server_; + IpcDirectionalChannel client_; }; \ No newline at end of file diff --git a/shared_memory_win.cpp b/shared_memory_win.cpp deleted file mode 100644 index db8b1f23..00000000 --- a/shared_memory_win.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include -#include -#include - -#include "ipc.h" - - - -int main525252(int argc, char** argv) { - if (argc == 2) { - IpcMessageQueue queue("myqueue"); - int i = 0; - while (true) { - IpcMessage_ImportIndex m; - m.path = "foo #" + std::to_string(i); - queue.PushMessage(&m); - std::cout << "Sent " << i << std::endl;; - - using namespace std::chrono_literals; - std::this_thread::sleep_for(10ms); - - ++i; - } - } - - else { - IpcMessageQueue queue("myqueue"); - - while (true) { - std::vector> messages = queue.PopMessage(); - std::cout << "Got " << messages.size() << " messages" << std::endl; - - for (auto& message : messages) { - rapidjson::StringBuffer output; - rapidjson::PrettyWriter writer(output); - writer.SetFormatOptions( - rapidjson::PrettyFormatOptions::kFormatSingleLineArray); - writer.SetIndent(' ', 2); - message->Serialize(writer); - - std::cout << " kind=" << static_cast(message->kind) << ", json=" << output.GetString() << std::endl; - } - - using namespace std::chrono_literals; - std::this_thread::sleep_for(5s); - } - } - - return 0; -} \ No newline at end of file