diff --git a/ipc.cc b/ipc.cc index 9df2e21e..1adecab7 100644 --- a/ipc.cc +++ b/ipc.cc @@ -270,7 +270,7 @@ void IpcDirectionalChannel::PushMessage(IpcMessage* message) { if (payload_size == 0) return; - int partial_message_id = 0; // TODO + int partial_message_id = 0; std::cerr << "Starting dispatch of payload with size " << payload_size << std::endl; int count = 0; diff --git a/src/message_queue.cc b/src/message_queue.cc index 92798696..31609bf2 100644 --- a/src/message_queue.cc +++ b/src/message_queue.cc @@ -1,6 +1,93 @@ #include "message_queue.h" #include +#include +#include +#include + +#include "platform.h" + +namespace { + +const int kMinimumPartialPayloadSize = 128; + +struct BufferMessageIterator { + static BufferMessageIterator Begin(void* buffer, size_t bytes_used) { + if (bytes_used == 0) + return End(); + + return BufferMessageIterator(buffer, bytes_used); + } + static BufferMessageIterator End() { + return BufferMessageIterator(nullptr, 0); + } + + + // Start of buffer to iterate. + void* buffer; + // Number of bytes left in buffer to parse. + size_t remaining_bytes; + + BufferMessageIterator(void* buffer, size_t remaining_bytes) + : buffer(buffer), remaining_bytes(remaining_bytes) {} + + Message* get() const { + assert(buffer); + return reinterpret_cast(buffer); + } + Message* operator*() const { return get(); } + Message* operator->() const { return get(); } + + void operator++() { + size_t next_message_offset = get()->total_size; + if (next_message_offset >= remaining_bytes) { + assert(next_message_offset == remaining_bytes); + buffer = nullptr; + remaining_bytes = 0; + return; + } + + buffer = reinterpret_cast(buffer) + next_message_offset; + remaining_bytes -= next_message_offset; + } + + bool operator==(const BufferMessageIterator& other) const { + return buffer == other.buffer && remaining_bytes == other.remaining_bytes; + } + bool operator!=(const BufferMessageIterator& other) const { + return !(*this == other); + } +}; + +enum class RepeatResult { + RunAgain, + Break +}; + +// Run |action| an arbitrary number of times. +void Repeat(std::function action) { + bool first = true; + int log_iteration_count = 0; + int log_count = 0; + while (true) { + if (!first) { + if (log_iteration_count > 1000) { + log_iteration_count = 0; + std::cerr << "[info]: shmem full, waiting (" << log_count++ << ")" << std::endl; // TODO: remove + } + ++log_iteration_count; + // TODO: See if we can figure out a way to use condition variables cross-process. + std::this_thread::sleep_for(std::chrono::microseconds(0)); + } + first = false; + + if (action() == RepeatResult::RunAgain) + continue; + break; + } +} + +} // namespace struct MessageQueue::BufferMetadata { // Total number of used bytes exluding the sizeof this metadata object. @@ -17,18 +104,116 @@ struct MessageQueue::BufferMetadata { // include the sizeof the buffer metadata. size_t total_message_bytes() { return total_message_bytes_; } - private: + int next_partial_message_id = 0; + +private: size_t total_message_bytes_ = 0; }; MessageQueue::MessageQueue(std::unique_ptr buffer, bool buffer_has_data) - : buffer_(std::move(buffer)) { + : buffer_(std::move(buffer)) { if (!buffer_has_data) new (buffer_->data) BufferMetadata(); } -void MessageQueue::Enqueue(const Message& message) {} +void MessageQueue::Enqueue(const Message& message) { + //BufferMessageIterator::Begin(first_message_in_buffer(), metadata()->total_message_bytes); -MessageQueue::BufferMetadata* MessageQueue::Metadata() { + int partial_message_id = 0; + + Repeat([&]() { + auto lock = buffer_->WaitForExclusiveAccess(); + + // We cannot find the entire payload in the buffer. We have to send chunks + // of it over time. + if (message.total_size >= BytesAvailableInBuffer()) { + // There's not enough room for our minimum payload size, so try again later. + if ((sizeof(Message) + kMinimumPartialPayloadSize) > BytesAvailableInBuffer()) + return RepeatResult::RunAgain; + + if (partial_message_id == 0) + partial_message_id = ++metadata()->next_partial_message_id; // note: pre-increment so we 1 as initial value + + + size_t sent_payload_size = BytesAvailableInBuffer() - sizeof(Message); + free_message_in_buffer()->CopyFrom(message); + metadata()->set_total_messages_byte_count( + metadata()->total_message_bytes() + sizeof(Message) + sent_payload_size); + + //shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, true /*has_more_chunks*/, sent_payload_size, payload); + //shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + sent_payload_size; + //shared_buffer->free_message()->ipc_id = IpcId::Invalid; // Note: free_message() may be past writable memory. + + if (count++ > 50) { + std::cerr << "x50 Sending partial message with payload_size=" << sent_payload_size << std::endl; + count = 0; + } + + // Prepare for next time. + payload_size -= sent_payload_size; + payload += sent_payload_size; + return RepeatResult::RunAgain; + } + + return RepeatResult::Break; + +#if false + assert(payload_size > 0); + + // We cannot find the entire payload in the buffer. We + // have to send chunks of it over time. + if ((sizeof(JsonMessage) + payload_size) > shared_buffer->bytes_available()) { + if ((sizeof(JsonMessage) + kMinimumPartialPayloadSize) > shared_buffer->bytes_available()) + return DispatchResult::RunAgain; + + if (partial_message_id == 0) + partial_message_id = ++shared_buffer->metadata()->next_partial_message_id; // note: pre-increment so we 1 as initial value + + size_t sent_payload_size = shared_buffer->bytes_available() - sizeof(JsonMessage); + shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, true /*has_more_chunks*/, sent_payload_size, payload); + shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + sent_payload_size; + //shared_buffer->free_message()->ipc_id = IpcId::Invalid; // Note: free_message() may be past writable memory. + + if (count++ > 50) { + std::cerr << "x50 Sending partial message with payload_size=" << sent_payload_size << std::endl; + count = 0; + } + + // Prepare for next time. + payload_size -= sent_payload_size; + payload += sent_payload_size; + return RepeatResult::RunAgain; + } + // The entire payload fits. Send it all now. + else { + // Include partial message id, as there could have been previous parts of this payload. + shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, false /*has_more_chunks*/, payload_size, payload); + shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + payload_size; + shared_buffer->free_message()->ipc_id = IpcId::Invalid; + //std::cerr << "Sending full message with payload_size=" << payload_size << std::endl; + + return RepeatResult::Break; + } +#endif +}); +} + +MessageQueue::BufferMetadata* MessageQueue::metadata() const { return reinterpret_cast(buffer_->data); +} + +size_t MessageQueue::BytesAvailableInBuffer() const { + return buffer_->capacity - metadata()->total_bytes_used_including_metadata(); +} + +Message* MessageQueue::first_message_in_buffer() const { + return reinterpret_cast( + reinterpret_cast(buffer_->data) + sizeof(BufferMetadata)); +} + +Message* MessageQueue::free_message_in_buffer() const { + if (metadata()->total_bytes_used_including_metadata >= buffer_->capacity) + return nullptr; + return reinterpret_cast( + reinterpret_cast(buffer_->data) + metadata()->total_bytes_used_including_metadata()); } \ No newline at end of file diff --git a/src/message_queue.h b/src/message_queue.h index 183e2e9e..e2ab030e 100644 --- a/src/message_queue.h +++ b/src/message_queue.h @@ -11,6 +11,9 @@ struct Message { // Total size of the message (including metadata that this object stores). size_t total_size; + + // Size of the extra message data immediately following the message payload. + size_t message_size() const { return total_size - sizeof(Message); } }; // A MessageQueue is a FIFO container storing messages in an arbitrary memory @@ -55,7 +58,12 @@ struct MessageQueue { private: struct BufferMetadata; - BufferMetadata* Metadata(); + BufferMetadata* metadata() const; + // Returns the number of bytes currently available in the buffer. + size_t BytesAvailableInBuffer() const; + Message* first_message_in_buffer() const; + // First free message in the buffer. + Message* free_message_in_buffer() const; std::unique_ptr buffer_; };