From 88f2a3541a0bcb551da8c00cf80bba29f3245467 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Thu, 2 Mar 2017 01:28:07 -0800 Subject: [PATCH] cleanup ipc, basic lang client <-> indexer communication --- command_line.cc | 97 ++++++++++- ipc.cc | 153 +++++++++++++++++ ipc.h | 96 +++++++++++ platform.h | 23 +++ platform_win.cc | 71 ++++++++ shared_memory_win.cpp | 374 +----------------------------------------- 6 files changed, 446 insertions(+), 368 deletions(-) create mode 100644 ipc.cc create mode 100644 ipc.h create mode 100644 platform.h create mode 100644 platform_win.cc diff --git a/command_line.cc b/command_line.cc index 8786ae99..911ff409 100644 --- a/command_line.cc +++ b/command_line.cc @@ -71,7 +71,102 @@ bool HasOption(const std::unordered_map& options, cons return options.find(option) != options.end(); } -int main5555(int argc, char** argv) { +/* + +// Connects to a running --project-directory instance. Forks +// and creates it if not running. +// +// Implements language server spec. +indexer.exe --language-server + +// Holds the runtime db that the --language-server instance +// runs queries against. +indexer.exe --project-directory /work2/chrome/src + +// Created from the --project-directory (server) instance +indexer.exe --index-file /work2/chrome/src/chrome/foo.cc + +// Configuration data is read from a JSON file. +{ + "max_threads": 40, + "cache_directory": "/work/indexer_cache/" + +} +*/ + +#include "ipc.h" + +void IndexerServerMain() { + IpcMessageQueue to_server("indexer_to_server"); + IpcMessageQueue to_client("indexer_to_language_client"); + + while (true) { + std::vector> messages = to_server.PopMessage(); + + std::cout << "Server has " << messages.size() << " messages" << std::endl; + for (auto& message : messages) { + switch (message->kind) { + case JsonMessage::Kind::IsAlive: + { + IpcMessage_IsAlive response; + to_client.PushMessage(&response); + break; + } + default: + std::cerr << "Unhandled IPC message with kind " << static_cast(message->kind) << std::endl; + exit(1); + break; + } + } + + using namespace std::chrono_literals; + std::this_thread::sleep_for(20ms); + } +} + +void LanguageServerMain() { + // TODO: Encapsulate this pattern (ie, we generally want bi-directional channel/queue) + // TODO: Rename IpcMessageQueue to IpcDirectionalChannel + // - As per above, introduce wrapper IpcBidirectionalChannel that has two IpcDirectionalChannel instances + + IpcMessageQueue to_server("indexer_to_server"); + IpcMessageQueue to_client("indexer_to_language_client"); + + // Discard any left-over messages from previous runs. + to_client.PopMessage(); + + // Emit an alive check. Sleep so the server has time to respond. + IpcMessage_IsAlive check_alive; + to_server.PushMessage(&check_alive); + using namespace std::chrono_literals; + std::this_thread::sleep_for(50ms); // TODO: Tune this value or make it configurable. + + // Check if we got an IsAlive message back. + std::vector> messages = to_client.PopMessage(); + bool has_server = false; + for (auto& message : messages) { + if (message->kind == JsonMessage::Kind::IsAlive) { + has_server = true; + break; + } + } + + // No server is running. Start it. + if (!has_server) { + std::cerr << "Unable to detect running indexer server" << std::endl; + exit(1); + } + + std::cout << "Found indexer server" << std::endl; +} + +int main(int argc, char** argv) { + if (argc == 1) + LanguageServerMain(); + else + IndexerServerMain(); + return 0; + std::unordered_map options = ParseOptions(argc, argv); if (argc == 1 || options.find("--help") != options.end()) { diff --git a/ipc.cc b/ipc.cc new file mode 100644 index 00000000..a6d5f8fc --- /dev/null +++ b/ipc.cc @@ -0,0 +1,153 @@ +#include "ipc.h" + +namespace { + JsonMessage* as_message(char* ptr) { + return reinterpret_cast(ptr); + } +} + +const char* JsonMessage::payload() { + return reinterpret_cast(this) + sizeof(JsonMessage); +} + +void JsonMessage::SetPayload(size_t payload_size, const char* payload) { + char* payload_dest = reinterpret_cast(this) + sizeof(JsonMessage); + this->payload_size = payload_size; + memcpy(payload_dest, payload, payload_size); +} + +IpcMessage_IsAlive::IpcMessage_IsAlive() { + kind = JsonMessage::Kind::IsAlive; +} + +void IpcMessage_IsAlive::Serialize(Writer& writer) {} + +void IpcMessage_IsAlive::Deserialize(Reader& reader) {} + +IpcMessage_ImportIndex::IpcMessage_ImportIndex() { + kind = JsonMessage::Kind::ImportIndex; +} + +void IpcMessage_ImportIndex::Serialize(Writer& writer) { + writer.StartObject(); + ::Serialize(writer, "path", path); + writer.EndObject(); +} +void IpcMessage_ImportIndex::Deserialize(Reader& reader) { + ::Deserialize(reader, "path", path); +} + +IpcMessage_CreateIndex::IpcMessage_CreateIndex() { + kind = JsonMessage::Kind::CreateIndex; +} + +void IpcMessage_CreateIndex::Serialize(Writer& writer) { + writer.StartObject(); + ::Serialize(writer, "path", path); + ::Serialize(writer, "args", args); + writer.EndObject(); +} +void IpcMessage_CreateIndex::Deserialize(Reader& reader) { + ::Deserialize(reader, "path", path); + ::Deserialize(reader, "args", args); +} + +IpcMessageQueue::IpcMessageQueue(const std::string& name) { + local_block = new char[shmem_size]; + shared = CreatePlatformSharedMemory(name + "_memory"); + mutex = CreatePlatformMutex(name + "_mutex"); +} + +IpcMessageQueue::~IpcMessageQueue() { + delete[] local_block; +} + +void IpcMessageQueue::PushMessage(BaseIpcMessage* message) { + rapidjson::StringBuffer output; + rapidjson::PrettyWriter writer(output); + writer.SetFormatOptions( + rapidjson::PrettyFormatOptions::kFormatSingleLineArray); + writer.SetIndent(' ', 2); + message->Serialize(writer); + + size_t payload_size = strlen(output.GetString()); + assert(payload_size < shmem_size && "Increase shared memory size, payload will never fit"); + + bool first = true; + bool did_log = false; + while (true) { + using namespace std::chrono_literals; + if (!first) { + if (!did_log) { + std::cout << "[info]: shmem full, waiting" << std::endl; // TODO: remove + did_log = true; + } + std::this_thread::sleep_for(16ms); + } + first = false; + + std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex.get()); + + // Try again later when there is room in shared memory. + if ((*shared->shared_bytes_used + sizeof(JsonMessage) + payload_size) >= shmem_size) + continue; + + get_free_message()->kind = message->kind; + get_free_message()->SetPayload(payload_size, output.GetString()); + + *shared->shared_bytes_used += sizeof(JsonMessage) + get_free_message()->payload_size; + assert(*shared->shared_bytes_used < shmem_size); + get_free_message()->kind = JsonMessage::Kind::Invalid; + break; + } + +} + +std::vector> IpcMessageQueue::PopMessage() { + size_t remaining_bytes = 0; + // Move data from shared memory into a local buffer. Do this + // before parsing the blocks so that other processes can begin + // posting data as soon as possible. + { + std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex.get()); + remaining_bytes = *shared->shared_bytes_used; + + memcpy(local_block, shared->shared_start, *shared->shared_bytes_used); + *shared->shared_bytes_used = 0; + get_free_message()->kind = JsonMessage::Kind::Invalid; + } + + std::vector> result; + + char* message = local_block; + while (remaining_bytes > 0) { + std::unique_ptr base_message; + switch (as_message(message)->kind) { + case JsonMessage::Kind::IsAlive: + base_message = std::make_unique(); + break; + case JsonMessage::Kind::CreateIndex: + base_message = std::make_unique(); + break; + case JsonMessage::Kind::ImportIndex: + base_message = std::make_unique(); + break; + default: + assert(false); + } + + rapidjson::Document document; + document.Parse(as_message(message)->payload(), as_message(message)->payload_size); + bool has_error = document.HasParseError(); + auto error = document.GetParseError(); + + base_message->Deserialize(document); + + result.emplace_back(std::move(base_message)); + + remaining_bytes -= sizeof(JsonMessage) + as_message(message)->payload_size; + message = message + sizeof(JsonMessage) + as_message(message)->payload_size; + } + + return result; +} \ No newline at end of file diff --git a/ipc.h b/ipc.h new file mode 100644 index 00000000..f5d0787a --- /dev/null +++ b/ipc.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "platform.h" +#include "serializer.h" + +using Writer = rapidjson::PrettyWriter; +using Reader = rapidjson::Document; + + +// Messages are funky objects. They contain potentially variable amounts of +// data and are passed between processes. This means that they need to be +// fully relocatable, ie, it is possible to memmove them in memory to a +// completely different address. + +struct JsonMessage { + enum class Kind { + Invalid, + IsAlive, + CreateIndex, + ImportIndex, + }; + + Kind kind; + size_t payload_size; + + const char* payload(); + void SetPayload(size_t payload_size, const char* payload); +}; + +struct BaseIpcMessage { + JsonMessage::Kind kind; + virtual ~BaseIpcMessage() {} + + virtual void Serialize(Writer& writer) = 0; + virtual void Deserialize(Reader& reader) = 0; +}; + +struct IpcMessage_IsAlive : public BaseIpcMessage { + IpcMessage_IsAlive(); + + // BaseIpcMessage: + void Serialize(Writer& writer) override; + void Deserialize(Reader& reader) override; +}; + +struct IpcMessage_ImportIndex : public BaseIpcMessage { + std::string path; + + IpcMessage_ImportIndex(); + + // BaseMessage: + void Serialize(Writer& writer) override; + void Deserialize(Reader& reader) override; +}; + +struct IpcMessage_CreateIndex : public BaseIpcMessage { + std::string path; + std::vector args; + + IpcMessage_CreateIndex(); + + // BaseMessage: + void Serialize(Writer& writer) override; + void Deserialize(Reader& reader) override; +}; + +struct IpcMessageQueue { + // NOTE: We keep all pointers in terms of char* so pointer arithmetic is + // always relative to bytes. + + explicit IpcMessageQueue(const std::string& name); + ~IpcMessageQueue(); + + void PushMessage(BaseIpcMessage* message); + std::vector> PopMessage(); + +private: + JsonMessage* get_free_message() { + return reinterpret_cast(shared->shared_start + *shared->shared_bytes_used); + } + + // Pointer to process shared memory and process shared mutex. + std::unique_ptr shared; + std::unique_ptr mutex; + + // Pointer to process-local memory. + char* local_block; +}; \ No newline at end of file diff --git a/platform.h b/platform.h new file mode 100644 index 00000000..f8bc4187 --- /dev/null +++ b/platform.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include + +struct PlatformMutex { + virtual ~PlatformMutex() {} +}; +struct PlatformScopedMutexLock { + virtual ~PlatformScopedMutexLock() {} +}; +struct PlatformSharedMemory { + virtual ~PlatformSharedMemory() {} + + size_t* shared_bytes_used; + char* shared_start; +}; + +const int shmem_size = 1024; // number of chars/bytes (256kb) + +std::unique_ptr CreatePlatformMutex(const std::string& name); +std::unique_ptr CreatePlatformScopedMutexLock(PlatformMutex* mutex); +std::unique_ptr CreatePlatformSharedMemory(const std::string& name); diff --git a/platform_win.cc b/platform_win.cc new file mode 100644 index 00000000..a88f871f --- /dev/null +++ b/platform_win.cc @@ -0,0 +1,71 @@ +#include "platform.h" + +#include +#include +#include + +struct PlatformMutexWin : public PlatformMutex { + HANDLE raw_mutex = INVALID_HANDLE_VALUE; + + PlatformMutexWin(const std::string& name) { + raw_mutex = CreateMutex(nullptr, false /*initial_owner*/, name.c_str()); + assert(GetLastError() != ERROR_INVALID_HANDLE); + } + + ~PlatformMutexWin() override { + ReleaseMutex(raw_mutex); + raw_mutex = INVALID_HANDLE_VALUE; + } +}; + +struct PlatformScopedMutexLockWin : public PlatformScopedMutexLock { + HANDLE raw_mutex; + + PlatformScopedMutexLockWin(HANDLE raw_mutex) : raw_mutex(raw_mutex) { + WaitForSingleObject(raw_mutex, INFINITE); + } + + ~PlatformScopedMutexLockWin() override { + ReleaseMutex(raw_mutex); + } +}; + +struct PlatformSharedMemoryWin : public PlatformSharedMemory { + HANDLE shmem_; + void* shared_start_real_; + + PlatformSharedMemoryWin(const std::string& name) { + shmem_ = CreateFileMapping( + INVALID_HANDLE_VALUE, + NULL, + PAGE_READWRITE, + 0, + shmem_size, + name.c_str() + ); + + shared_start_real_ = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); + + shared_bytes_used = reinterpret_cast(shared_start_real_); + *shared_bytes_used = 0; + shared_start = reinterpret_cast(shared_bytes_used + 1); + } + + ~PlatformSharedMemoryWin() override { + UnmapViewOfFile(shared_start_real_); + } +}; + + + +std::unique_ptr CreatePlatformMutex(const std::string& name) { + return std::make_unique(name); +} + +std::unique_ptr CreatePlatformScopedMutexLock(PlatformMutex* mutex) { + return std::make_unique(static_cast(mutex)->raw_mutex); +} + +std::unique_ptr CreatePlatformSharedMemory(const std::string& name) { + return std::make_unique(name); +} \ No newline at end of file diff --git a/shared_memory_win.cpp b/shared_memory_win.cpp index ce7368c2..db8b1f23 100644 --- a/shared_memory_win.cpp +++ b/shared_memory_win.cpp @@ -1,379 +1,19 @@ #include #include #include -#include -#include -#include -#include +#include "ipc.h" -#include -#include -#include "serializer.h" -using Writer = rapidjson::PrettyWriter; -using Reader = rapidjson::Document; - -struct ProcessMutex { - HANDLE mutex_ = INVALID_HANDLE_VALUE; - - ProcessMutex() { - mutex_ = ::CreateMutex(nullptr, false /*initial_owner*/, "indexer_shmem_mutex"); - assert(GetLastError() != ERROR_INVALID_HANDLE); - } - - ~ProcessMutex() { - ::ReleaseMutex(mutex_); - mutex_ = INVALID_HANDLE_VALUE; - } -}; - -struct ScopedProcessLock { - HANDLE mutex_; - - ScopedProcessLock(ProcessMutex* mutex) : mutex_(mutex->mutex_) { - WaitForSingleObject(mutex_, INFINITE); - } - - ~ScopedProcessLock() { - ::ReleaseMutex(mutex_); - } -}; - -// Messages are funky objects. They contain potentially variable amounts of -// data and are passed between processes. This means that they need to be -// fully relocatable, ie, it is possible to memmove them in memory to a -// completely different address. - -// TODO: Let's just pipe JSON. - -struct JsonMessage { - enum class Kind { - Invalid, - CreateIndex, - ImportIndex - }; - - Kind kind; - size_t payload_size; - - const char* payload() { - return reinterpret_cast(this) + sizeof(JsonMessage); - } - void set_payload(size_t payload_size, const char* payload) { - char* payload_dest = reinterpret_cast(this) + sizeof(JsonMessage); - this->payload_size = payload_size; - memcpy(payload_dest, payload, payload_size); - } -}; - -struct BaseMessage { - JsonMessage::Kind kind; - - virtual void Serialize(Writer& writer) = 0; - virtual void Deserialize(Reader& reader) = 0; -}; - - - -struct Message_ImportIndex : public BaseMessage { - std::string path; - - Message_ImportIndex() { - kind = JsonMessage::Kind::ImportIndex; - } - - // BaseMessage: - void Serialize(Writer& writer) override { - writer.StartObject(); - ::Serialize(writer, "path", path); - writer.EndObject(); - } - void Deserialize(Reader& reader) override { - ::Deserialize(reader, "path", path); - } -}; - - - -struct Message_CreateIndex : public BaseMessage { - std::string path; - std::vector args; - - Message_CreateIndex() { - kind = JsonMessage::Kind::CreateIndex; - } - - // BaseMessage: - void Serialize(Writer& writer) override { - writer.StartObject(); - ::Serialize(writer, "path", path); - ::Serialize(writer, "args", args); - writer.EndObject(); - } - void Deserialize(Reader& reader) override { - ::Deserialize(reader, "path", path); - ::Deserialize(reader, "args", args); - } -}; - - -const int shmem_size = 1024; // number of chars/bytes (256kb) - -struct PlatformSharedMemory { - HANDLE shmem_; - void* shared_start_real_; - - - size_t* shared_bytes_used; - char* shared_start; - - - PlatformSharedMemory() { - shmem_ = ::CreateFileMapping( - INVALID_HANDLE_VALUE, - NULL, - PAGE_READWRITE, - 0, - shmem_size, - "shared_memory_name" - ); - - shared_start_real_ = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); - - shared_bytes_used = reinterpret_cast(shared_start_real_); - *shared_bytes_used = 0; - shared_start = reinterpret_cast(shared_bytes_used + 1); - } - - ~PlatformSharedMemory() { - ::UnmapViewOfFile(shared_start_real_); - } -}; - -struct MessageMemoryBlock { - JsonMessage* ToMessage(char* ptr) { - return reinterpret_cast(ptr); - } - JsonMessage* get_free_message() { - return reinterpret_cast(shared.shared_start + *shared.shared_bytes_used); - } - - // NOTE: We keep all pointers in terms of char* so pointer arithmetic is - // always relative to bytes. - - // Pointers to shared memory. - PlatformSharedMemory shared; - - ProcessMutex mutex; - - char* local_block; - - MessageMemoryBlock() { - local_block = new char[shmem_size]; - } - ~MessageMemoryBlock() { - delete[] local_block; - } - - - - void PushMessage(BaseMessage* message) { - rapidjson::StringBuffer output; - rapidjson::PrettyWriter writer(output); - writer.SetFormatOptions( - rapidjson::PrettyFormatOptions::kFormatSingleLineArray); - writer.SetIndent(' ', 2); - message->Serialize(writer); - - size_t payload_size = strlen(output.GetString()); - assert(payload_size < shmem_size && "Increase shared memory size, payload will never fit"); - - bool first = true; - bool did_log = false; - while (true) { - using namespace std::chrono_literals; - if (!first) { - if (!did_log) { - std::cout << "[info]: shmem full, waiting" << std::endl; // TODO: remove - did_log = true; - } - std::this_thread::sleep_for(16ms); - } - first = false; - - ScopedProcessLock lock(&mutex); - - // Try again later when there is room in shared memory. - if ((*shared.shared_bytes_used + sizeof(JsonMessage) + payload_size) >= shmem_size) - continue; - - get_free_message()->kind = message->kind; - get_free_message()->set_payload(payload_size, output.GetString()); - - *shared.shared_bytes_used += sizeof(JsonMessage) + get_free_message()->payload_size; - assert(*shared.shared_bytes_used < shmem_size); - get_free_message()->kind = JsonMessage::Kind::Invalid; - break; - } - - } - - - - std::vector> PopMessage() { - size_t remaining_bytes = 0; - // Move data from shared memory into a local buffer. Do this - // before parsing the blocks so that other processes can begin - // posting data as soon as possible. - { - ScopedProcessLock lock(&mutex); - remaining_bytes = *shared.shared_bytes_used; - - memcpy(local_block, shared.shared_start, *shared.shared_bytes_used); - *shared.shared_bytes_used = 0; - get_free_message()->kind = JsonMessage::Kind::Invalid; - } - - std::vector> result; - - char* message = local_block; - while (remaining_bytes > 0) { - std::unique_ptr base_message; - switch (ToMessage(message)->kind) { - case JsonMessage::Kind::CreateIndex: - base_message = std::make_unique(); - break; - case JsonMessage::Kind::ImportIndex: - base_message = std::make_unique(); - break; - default: - assert(false); - } - - rapidjson::Document document; - document.Parse(ToMessage(message)->payload(), ToMessage(message)->payload_size); - bool has_error = document.HasParseError(); - auto error = document.GetParseError(); - - base_message->Deserialize(document); - - result.emplace_back(std::move(base_message)); - - remaining_bytes -= sizeof(JsonMessage) + ToMessage(message)->payload_size; - message = message + sizeof(JsonMessage) + ToMessage(message)->payload_size; - } - - return result; - } -}; - - - - - - - - - - - - - -void reader() { - HANDLE shmem = INVALID_HANDLE_VALUE; - HANDLE mutex = INVALID_HANDLE_VALUE; - - mutex = ::CreateMutex(NULL, FALSE, "mutex_sample_name"); - - shmem = ::CreateFileMapping( - INVALID_HANDLE_VALUE, - NULL, - PAGE_READWRITE, - 0, - shmem_size, - "shared_memory_name" - ); - - char *buf = (char*)MapViewOfFile(shmem, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); - - - for (unsigned int c = 0; c < 60; ++c) { - // mutex lock - WaitForSingleObject(mutex, INFINITE); - - int value = buf[0]; - std::cout << "read shared memory...c=" << value << std::endl; - - // mutex unlock - ::ReleaseMutex(mutex); - - ::Sleep(1000); - } - - // release - ::UnmapViewOfFile(buf); - ::CloseHandle(shmem); - ::ReleaseMutex(mutex); -} - -void writer() { - HANDLE shmem = INVALID_HANDLE_VALUE; - HANDLE mutex = INVALID_HANDLE_VALUE; - - mutex = ::CreateMutex(NULL, FALSE, "mutex_sample_name"); - - shmem = ::CreateFileMapping( - INVALID_HANDLE_VALUE, - NULL, - PAGE_READWRITE, - 0, - shmem_size, - "shared_memory_name" - ); - - char *buf = (char*)::MapViewOfFile(shmem, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); - - for (unsigned int c = 0; c < 60; ++c) { - // mutex lock - WaitForSingleObject(mutex, INFINITE); - - // write shared memory - memset(buf, c, shmem_size); - - std::cout << "write shared memory...c=" << c << std::endl; - - // mutex unlock - ::ReleaseMutex(mutex); - - ::Sleep(1000); - } - - // release - ::UnmapViewOfFile(buf); - ::CloseHandle(shmem); - ::ReleaseMutex(mutex); -} - -int main52525252(int argc, char** argv) { - if (argc == 2) - writer(); - else - reader(); - - return 0; -} - - -int main(int argc, char** argv) { +int main525252(int argc, char** argv) { if (argc == 2) { - MessageMemoryBlock block; + IpcMessageQueue queue("myqueue"); int i = 0; while (true) { - Message_ImportIndex m; + IpcMessage_ImportIndex m; m.path = "foo #" + std::to_string(i); - block.PushMessage(&m); + queue.PushMessage(&m); std::cout << "Sent " << i << std::endl;; using namespace std::chrono_literals; @@ -384,10 +24,10 @@ int main(int argc, char** argv) { } else { - MessageMemoryBlock block; + IpcMessageQueue queue("myqueue"); while (true) { - std::vector> messages = block.PopMessage(); + std::vector> messages = queue.PopMessage(); std::cout << "Got " << messages.size() << " messages" << std::endl; for (auto& message : messages) {