diff --git a/src/buffer.cc b/src/buffer.cc deleted file mode 100644 index 93486125..00000000 --- a/src/buffer.cc +++ /dev/null @@ -1,122 +0,0 @@ -#include "buffer.h" - -#include "platform.h" -#include "utils.h" - -#include - -#include -#include -#include - -namespace { - -struct ScopedLockLocal : public ScopedLock { - ScopedLockLocal(std::mutex& mutex) : guard(mutex) {} - std::lock_guard guard; -}; - -struct BufferLocal : public Buffer { - explicit BufferLocal(size_t capacity) { - this->data = malloc(capacity); - this->capacity = capacity; - } - ~BufferLocal() override { - free(data); - data = nullptr; - capacity = 0; - } - - std::unique_ptr WaitForExclusiveAccess() override { - return MakeUnique(mutex_); - } - - std::mutex mutex_; -}; - -struct ScopedLockPlatform : public ScopedLock { - ScopedLockPlatform(PlatformMutex* mutex) - : guard(CreatePlatformScopedMutexLock(mutex)) {} - - std::unique_ptr guard; -}; - -struct BufferPlatform : public Buffer { - explicit BufferPlatform(const std::string& name, size_t capacity) - : memory_(CreatePlatformSharedMemory(name + "_mem", capacity)), - mutex_(CreatePlatformMutex(name + "_mtx")) { - this->data = memory_->data; - this->capacity = memory_->capacity; - } - - ~BufferPlatform() override { - data = nullptr; - capacity = 0; - } - - std::unique_ptr WaitForExclusiveAccess() override { - return MakeUnique(mutex_.get()); - } - - std::unique_ptr memory_; - std::unique_ptr mutex_; -}; - -} // namespace - -std::unique_ptr Buffer::Create(size_t capacity) { - return MakeUnique(capacity); -} - -std::unique_ptr Buffer::CreateSharedBuffer(const std::string& name, - size_t capacity) { - return MakeUnique(name, capacity); -} - -TEST_SUITE("BufferLocal"); - -TEST_CASE("create") { - std::unique_ptr b = Buffer::Create(24); - REQUIRE(b->data); - REQUIRE(b->capacity == 24); - - b = Buffer::CreateSharedBuffer("indexertest", 24); - REQUIRE(b->data); - REQUIRE(b->capacity == 24); -} - -TEST_CASE("lock") { - auto buffers = {Buffer::Create(sizeof(int)), - Buffer::CreateSharedBuffer("indexertest", sizeof(int))}; - - for (auto& b : buffers) { - int* data = reinterpret_cast(b->data); - *data = 0; - - std::unique_ptr thread; - { - auto lock = b->WaitForExclusiveAccess(); - *data = 1; - - // Start a second thread, wait until it has attempted to acquire a lock. - volatile bool did_read = false; - thread = MakeUnique([&did_read, &b, &data]() { - did_read = true; - auto l = b->WaitForExclusiveAccess(); - *data = 2; - }); - while (!did_read) - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - - // Verify lock acquisition is waiting. - REQUIRE(*data == 1); - } - - // Wait for thread to acquire lock, verify it writes to data. - thread->join(); - REQUIRE(*data == 2); - } -} - -TEST_SUITE_END(); diff --git a/src/buffer.h b/src/buffer.h deleted file mode 100644 index 90f7219b..00000000 --- a/src/buffer.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include -#include - -struct ScopedLock { - virtual ~ScopedLock() = default; -}; - -// Points to a generic block of memory. Note that |data| is relocatable, ie, -// multiple Buffer instantiations may point to the same underlying block of -// memory but the data pointer has different values. -struct Buffer { - // Create a new buffer of the given capacity using process-local memory. - static std::unique_ptr Create(size_t capacity); - // Create a buffer pointing to memory shared across processes with the given - // capacity. - static std::unique_ptr CreateSharedBuffer(const std::string& name, - size_t capacity); - - virtual ~Buffer() = default; - - // Acquire a lock on the buffer, ie, become the only code that can read or - // write to it. The lock lasts so long as the returned object is alive. - virtual std::unique_ptr WaitForExclusiveAccess() = 0; - - void* data = nullptr; - size_t capacity = 0; -}; diff --git a/src/command_line.cc b/src/command_line.cc index 58ab0d1a..dd60bbf2 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -1,5 +1,4 @@ // TODO: cleanup includes -#include "buffer.h" #include "cache.h" #include "clang_complete.h" #include "file_consumer.h" @@ -9,7 +8,6 @@ #include "language_server_api.h" #include "lex_utils.h" #include "match.h" -#include "message_queue.h" #include "options.h" #include "platform.h" #include "project.h" diff --git a/src/message_queue.cc b/src/message_queue.cc deleted file mode 100644 index 8ca733e5..00000000 --- a/src/message_queue.cc +++ /dev/null @@ -1,416 +0,0 @@ -#include "message_queue.h" - -#include -#include -#include -#include -#include - -#include - -#include "platform.h" -#include "resizable_buffer.h" -#include "utils.h" - -// TODO: figure out a logging solution -//#define MESSAGE_QUEUE_LOG - -namespace { - -const int kMinimumPartialPayloadSize = 128; - -struct MessageHeader { - MessageHeader(uint32_t partial_id, bool has_more_chunks, size_t size) - : partial_id(partial_id), has_more_chunks(has_more_chunks), size(size) {} - - uint32_t partial_id; - bool has_more_chunks; - size_t size; -}; - -struct BufferMessageIterator { - static BufferMessageIterator Begin(void* buffer, size_t bytes_used) { - if (bytes_used == 0) - return End(); - - return BufferMessageIterator(buffer, bytes_used); - } - static BufferMessageIterator End() { - return BufferMessageIterator(nullptr, 0); - } - - // Start of buffer to iterate. - uint8_t* buffer; - // Number of bytes left in buffer to parse. - size_t remaining_bytes; - - BufferMessageIterator(void* buffer, size_t remaining_bytes) - : buffer(reinterpret_cast(buffer)), - remaining_bytes(remaining_bytes) {} - - MessageHeader* get() const { - return reinterpret_cast(buffer); - } - MessageHeader* operator*() const { return get(); } - MessageHeader* operator->() const { return get(); } - - void operator++() { - size_t next_message_offset = sizeof(MessageHeader) + get()->size; - if (next_message_offset >= remaining_bytes) { - assert(next_message_offset == remaining_bytes); - buffer = nullptr; - remaining_bytes = 0; - return; - } - - buffer = buffer + next_message_offset; - remaining_bytes -= next_message_offset; - } - - void* message_data() const { - return reinterpret_cast(buffer + sizeof(MessageHeader)); - } - - bool operator==(const BufferMessageIterator& other) const { - return buffer == other.buffer && remaining_bytes == other.remaining_bytes; - } - bool operator!=(const BufferMessageIterator& other) const { - return !(*this == other); - } -}; - -enum class RepeatResult { RunAgain, Break }; - -// Run |action| an arbitrary number of times. -void Repeat(std::function action) { - bool first = true; -#if defined(MESSAGE_QUEUE_LOG) - int log_iteration_count = 0; - int log_count = 0; -#endif - while (true) { - if (!first) { -#if defined(MESSAGE_QUEUE_LOG) - if (log_iteration_count > 1000) { - log_iteration_count = 0; - std::cerr << "[info]: Buffer full, waiting (" << log_count++ << ")" - << std::endl; - } - ++log_iteration_count; -#endif - // TODO: See if we can figure out a way to use condition variables - // cross-process. - std::this_thread::sleep_for(std::chrono::microseconds(0)); - } - first = false; - - if (action() == RepeatResult::RunAgain) - continue; - break; - } -} - -ResizableBuffer* CreateOrFindResizableBuffer( - std::unordered_map>& - resizable_buffers, - uint32_t id) { - auto it = resizable_buffers.find(id); - if (it != resizable_buffers.end()) - return it->second.get(); - return (resizable_buffers[id] = MakeUnique()).get(); -} - -std::unique_ptr MakeBuffer(void* content, size_t size) { - auto buffer = Buffer::Create(size); - memcpy(buffer->data, content, size); - return buffer; -} - -} // namespace - -Message::Message(void* data, size_t size) : data(data), size(size) {} - -struct MessageQueue::BufferMetadata { - // Reset buffer. - void reset() { total_message_bytes_ = 0; } - - // Total number of used bytes excluding the sizeof this metadata object. - void add_used_bytes(size_t used_bytes) { total_message_bytes_ += used_bytes; } - - // The total number of bytes in use. - size_t total_bytes_used_including_metadata() { - return total_message_bytes_ + sizeof(BufferMetadata); - } - - // The total number of bytes currently used for messages. This does not - // include the sizeof the buffer metadata. - size_t total_message_bytes() { return total_message_bytes_; } - - int next_partial_message_id = 0; - - private: - size_t total_message_bytes_ = 0; -}; - -MessageQueue::MessageQueue(std::unique_ptr buffer, bool buffer_has_data) - : buffer_(std::move(buffer)) { - assert(buffer_->capacity >= - (sizeof(BufferMetadata) + kMinimumPartialPayloadSize)); - - if (!buffer_has_data) - new (buffer_->data) BufferMetadata(); - - local_buffer_ = Buffer::Create(buffer_->capacity - sizeof(BufferMetadata)); - memset(local_buffer_->data, 0, local_buffer_->capacity); -} - -void MessageQueue::Enqueue(const Message& message) { -#if defined(MESSAGE_QUEUE_LOG) - int count = 0; -#endif - uint32_t partial_id = 0; - uint8_t* payload_data = reinterpret_cast(message.data); - size_t payload_size = message.size; - - Repeat([&]() { -#if defined(MESSAGE_QUEUE_LOG) - if (count++ > 500) { - std::cerr << "x500 Sending partial message payload_size=" << payload_size - << std::endl; - count = 0; - } -#endif - - auto lock = buffer_->WaitForExclusiveAccess(); - - // We cannot find the entire payload in the buffer. We have to send chunks - // of it over time. - if (payload_size >= BytesAvailableInBuffer()) { - // There's not enough room for our minimum payload size, so try again - // later. - if ((sizeof(MessageHeader) + kMinimumPartialPayloadSize) > - BytesAvailableInBuffer()) - return RepeatResult::RunAgain; - - if (partial_id == 0) { - // note: pre-increment so we use 1 as the initial value - partial_id = ++metadata()->next_partial_message_id; - } - - size_t sent_payload_size = - BytesAvailableInBuffer() - sizeof(MessageHeader); - // |sent_payload_size| must always be smaller than |payload_size|. If it - // is equal to |payload_size|, than we could have sent it as a normal, - // non-partial message. It's also an error if it is larger than - // payload_size (we're sending garbage data). - assert(sent_payload_size < payload_size); - - CopyPayloadToBuffer(partial_id, payload_data, sent_payload_size, - true /*has_more_chunks*/); - payload_data += sent_payload_size; - payload_size -= sent_payload_size; - - // Prepare for next time. - return RepeatResult::RunAgain; - } - - // The entire payload fits. Send it all now. - else { - // Include partial message id, as there could have been previous parts of - // this payload. - CopyPayloadToBuffer(partial_id, payload_data, payload_size, - false /*has_more_chunks*/); - -#if defined(MESSAGE_QUEUE_LOG) - std::cerr << "Sending full message with payload_size=" << payload_size - << std::endl; -#endif - return RepeatResult::Break; - } - }); -} - -std::vector> MessageQueue::DequeueAll() { - std::unordered_map> - resizable_buffers; - - std::vector> result; - - while (true) { - size_t local_buffer_size = 0; - - // Move data from shared memory into a local buffer. Do this - // before parsing the blocks so that other processes can begin - // posting data as soon as possible. - { - std::unique_ptr lock = buffer_->WaitForExclusiveAccess(); - assert(BytesAvailableInBuffer() >= 0); - - // note: Do not copy over buffer_ metadata. - local_buffer_size = metadata()->total_message_bytes(); - memcpy(local_buffer_->data, first_message_in_buffer(), local_buffer_size); - - metadata()->reset(); - } - - // Parse blocks from shared memory. - for (auto it = BufferMessageIterator::Begin(local_buffer_->data, - local_buffer_size); - it != BufferMessageIterator::End(); ++it) { -#if defined(MESSAGE_QUEUE_LOG) - std::cerr << "Got message with partial_id=" << it->partial_id - << ", payload_size=" << it->size - << ", has_more_chunks=" << it->has_more_chunks << std::endl; -#endif - - if (it->partial_id != 0) { - auto* buf = - CreateOrFindResizableBuffer(resizable_buffers, it->partial_id); - buf->Append(it.message_data(), it->size); - - if (!it->has_more_chunks) { - result.push_back(MakeBuffer(buf->buffer, buf->size)); - resizable_buffers.erase(it->partial_id); - } - } else { - // Note: we can't just return pointers to |local_buffer_| because if we - // read a partial message we will invalidate all of the existing - // pointers. We could jump through hoops to make it work (ie, if no - // partial messages return pointers to local_buffer_) but it is not - // worth the effort. - assert(!it->has_more_chunks); - result.push_back(MakeBuffer(it.message_data(), it->size)); - } - } - - // We're waiting for data to be posted to result. Delay a little so we - // don't push the CPU so hard. - if (!resizable_buffers.empty()) - std::this_thread::sleep_for(std::chrono::microseconds(0)); - else - break; - } - - return result; -} - -void MessageQueue::CopyPayloadToBuffer(uint32_t partial_id, - void* payload, - size_t payload_size, - bool has_more_chunks) { - assert(BytesAvailableInBuffer() >= (sizeof(MessageHeader) + payload_size)); - - // Copy header. - MessageHeader header(partial_id, has_more_chunks, payload_size); - memcpy(first_free_address_in_buffer(), &header, sizeof(MessageHeader)); - metadata()->add_used_bytes(sizeof(MessageHeader)); - // Copy payload. - memcpy(first_free_address_in_buffer(), payload, payload_size); - metadata()->add_used_bytes(payload_size); -} - -MessageQueue::BufferMetadata* MessageQueue::metadata() const { - return reinterpret_cast(buffer_->data); -} - -size_t MessageQueue::BytesAvailableInBuffer() const { - return buffer_->capacity - metadata()->total_bytes_used_including_metadata(); -} - -Message* MessageQueue::first_message_in_buffer() const { - return reinterpret_cast(reinterpret_cast(buffer_->data) + - sizeof(BufferMetadata)); -} - -void* MessageQueue::first_free_address_in_buffer() const { - if (metadata()->total_bytes_used_including_metadata() >= buffer_->capacity) - return nullptr; - return reinterpret_cast( - reinterpret_cast(buffer_->data) + - metadata()->total_bytes_used_including_metadata()); -} - -TEST_SUITE("MessageQueue"); - -TEST_CASE("simple") { - MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5), - false /*buffer_has_data*/); - - int data = 0; - data = 1; - queue.Enqueue(Message(&data, sizeof(data))); - data = 2; - queue.Enqueue(Message(&data, sizeof(data))); - - int expected = 0; - for (std::unique_ptr& m : queue.DequeueAll()) { - ++expected; - - REQUIRE(m->capacity == sizeof(data)); - int* value = reinterpret_cast(m->data); - REQUIRE(expected == *value); - } -} - -TEST_CASE("large payload") { - MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5), - false /*buffer_has_data*/); - - // Allocate big buffer. - size_t num_ints = kMinimumPartialPayloadSize * 100; - int* sent_ints = reinterpret_cast(malloc(sizeof(int) * num_ints)); - for (int i = 0; i < num_ints; ++i) - sent_ints[i] = i; - - // Queue big buffer. Add surrounding messages to make sure they get sent - // correctly. - // Run in a separate thread because Enqueue will block. - volatile bool done_sending = false; - std::thread sender([&]() { - int small = 5; - queue.Enqueue(Message(&small, sizeof(small))); - queue.Enqueue(Message(sent_ints, sizeof(int) * num_ints)); - queue.Enqueue(Message(&small, sizeof(small))); - done_sending = true; - }); - - // Receive sent messages. - { - // Keep dequeuing messages until we have three. - std::vector> messages; - while (messages.size() != 3) { - for (auto& message : queue.DequeueAll()) - messages.emplace_back(std::move(message)); - } - sender.join(); - - // Small - { - REQUIRE(sizeof(int) == messages[0]->capacity); - int* value = reinterpret_cast(messages[0]->data); - REQUIRE(*value == 5); - } - - // Big - { - int* received_ints = reinterpret_cast(messages[1]->data); - REQUIRE(received_ints != sent_ints); - REQUIRE(messages[1]->capacity == (sizeof(int) * num_ints)); - for (int i = 0; i < num_ints; ++i) { - REQUIRE(received_ints[i] == i); - REQUIRE(received_ints[i] == sent_ints[i]); - } - } - - // Small - { - REQUIRE(sizeof(int) == messages[2]->capacity); - int* value = reinterpret_cast(messages[2]->data); - REQUIRE(*value == 5); - } - } - - free(sent_ints); -} - -TEST_SUITE_END(); diff --git a/src/message_queue.h b/src/message_queue.h deleted file mode 100644 index d8a58db9..00000000 --- a/src/message_queue.h +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "buffer.h" - -struct ResizableBuffer; - -struct Message { - Message(void* data, size_t size); - - void* data; - size_t size; -}; - -// A MessageQueue is a FIFO container storing messages in an arbitrary memory -// buffer that is cross-thread and cross-process safe. This means: -// - Multiple separate MessageQueues instantiations can point to the -// same underlying buffer and use it at the same time. -// - The buffer is fully relocatable, ie, it can have multiple different -// addresses (as is the case for memory shared across processes). -// -// There can be multiple writers, but there can only be one reader. -struct MessageQueue { - // Create a new MessageQueue using |buffer| as the backing data storage. - // This does *not* take ownership over the memory stored in |buffer|. - // - // If |buffer_has_data| is true, then it is assumed that |buffer| contains - // data and has already been initialized. It is a perfectly acceptable - // use-case to have multiple completely separate MessageQueue - // instantiations pointing to the same memory. - explicit MessageQueue(std::unique_ptr buffer, bool buffer_has_data); - MessageQueue(const MessageQueue&) = delete; - - // Enqueue a message to the queue. This will wait until there is room in - // queue. If the message is too large to fit into the queue, this will - // wait until the message has been fully sent, which may involve multiple - // IPC roundtrips (ie, Enqueue -> DequeueAll -> Enqueue) - so this method - // may take a long time to run. - // - // TODO: Consider copying message memory to a temporary buffer and running - // enqueues on a worker thread. - void Enqueue(const Message& message); - - // Take all messages from the queue. - std::vector> DequeueAll(); - - private: - struct BufferMetadata; - - void CopyPayloadToBuffer(uint32_t partial_id, - void* payload, - size_t payload_size, - bool has_more_chunks); - - BufferMetadata* metadata() const; - // Returns the number of bytes currently available in the buffer. - size_t BytesAvailableInBuffer() const; - Message* first_message_in_buffer() const; - // First free message in the buffer. - void* first_free_address_in_buffer() const; - - std::unique_ptr buffer_; - std::unique_ptr local_buffer_; -}; - -/* -// TODO: We convert IpcMessage <-> Message as a user-level operation. -// MessageQueue doesn't know about IpcMessage. -struct IpcMessage { - std::unique_ptr ToMessage(); - void BuildFromMessage(std::unique_ptr message); - - // Serialize/deserialize the message. - virtual void Serialize(Writer& writer) = 0; - virtual void Deserialize(Reader& reader) = 0; -}; -*/ diff --git a/src/resizable_buffer.cc b/src/resizable_buffer.cc deleted file mode 100644 index 0486eafc..00000000 --- a/src/resizable_buffer.cc +++ /dev/null @@ -1,100 +0,0 @@ -#include "resizable_buffer.h" - -#include - -#include -#include -#include -#include - -namespace { -const size_t kInitialCapacity = 128; -} - -ResizableBuffer::ResizableBuffer() { - buffer = malloc(kInitialCapacity); - size = 0; - capacity_ = kInitialCapacity; -} - -ResizableBuffer::~ResizableBuffer() { - free(buffer); - size = 0; - capacity_ = 0; -} - -void ResizableBuffer::Append(void* content, size_t content_size) { - assert(capacity_ >= 0); - - size_t new_size = size + content_size; - - // Grow buffer capacity if needed. - if (new_size >= capacity_) { - size_t new_capacity = capacity_ * 2; - while (new_size >= new_capacity) - new_capacity *= 2; - void* new_memory = malloc(new_capacity); - assert(size < capacity_); - memcpy(new_memory, buffer, size); - free(buffer); - buffer = new_memory; - capacity_ = new_capacity; - } - - // Append new content into memory. - memcpy(reinterpret_cast(buffer) + size, content, content_size); - size = new_size; -} - -void ResizableBuffer::Reset() { - size = 0; -} - -TEST_SUITE("ResizableBuffer"); - -TEST_CASE("buffer starts with zero size") { - ResizableBuffer b; - REQUIRE(b.buffer); - REQUIRE(b.size == 0); -} - -TEST_CASE("append and reset") { - int content = 1; - ResizableBuffer b; - - b.Append(&content, sizeof(content)); - REQUIRE(b.size == sizeof(content)); - - b.Append(&content, sizeof(content)); - REQUIRE(b.size == (2 * sizeof(content))); - - b.Reset(); - REQUIRE(b.size == 0); -} - -TEST_CASE("appended content is copied into buffer w/ resize") { - int content = 0; - ResizableBuffer b; - - // go past kInitialCapacity to verify resize works too - while (b.size < kInitialCapacity * 2) { - b.Append(&content, sizeof(content)); - content += 1; - } - - for (int i = 0; i < content; ++i) - REQUIRE(i == *(reinterpret_cast(b.buffer) + i)); -} - -TEST_CASE("reset does not reallocate") { - ResizableBuffer b; - - while (b.size < kInitialCapacity) - b.Append(&b, sizeof(b)); - - void* buffer = b.buffer; - b.Reset(); - REQUIRE(b.buffer == buffer); -} - -TEST_SUITE_END(); \ No newline at end of file diff --git a/src/resizable_buffer.h b/src/resizable_buffer.h deleted file mode 100644 index 23abde1d..00000000 --- a/src/resizable_buffer.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include - -// Points to a generic block of memory that can be resized. This class owns -// and has the only pointer to the underlying memory buffer. -struct ResizableBuffer { - ResizableBuffer(); - ResizableBuffer(const ResizableBuffer&) = delete; - ~ResizableBuffer(); - - void Append(void* content, size_t content_size); - void Reset(); - - // Buffer content. - void* buffer; - // Number of bytes in |buffer|. Note that the actual buffer may be larger - // than |size|. - size_t size; - - private: - size_t capacity_; -}; diff --git a/src/typed_bidi_message_queue.h b/src/typed_bidi_message_queue.h deleted file mode 100644 index e5538871..00000000 --- a/src/typed_bidi_message_queue.h +++ /dev/null @@ -1,104 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "buffer.h" -#include "message_queue.h" -#include "serializer.h" - -// TypedBidiMessageQueue provides a type-safe server/client implementation on -// top of a couple MessageQueue instances. -template -struct TypedBidiMessageQueue { - using Serializer = std::function; - using Deserializer = - std::function(Reader& visitor)>; - - TypedBidiMessageQueue(const std::string& name, size_t buffer_size) - : for_server(Buffer::CreateSharedBuffer(name + "_fs", buffer_size), - false /*buffer_has_data*/), - for_client(Buffer::CreateSharedBuffer(name + "_fc", buffer_size), - true /*buffer_has_data*/) {} - - void RegisterId(TId id, - const Serializer& serializer, - const Deserializer& deserializer) { - assert(serializers_.find(id) == serializers_.end() && - deserializers_.find(id) == deserializers_.end() && - "Duplicate registration"); - - serializers_[id] = serializer; - deserializers_[id] = deserializer; - } - - void SendMessage(MessageQueue* destination, TId id, TMessage& message) { - // Create writer. - rapidjson::StringBuffer output; - rapidjson::PrettyWriter writer(output); - writer.SetIndent(' ', 0); - - // Serialize the message. - assert(serializers_.find(id) != serializers_.end() && - "No registered serializer"); - const Serializer& serializer = serializers_.find(id)->second; - serializer(writer, message); - - // Send message. - void* payload = malloc(sizeof(MessageHeader) + output.GetSize()); - reinterpret_cast(payload)->id = id; - memcpy( - (void*)(reinterpret_cast(payload) + sizeof(MessageHeader)), - output.GetString(), output.GetSize()); - destination->Enqueue( - Message(payload, sizeof(MessageHeader) + output.GetSize())); - free(payload); - } - - // Retrieve all messages from the given |queue|. - std::vector> GetMessages( - MessageQueue* queue) const { - assert(queue == &for_server || queue == &for_client); - - std::vector> messages = queue->DequeueAll(); - std::vector> result; - result.reserve(messages.size()); - - for (std::unique_ptr& buffer : messages) { - MessageHeader* header = reinterpret_cast(buffer->data); - - // Parse message content. - rapidjson::Document document; - document.Parse( - reinterpret_cast(buffer->data) + sizeof(MessageHeader), - buffer->capacity - sizeof(MessageHeader)); - if (document.HasParseError()) { - std::cerr << "[FATAL]: Unable to parse IPC message" << std::endl; - exit(1); - } - - // Deserialize it. - assert(deserializers_.find(header->id) != deserializers_.end() && - "No registered deserializer"); - const Deserializer& deserializer = - deserializers_.find(header->id)->second; - result.emplace_back(deserializer(document)); - } - - return result; - } - - // Messages which the server process should handle. - MessageQueue for_server; - // Messages which the client process should handle. - MessageQueue for_client; - - private: - struct MessageHeader { - TId id; - }; - - std::unordered_map serializers_; - std::unordered_map deserializers_; -};