diff --git a/ipc.cc b/ipc.cc index 721fcf23..b52b783e 100644 --- a/ipc.cc +++ b/ipc.cc @@ -2,6 +2,45 @@ #include "serializer.h" namespace { + struct BufferBuilder { + void* memory; + size_t size; + size_t capacity; + + BufferBuilder() { + memory = malloc(128); + size = 0; + capacity = 128; + } + + ~BufferBuilder() { + free(memory); + size = 0; + capacity = 0; + } + + void AppendToBuffer(void* content, size_t content_size) { + if (size + content_size > capacity) { + // Grow memory if needed. + size_t new_size = capacity * 2; + while (new_size < size + content_size) + new_size *= 2; + void* new_memory = malloc(capacity); + memcpy(new_memory, memory, size); + free(memory); + memory = new_memory; + + // Append new content into memory. + memcpy((char*)memory + size, content, content_size); + size += content_size; + } + } + + void Reset() { + size = 0; + } + }; + // JSON-encoded message that is passed across shared memory. // // Messages are funky objects. They contain potentially variable amounts of @@ -85,6 +124,47 @@ struct IpcDirectionalChannel::MessageBuffer { JsonMessage* free_message() { return message_at_offset(metadata()->bytes_used); } + + struct Iterator { + void* buffer; + size_t remaining_bytes; + + Iterator(void* buffer, size_t remaining_bytes) : remaining_bytes(remaining_bytes) {} + + JsonMessage* get() const { + assert(buffer); + return reinterpret_cast(buffer); + } + + JsonMessage* operator->() const { + return get(); + } + + Iterator operator++() const { + size_t next_message_offset = sizeof(JsonMessage) + get()->payload_size; + if (next_message_offset >= remaining_bytes) { + assert(next_message_offset == remaining_bytes); + return Iterator(nullptr, 0); + } + + auto* next_message = (char*)buffer + next_message_offset; + return Iterator(next_message, remaining_bytes - next_message_offset); + } + + bool operator==(const Iterator& other) const { + return buffer == other.buffer && remaining_bytes == other.remaining_bytes; + } + bool operator!=(const Iterator& other) const { + return !(*this == other); + } + }; + + Iterator begin() { + return Iterator(first_message(), metadata()->bytes_used); + } + Iterator end() { + return Iterator(nullptr, 0); + } }; IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) { @@ -170,19 +250,28 @@ std::vector> IpcDirectionalChannel::TakeMessages() { std::vector> result; - size_t offset = 0; - while (remaining_bytes > 0) { - JsonMessage* message = local_buffer->message_at_offset(offset); - std::cerr << "remaining_bytes=" << remaining_bytes << ", offset=" << offset << ", message->payload_size=" << message->payload_size << std::endl; - offset += message->payload_size; - remaining_bytes -= sizeof(JsonMessage) + message->payload_size; - + // TODO + for (auto it = local_buffer->begin(); it != local_buffer->end(); ++it) { + // TODO: partial payload, maybe something like this: + // + // if (it->partial_id != 0) { + // auto* buf = CreateOrFindResizableBuffer(it->partial_id); + // buf->Append(it->payload(), it->payload_size()); + // if (it->is_complete) { + // Process(buf.payload(), buff.payload_size()) + // RemoveResizableBuffer(it->partial_id) + // } + // } + // else { + // Process(it->payload(), it->payload_size()) + // } + // rapidjson::Document document; - document.Parse(message->payload(), message->payload_size); + document.Parse(it->payload(), it->payload_size); bool has_error = document.HasParseError(); auto error = document.GetParseError(); - std::unique_ptr base_message = IpcRegistry::instance()->Allocate(message->ipc_id); + std::unique_ptr base_message = IpcRegistry::instance()->Allocate(it->ipc_id); base_message->Deserialize(document); result.emplace_back(std::move(base_message)); }