From b7923b4abe7548f8ea6e294ce1bbd137844944fa Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Mon, 13 Mar 2017 23:30:41 -0700 Subject: [PATCH] partial messages work --- command_line.cc | 37 +++--- ipc.cc | 304 +++++++++++++++++++++++++++++------------------- ipc.h | 4 + platform.h | 2 +- 4 files changed, 211 insertions(+), 136 deletions(-) diff --git a/command_line.cc b/command_line.cc index 51ade8d5..42199a3c 100644 --- a/command_line.cc +++ b/command_line.cc @@ -663,27 +663,24 @@ void LanguageServerMain(std::string process_name) { std::thread stdio_reader(&LanguageServerStdinLoop, &client_ipc); - - // No server. Run it in-process. if (!has_server) { - - QueryableDatabase db; - IpcServer server_ipc("languageserver"); - - while (true) { - QueryDbMainLoop(&server_ipc, &db); - LanguageServerMainLoop(&client_ipc); - // TODO: use a condition variable. - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } + // No server. Run it in-process. + new std::thread([&]() { + IpcServer server_ipc("languageserver"); + QueryableDatabase db; + while (true) { + QueryDbMainLoop(&server_ipc, &db); + // TODO: use a condition variable. + std::this_thread::sleep_for(std::chrono::microseconds(0)); + } + }); } - else { - while (true) { - LanguageServerMainLoop(&client_ipc); - // TODO: use a condition variable. - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } + // Run language client. + while (true) { + LanguageServerMainLoop(&client_ipc); + // TODO: use a condition variable. + std::this_thread::sleep_for(std::chrono::microseconds(0)); } } @@ -734,6 +731,10 @@ void LanguageServerMain(std::string process_name) { int main(int argc, char** argv) { + bool loop = false; + while (loop) + std::this_thread::sleep_for(std::chrono::milliseconds(16)); + if (argc == 1) { RunTests(); return 0; diff --git a/ipc.cc b/ipc.cc index b52b783e..196db2d9 100644 --- a/ipc.cc +++ b/ipc.cc @@ -2,44 +2,9 @@ #include "serializer.h" namespace { - struct BufferBuilder { - void* memory; - size_t size; - size_t capacity; - - BufferBuilder() { - memory = malloc(128); - size = 0; - capacity = 128; - } - - ~BufferBuilder() { - free(memory); - size = 0; - capacity = 0; - } - - void AppendToBuffer(void* content, size_t content_size) { - if (size + content_size > capacity) { - // Grow memory if needed. - size_t new_size = capacity * 2; - while (new_size < size + content_size) - new_size *= 2; - void* new_memory = malloc(capacity); - memcpy(new_memory, memory, size); - free(memory); - memory = new_memory; - - // Append new content into memory. - memcpy((char*)memory + size, content, content_size); - size += content_size; - } - } - - void Reset() { - size = 0; - } - }; + // The absolute smallest partial payload we should send. This must be >0, ie, 1 is the + // minimum. Keep a reasonably high value so we don't send needlessly send tiny payloads. + const int kMinimumPartialPayloadSize = 128; // JSON-encoded message that is passed across shared memory. // @@ -49,14 +14,20 @@ namespace { // completely different address. struct JsonMessage { IpcId ipc_id; + int partial_message_id; + bool has_more_chunks; size_t payload_size; - - const char* payload() { - return reinterpret_cast(this) + sizeof(JsonMessage); + void* payload() { + return reinterpret_cast(this) + sizeof(JsonMessage); } - void SetPayload(size_t payload_size, const char* payload) { - char* payload_dest = reinterpret_cast(this) + sizeof(JsonMessage); + + void Setup(IpcId ipc_id, int partial_message_id, bool has_more_chunks, size_t payload_size, const char* payload) { + this->ipc_id = ipc_id; + this->partial_message_id = partial_message_id; + this->has_more_chunks = has_more_chunks; this->payload_size = payload_size; + + char* payload_dest = reinterpret_cast(this) + sizeof(JsonMessage); memcpy(payload_dest, payload, payload_size); } }; @@ -89,15 +60,10 @@ struct IpcDirectionalChannel::MessageBuffer { size_t real_buffer_size; template - T* Offset(size_t offset) { + T* Offset(size_t offset) const { return reinterpret_cast(static_cast(real_buffer) + offset); } - // Number of bytes available in buffer_start. Note that this - // is smaller than the total buffer size, since there is some - // metadata stored at the start of the buffer. - size_t buffer_size; - struct Metadata { // The number of bytes that are currently used in the buffer minus the // size of this Metadata struct. @@ -106,22 +72,26 @@ struct IpcDirectionalChannel::MessageBuffer { int num_outstanding_partial_messages = 0; }; - Metadata* metadata() { + Metadata* metadata() const { return Offset(0); } - JsonMessage* message_at_offset(size_t offset) { + size_t bytes_available() const { + return real_buffer_size - sizeof(Metadata) - metadata()->bytes_used; + } + + JsonMessage* message_at_offset(size_t offset) const { return Offset(sizeof(Metadata) + offset); } // First json message. - JsonMessage* first_message() { + JsonMessage* first_message() const { return message_at_offset(0); } // First free, writable json message. Make sure to increase *bytes_used() // by any written size. - JsonMessage* free_message() { + JsonMessage* free_message() const { return message_at_offset(metadata()->bytes_used); } @@ -129,26 +99,32 @@ struct IpcDirectionalChannel::MessageBuffer { void* buffer; size_t remaining_bytes; - Iterator(void* buffer, size_t remaining_bytes) : remaining_bytes(remaining_bytes) {} + Iterator(void* buffer, size_t remaining_bytes) : buffer(buffer), remaining_bytes(remaining_bytes) {} JsonMessage* get() const { assert(buffer); return reinterpret_cast(buffer); } + JsonMessage* operator*() const { + return get(); + } + JsonMessage* operator->() const { return get(); } - Iterator operator++() const { + void operator++() { size_t next_message_offset = sizeof(JsonMessage) + get()->payload_size; if (next_message_offset >= remaining_bytes) { assert(next_message_offset == remaining_bytes); - return Iterator(nullptr, 0); + buffer = nullptr; + remaining_bytes = 0; + return; } - auto* next_message = (char*)buffer + next_message_offset; - return Iterator(next_message, remaining_bytes - next_message_offset); + buffer = (char*)buffer + next_message_offset; + remaining_bytes -= next_message_offset; } bool operator==(const Iterator& other) const { @@ -159,19 +135,76 @@ struct IpcDirectionalChannel::MessageBuffer { } }; - Iterator begin() { + Iterator begin() const { + if (metadata()->bytes_used == 0) + return end(); + return Iterator(first_message(), metadata()->bytes_used); } - Iterator end() { + Iterator end() const { return Iterator(nullptr, 0); } }; +struct IpcDirectionalChannel::ResizableBuffer { + void* memory; + size_t size; + size_t capacity; + + ResizableBuffer() { + memory = malloc(128); + size = 0; + capacity = 128; + } + + ~ResizableBuffer() { + free(memory); + size = 0; + capacity = 0; + } + + void Append(void* content, size_t content_size) { + assert(capacity); + + // Grow memory if needed. + if ((size + content_size) >= capacity) { + size_t new_capacity = capacity * 2; + while (new_capacity < size + content_size) + new_capacity *= 2; + void* new_memory = malloc(new_capacity); + assert(size < capacity); + memcpy(new_memory, memory, size); + free(memory); + memory = new_memory; + capacity = new_capacity; + } + + // Append new content into memory. + memcpy((char*)memory + size, content, content_size); + size += content_size; + } + + void Reset() { + size = 0; + } +}; + +IpcDirectionalChannel::ResizableBuffer* IpcDirectionalChannel::CreateOrFindResizableBuffer(int id) { + auto it = resizable_buffers.find(id); + if (it != resizable_buffers.end()) + return it->second.get(); + return (resizable_buffers[id] = MakeUnique()).get(); +} + +void IpcDirectionalChannel::RemoveResizableBuffer(int id) { + resizable_buffers.erase(id); +} + IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { shared = CreatePlatformSharedMemory(name + "memory"); mutex = CreatePlatformMutex(name + "mutex"); local = std::unique_ptr(new char[shmem_size]); - + // TODO: connecting a client will allocate reset shared state on the // buffer. We need to store if we "initialized". shared_buffer = MakeUnique(shared->shared, shmem_size); @@ -180,16 +213,37 @@ IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { IpcDirectionalChannel::~IpcDirectionalChannel() {} -// TODO: -// We need to send partial payloads. But other payloads may appear in -// the middle of the payload. -// -// -// int partial_payload_id = 0 -// int num_uncompleted_payloads = 0 +enum class DispatchResult { + RunAgain, + Break +}; + +// Run |action| an arbitrary number of times. +void IpcDispatch(PlatformMutex* mutex, std::function action) { + bool first = true; + int log_iteration_count = 0; + int log_count = 0; + while (true) { + if (!first) { + if (log_iteration_count > 1000) { + log_iteration_count = 0; + std::cerr << "[info]: shmem full, waiting (" << log_count++ << ")" << std::endl; // TODO: remove + } + ++log_iteration_count; + std::this_thread::sleep_for(std::chrono::microseconds(0)); + } + first = false; + + std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex); + if (action() == DispatchResult::RunAgain) + continue; + break; + } +} void IpcDirectionalChannel::PushMessage(IpcMessage* message) { assert(message->ipc_id != IpcId::Invalid); + assert(shmem_size > sizeof(JsonMessage) + kMinimumPartialPayloadSize); rapidjson::StringBuffer output; rapidjson::PrettyWriter writer(output); @@ -200,37 +254,61 @@ void IpcDirectionalChannel::PushMessage(IpcMessage* message) { //std::cerr << "Sending message with id " << message->runtime_id() << " (hash " << message->hashed_runtime_id() << ")" << std::endl; - size_t payload_size = strlen(output.GetString()); - assert(payload_size < shmem_size && "Increase shared memory size, payload will never fit"); - bool first = true; - bool did_log = false; - while (true) { - if (!first) { - if (!did_log) { - std::cerr << "[info]: shmem full, waiting" << std::endl; // TODO: remove - did_log = true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(16)); + size_t payload_size = output.GetSize(); + const char* payload = output.GetString(); + if (payload_size == 0) + return; + + int partial_message_id = 0; // TODO + + std::cerr << "Starting dispatch of payload with size " << payload_size << std::endl; + + IpcDispatch(mutex.get(), [&]() { + assert(payload_size > 0); + + // We cannot find the entire payload in the buffer. We + // have to send chunks of it over time. + if ((sizeof(JsonMessage) + payload_size) > shared_buffer->bytes_available()) { + if ((sizeof(JsonMessage) + kMinimumPartialPayloadSize) > shared_buffer->bytes_available()) + return DispatchResult::RunAgain; + + if (partial_message_id == 0) + partial_message_id = ++shared_buffer->metadata()->next_partial_message_id; // note: pre-increment so we 1 as initial value + + size_t sent_payload_size = shared_buffer->bytes_available() - sizeof(JsonMessage); + shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, true /*has_more_chunks*/, sent_payload_size, payload); + shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + sent_payload_size; + shared_buffer->free_message()->ipc_id = IpcId::Invalid; + std::cerr << "Sending partial message with payload_size=" << sent_payload_size << std::endl; + + // Prepare for next time. + payload_size -= sent_payload_size; + payload += sent_payload_size; + return DispatchResult::RunAgain; } - first = false; + // The entire payload fits. Send it all now. + else { + // Include partial message id, as there could have been previous parts of this payload. + shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, false /*has_more_chunks*/, payload_size, payload); + shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + payload_size; + shared_buffer->free_message()->ipc_id = IpcId::Invalid; + std::cerr << "Sending full message with payload_size=" << payload_size << std::endl; - std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex.get()); + return DispatchResult::Break; + } + }); +} - // Try again later when there is room in shared memory. - if ((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata) + sizeof(JsonMessage) + payload_size) >= shmem_size) - continue; - - shared_buffer->free_message()->ipc_id = message->ipc_id; - shared_buffer->free_message()->SetPayload(payload_size, output.GetString()); - - shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + shared_buffer->free_message()->payload_size; - assert((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata)) < shmem_size); - assert(shared_buffer->metadata()->bytes_used >= 0); - shared_buffer->free_message()->ipc_id = IpcId::Invalid; - break; - } +void AddIpcMessageFromJsonMessage(std::vector>& result, IpcId ipc_id, void* payload, size_t payload_size) { + rapidjson::Document document; + document.Parse(reinterpret_cast(payload), payload_size); + bool has_error = document.HasParseError(); + auto error = document.GetParseError(); + std::unique_ptr base_message = IpcRegistry::instance()->Allocate(ipc_id); + base_message->Deserialize(document); + result.emplace_back(std::move(base_message)); } std::vector> IpcDirectionalChannel::TakeMessages() { @@ -250,31 +328,23 @@ std::vector> IpcDirectionalChannel::TakeMessages() { std::vector> result; - // TODO - for (auto it = local_buffer->begin(); it != local_buffer->end(); ++it) { - // TODO: partial payload, maybe something like this: - // - // if (it->partial_id != 0) { - // auto* buf = CreateOrFindResizableBuffer(it->partial_id); - // buf->Append(it->payload(), it->payload_size()); - // if (it->is_complete) { - // Process(buf.payload(), buff.payload_size()) - // RemoveResizableBuffer(it->partial_id) - // } - // } - // else { - // Process(it->payload(), it->payload_size()) - // } - // - rapidjson::Document document; - document.Parse(it->payload(), it->payload_size); - bool has_error = document.HasParseError(); - auto error = document.GetParseError(); + for (JsonMessage* message : *local_buffer) { + std::cerr << "Got message with payload_size=" << message->payload_size << std::endl; - std::unique_ptr base_message = IpcRegistry::instance()->Allocate(it->ipc_id); - base_message->Deserialize(document); - result.emplace_back(std::move(base_message)); + if (message->partial_message_id != 0) { + auto* buf = CreateOrFindResizableBuffer(message->partial_message_id); + buf->Append(message->payload(), message->payload_size); + if (!message->has_more_chunks) { + AddIpcMessageFromJsonMessage(result, message->ipc_id, buf->memory, buf->size); + RemoveResizableBuffer(message->partial_message_id); + } + } + else { + assert(!message->has_more_chunks); + AddIpcMessageFromJsonMessage(result, message->ipc_id, message->payload(), message->payload_size); + } } + local_buffer->metadata()->bytes_used = 0; return result; } diff --git a/ipc.h b/ipc.h index be3ab9ca..359ddea2 100644 --- a/ipc.h +++ b/ipc.h @@ -93,7 +93,11 @@ struct IpcDirectionalChannel { std::vector> TakeMessages(); struct MessageBuffer; + struct ResizableBuffer; + ResizableBuffer* CreateOrFindResizableBuffer(int id); + void RemoveResizableBuffer(int id); + std::unordered_map> resizable_buffers; // Pointer to process shared memory and process shared mutex. std::unique_ptr shared; diff --git a/platform.h b/platform.h index 53135039..6d1c696d 100644 --- a/platform.h +++ b/platform.h @@ -14,7 +14,7 @@ struct PlatformSharedMemory { void* shared; }; -const int shmem_size = 50;// 1024 * 1024 * 32; // number of chars/bytes (32mb) +const int shmem_size = 200;// 1024 * 1024 * 32; // number of chars/bytes (32mb) std::unique_ptr CreatePlatformMutex(const std::string& name); std::unique_ptr CreatePlatformScopedMutexLock(PlatformMutex* mutex);