diff --git a/indexer.h b/indexer.h index 8d4eb192..82b4d6fe 100644 --- a/indexer.h +++ b/indexer.h @@ -64,7 +64,7 @@ using TypeId = Id; using FuncId = Id; using VarId = Id; -class IdCache; +struct IdCache; struct Location { bool interesting; diff --git a/ipc.cc b/ipc.cc index 7eb131d5..721fcf23 100644 --- a/ipc.cc +++ b/ipc.cc @@ -51,7 +51,7 @@ struct IpcDirectionalChannel::MessageBuffer { template T* Offset(size_t offset) { - return static_cast(static_cast(real_buffer) + offset); + return reinterpret_cast(static_cast(real_buffer) + offset); } // Number of bytes available in buffer_start. Note that this @@ -71,25 +71,31 @@ struct IpcDirectionalChannel::MessageBuffer { return Offset(0); } + JsonMessage* message_at_offset(size_t offset) { + return Offset(sizeof(Metadata) + offset); + } + // First json message. JsonMessage* first_message() { - return Offset(sizeof(Metadata)); + return message_at_offset(0); } // First free, writable json message. Make sure to increase *bytes_used() // by any written size. JsonMessage* free_message() { - return Offset(sizeof(Metadata) + metadata()->bytes_used); + return message_at_offset(metadata()->bytes_used); } }; IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { shared = CreatePlatformSharedMemory(name + "memory"); mutex = CreatePlatformMutex(name + "mutex"); - local = std::unique_ptr(new char[shmem_size]); + local = std::unique_ptr(new char[shmem_size]); - shared_buffer = MakeUnique(shared->shared); - local_buffer = MakeUnique(local.get()); + // TODO: connecting a client will allocate reset shared state on the + // buffer. We need to store if we "initialized". + shared_buffer = MakeUnique(shared->shared, shmem_size); + local_buffer = MakeUnique(local.get(), shmem_size); } IpcDirectionalChannel::~IpcDirectionalChannel() {} @@ -140,6 +146,7 @@ void IpcDirectionalChannel::PushMessage(IpcMessage* message) { shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + shared_buffer->free_message()->payload_size; assert((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata)) < shmem_size); + assert(shared_buffer->metadata()->bytes_used >= 0); shared_buffer->free_message()->ipc_id = IpcId::Invalid; break; } @@ -153,30 +160,31 @@ std::vector> IpcDirectionalChannel::TakeMessages() { // posting data as soon as possible. { std::unique_ptr lock = CreatePlatformScopedMutexLock(mutex.get()); - remaining_bytes = *shared->shared_bytes_used; + assert(shared_buffer->metadata()->bytes_used <= shmem_size); + remaining_bytes = shared_buffer->metadata()->bytes_used; - memcpy(local_block, shared->shared_start, *shared->shared_bytes_used); - *shared->shared_bytes_used = 0; - get_free_message(this)->ipc_id = IpcId::Invalid; + memcpy(local.get(), shared->shared, sizeof(MessageBuffer::Metadata) + shared_buffer->metadata()->bytes_used); + shared_buffer->metadata()->bytes_used = 0; + shared_buffer->free_message()->ipc_id = IpcId::Invalid; } std::vector> result; - char* message = local_block; + size_t offset = 0; while (remaining_bytes > 0) { - std::unique_ptr base_message = IpcRegistry::instance()->Allocate(as_message(message)->ipc_id); + JsonMessage* message = local_buffer->message_at_offset(offset); + std::cerr << "remaining_bytes=" << remaining_bytes << ", offset=" << offset << ", message->payload_size=" << message->payload_size << std::endl; + offset += message->payload_size; + remaining_bytes -= sizeof(JsonMessage) + message->payload_size; rapidjson::Document document; - document.Parse(as_message(message)->payload(), as_message(message)->payload_size); + document.Parse(message->payload(), message->payload_size); bool has_error = document.HasParseError(); auto error = document.GetParseError(); + std::unique_ptr base_message = IpcRegistry::instance()->Allocate(message->ipc_id); base_message->Deserialize(document); - result.emplace_back(std::move(base_message)); - - remaining_bytes -= sizeof(JsonMessage) + as_message(message)->payload_size; - message = message + sizeof(JsonMessage) + as_message(message)->payload_size; } return result; diff --git a/ipc.h b/ipc.h index 94529dd2..be3ab9ca 100644 --- a/ipc.h +++ b/ipc.h @@ -92,7 +92,7 @@ struct IpcDirectionalChannel { void PushMessage(IpcMessage* message); std::vector> TakeMessages(); - class MessageBuffer; + struct MessageBuffer; // Pointer to process shared memory and process shared mutex. @@ -100,7 +100,7 @@ struct IpcDirectionalChannel { std::unique_ptr mutex; // Pointer to process-local memory. - std::unique_ptr local; + std::unique_ptr local; std::unique_ptr shared_buffer; std::unique_ptr local_buffer; diff --git a/platform.h b/platform.h index 4a283473..53135039 100644 --- a/platform.h +++ b/platform.h @@ -10,11 +10,11 @@ struct PlatformScopedMutexLock { virtual ~PlatformScopedMutexLock() {} }; struct PlatformSharedMemory { - virtual ~PlatformSharedMemory(); + virtual ~PlatformSharedMemory() {} void* shared; }; -const int shmem_size = 1024 * 1024 * 32; // number of chars/bytes (32mb) +const int shmem_size = 50;// 1024 * 1024 * 32; // number of chars/bytes (32mb) std::unique_ptr CreatePlatformMutex(const std::string& name); std::unique_ptr CreatePlatformScopedMutexLock(PlatformMutex* mutex);