From 1a6fd858c67110e9d88a1012db973875cccd8d2f Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Sun, 12 Mar 2017 13:28:19 -0700 Subject: [PATCH] wip --- ipc.cc | 94 +++++++++++++++++++++++++++++++++++++------------ ipc.h | 8 ++++- platform.h | 6 ++-- platform_win.cc | 10 ++---- 4 files changed, 83 insertions(+), 35 deletions(-) diff --git a/ipc.cc b/ipc.cc index ac9c0e65..7eb131d5 100644 --- a/ipc.cc +++ b/ipc.cc @@ -2,6 +2,12 @@ #include "serializer.h" namespace { + // JSON-encoded message that is passed across shared memory. + // + // Messages are funky objects. They contain potentially variable amounts of + // data and are passed between processes. This means that they need to be + // fully relocatable, ie, it is possible to memmove them in memory to a + // completely different address. struct JsonMessage { IpcId ipc_id; size_t payload_size; @@ -16,19 +22,6 @@ namespace { } }; - JsonMessage* get_free_message(IpcDirectionalChannel* channel) { - return reinterpret_cast(channel->shared->shared_start + *channel->shared->shared_bytes_used); - } - // Messages are funky objects. They contain potentially variable amounts of - // data and are passed between processes. This means that they need to be - // fully relocatable, ie, it is possible to memmove them in memory to a - // completely different address. - - - JsonMessage* as_message(char* ptr) { - return reinterpret_cast(ptr); - } - std::string NameToServerName(const std::string& name) { return name + "server"; } @@ -44,15 +37,70 @@ std::unique_ptr IpcRegistry::Allocate(IpcId id) { return std::unique_ptr((*allocators)[id]()); } +struct IpcDirectionalChannel::MessageBuffer { + MessageBuffer(void* buffer, size_t buffer_size) { + real_buffer = buffer; + real_buffer_size = buffer_size; + new(real_buffer) Metadata(); + } + + // Pointer to the start of the actual buffer and the + // amount of storage actually available. + void* real_buffer; + size_t real_buffer_size; + + template + T* Offset(size_t offset) { + return static_cast(static_cast(real_buffer) + offset); + } + + // Number of bytes available in buffer_start. Note that this + // is smaller than the total buffer size, since there is some + // metadata stored at the start of the buffer. + size_t buffer_size; + + struct Metadata { + // The number of bytes that are currently used in the buffer minus the + // size of this Metadata struct. + size_t bytes_used = 0; + int next_partial_message_id = 0; + int num_outstanding_partial_messages = 0; + }; + + Metadata* metadata() { + return Offset(0); + } + + // First json message. + JsonMessage* first_message() { + return Offset(sizeof(Metadata)); + } + + // First free, writable json message. Make sure to increase *bytes_used() + // by any written size. + JsonMessage* free_message() { + return Offset(sizeof(Metadata) + metadata()->bytes_used); + } +}; + IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { - local_block = new char[shmem_size]; shared = CreatePlatformSharedMemory(name + "memory"); mutex = CreatePlatformMutex(name + "mutex"); + local = std::unique_ptr(new char[shmem_size]); + + shared_buffer = MakeUnique(shared->shared); + local_buffer = MakeUnique(local.get()); } -IpcDirectionalChannel::~IpcDirectionalChannel() { - delete[] local_block; -} +IpcDirectionalChannel::~IpcDirectionalChannel() {} + +// TODO: +// We need to send partial payloads. But other payloads may appear in +// the middle of the payload. +// +// +// int partial_payload_id = 0 +// int num_uncompleted_payloads = 0 void IpcDirectionalChannel::PushMessage(IpcMessage* message) { assert(message->ipc_id != IpcId::Invalid); @@ -84,15 +132,15 @@ void IpcDirectionalChannel::PushMessage(IpcMessage* message) { std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex.get()); // Try again later when there is room in shared memory. - if ((*shared->shared_bytes_used + sizeof(JsonMessage) + payload_size) >= shmem_size) + if ((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata) + sizeof(JsonMessage) + payload_size) >= shmem_size) continue; - get_free_message(this)->ipc_id = message->ipc_id; - get_free_message(this)->SetPayload(payload_size, output.GetString()); + shared_buffer->free_message()->ipc_id = message->ipc_id; + shared_buffer->free_message()->SetPayload(payload_size, output.GetString()); - *shared->shared_bytes_used += sizeof(JsonMessage) + get_free_message(this)->payload_size; - assert(*shared->shared_bytes_used < shmem_size); - get_free_message(this)->ipc_id = IpcId::Invalid; + shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + shared_buffer->free_message()->payload_size; + assert((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata)) < shmem_size); + shared_buffer->free_message()->ipc_id = IpcId::Invalid; break; } diff --git a/ipc.h b/ipc.h index 9647e900..94529dd2 100644 --- a/ipc.h +++ b/ipc.h @@ -92,12 +92,18 @@ struct IpcDirectionalChannel { void PushMessage(IpcMessage* message); std::vector> TakeMessages(); + class MessageBuffer; + + // Pointer to process shared memory and process shared mutex. std::unique_ptr shared; std::unique_ptr mutex; // Pointer to process-local memory. - char* local_block; + std::unique_ptr local; + + std::unique_ptr shared_buffer; + std::unique_ptr local_buffer; }; struct IpcServer { diff --git a/platform.h b/platform.h index a05fb95d..4a283473 100644 --- a/platform.h +++ b/platform.h @@ -10,10 +10,8 @@ struct PlatformScopedMutexLock { virtual ~PlatformScopedMutexLock() {} }; struct PlatformSharedMemory { - virtual ~PlatformSharedMemory() {} - - size_t* shared_bytes_used; - char* shared_start; + virtual ~PlatformSharedMemory(); + void* shared; }; const int shmem_size = 1024 * 1024 * 32; // number of chars/bytes (32mb) diff --git a/platform_win.cc b/platform_win.cc index 65707531..0a76484d 100644 --- a/platform_win.cc +++ b/platform_win.cc @@ -34,7 +34,6 @@ struct PlatformScopedMutexLockWin : public PlatformScopedMutexLock { struct PlatformSharedMemoryWin : public PlatformSharedMemory { HANDLE shmem_; - void* shared_start_real_; PlatformSharedMemoryWin(const std::string& name) { shmem_ = CreateFileMapping( @@ -46,15 +45,12 @@ struct PlatformSharedMemoryWin : public PlatformSharedMemory { name.c_str() ); - shared_start_real_ = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); - - shared_bytes_used = reinterpret_cast(shared_start_real_); - *shared_bytes_used = 0; - shared_start = reinterpret_cast(shared_bytes_used + 1); + shared = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, shmem_size); } ~PlatformSharedMemoryWin() override { - UnmapViewOfFile(shared_start_real_); + UnmapViewOfFile(shared); + shared = nullptr; } };