mirror of
https://github.com/MaskRay/ccls.git
synced 2025-01-18 19:45:49 +00:00
Remove some unused code.
This commit is contained in:
parent
601af73ca9
commit
42f744ba29
122
src/buffer.cc
122
src/buffer.cc
@ -1,122 +0,0 @@
|
||||
#include "buffer.h"
|
||||
|
||||
#include "platform.h"
|
||||
#include "utils.h"
|
||||
|
||||
#include <doctest/doctest.h>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace {
|
||||
|
||||
struct ScopedLockLocal : public ScopedLock {
|
||||
ScopedLockLocal(std::mutex& mutex) : guard(mutex) {}
|
||||
std::lock_guard<std::mutex> guard;
|
||||
};
|
||||
|
||||
struct BufferLocal : public Buffer {
|
||||
explicit BufferLocal(size_t capacity) {
|
||||
this->data = malloc(capacity);
|
||||
this->capacity = capacity;
|
||||
}
|
||||
~BufferLocal() override {
|
||||
free(data);
|
||||
data = nullptr;
|
||||
capacity = 0;
|
||||
}
|
||||
|
||||
std::unique_ptr<ScopedLock> WaitForExclusiveAccess() override {
|
||||
return MakeUnique<ScopedLockLocal>(mutex_);
|
||||
}
|
||||
|
||||
std::mutex mutex_;
|
||||
};
|
||||
|
||||
struct ScopedLockPlatform : public ScopedLock {
|
||||
ScopedLockPlatform(PlatformMutex* mutex)
|
||||
: guard(CreatePlatformScopedMutexLock(mutex)) {}
|
||||
|
||||
std::unique_ptr<PlatformScopedMutexLock> guard;
|
||||
};
|
||||
|
||||
struct BufferPlatform : public Buffer {
|
||||
explicit BufferPlatform(const std::string& name, size_t capacity)
|
||||
: memory_(CreatePlatformSharedMemory(name + "_mem", capacity)),
|
||||
mutex_(CreatePlatformMutex(name + "_mtx")) {
|
||||
this->data = memory_->data;
|
||||
this->capacity = memory_->capacity;
|
||||
}
|
||||
|
||||
~BufferPlatform() override {
|
||||
data = nullptr;
|
||||
capacity = 0;
|
||||
}
|
||||
|
||||
std::unique_ptr<ScopedLock> WaitForExclusiveAccess() override {
|
||||
return MakeUnique<ScopedLockPlatform>(mutex_.get());
|
||||
}
|
||||
|
||||
std::unique_ptr<PlatformSharedMemory> memory_;
|
||||
std::unique_ptr<PlatformMutex> mutex_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
std::unique_ptr<Buffer> Buffer::Create(size_t capacity) {
|
||||
return MakeUnique<BufferLocal>(capacity);
|
||||
}
|
||||
|
||||
std::unique_ptr<Buffer> Buffer::CreateSharedBuffer(const std::string& name,
|
||||
size_t capacity) {
|
||||
return MakeUnique<BufferPlatform>(name, capacity);
|
||||
}
|
||||
|
||||
TEST_SUITE("BufferLocal");
|
||||
|
||||
TEST_CASE("create") {
|
||||
std::unique_ptr<Buffer> b = Buffer::Create(24);
|
||||
REQUIRE(b->data);
|
||||
REQUIRE(b->capacity == 24);
|
||||
|
||||
b = Buffer::CreateSharedBuffer("indexertest", 24);
|
||||
REQUIRE(b->data);
|
||||
REQUIRE(b->capacity == 24);
|
||||
}
|
||||
|
||||
TEST_CASE("lock") {
|
||||
auto buffers = {Buffer::Create(sizeof(int)),
|
||||
Buffer::CreateSharedBuffer("indexertest", sizeof(int))};
|
||||
|
||||
for (auto& b : buffers) {
|
||||
int* data = reinterpret_cast<int*>(b->data);
|
||||
*data = 0;
|
||||
|
||||
std::unique_ptr<std::thread> thread;
|
||||
{
|
||||
auto lock = b->WaitForExclusiveAccess();
|
||||
*data = 1;
|
||||
|
||||
// Start a second thread, wait until it has attempted to acquire a lock.
|
||||
volatile bool did_read = false;
|
||||
thread = MakeUnique<std::thread>([&did_read, &b, &data]() {
|
||||
did_read = true;
|
||||
auto l = b->WaitForExclusiveAccess();
|
||||
*data = 2;
|
||||
});
|
||||
while (!did_read)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
|
||||
// Verify lock acquisition is waiting.
|
||||
REQUIRE(*data == 1);
|
||||
}
|
||||
|
||||
// Wait for thread to acquire lock, verify it writes to data.
|
||||
thread->join();
|
||||
REQUIRE(*data == 2);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_SUITE_END();
|
29
src/buffer.h
29
src/buffer.h
@ -1,29 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
struct ScopedLock {
|
||||
virtual ~ScopedLock() = default;
|
||||
};
|
||||
|
||||
// Points to a generic block of memory. Note that |data| is relocatable, ie,
|
||||
// multiple Buffer instantiations may point to the same underlying block of
|
||||
// memory but the data pointer has different values.
|
||||
struct Buffer {
|
||||
// Create a new buffer of the given capacity using process-local memory.
|
||||
static std::unique_ptr<Buffer> Create(size_t capacity);
|
||||
// Create a buffer pointing to memory shared across processes with the given
|
||||
// capacity.
|
||||
static std::unique_ptr<Buffer> CreateSharedBuffer(const std::string& name,
|
||||
size_t capacity);
|
||||
|
||||
virtual ~Buffer() = default;
|
||||
|
||||
// Acquire a lock on the buffer, ie, become the only code that can read or
|
||||
// write to it. The lock lasts so long as the returned object is alive.
|
||||
virtual std::unique_ptr<ScopedLock> WaitForExclusiveAccess() = 0;
|
||||
|
||||
void* data = nullptr;
|
||||
size_t capacity = 0;
|
||||
};
|
@ -1,5 +1,4 @@
|
||||
// TODO: cleanup includes
|
||||
#include "buffer.h"
|
||||
#include "cache.h"
|
||||
#include "clang_complete.h"
|
||||
#include "file_consumer.h"
|
||||
@ -9,7 +8,6 @@
|
||||
#include "language_server_api.h"
|
||||
#include "lex_utils.h"
|
||||
#include "match.h"
|
||||
#include "message_queue.h"
|
||||
#include "options.h"
|
||||
#include "platform.h"
|
||||
#include "project.h"
|
||||
|
@ -1,416 +0,0 @@
|
||||
#include "message_queue.h"
|
||||
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include <doctest/doctest.h>
|
||||
|
||||
#include "platform.h"
|
||||
#include "resizable_buffer.h"
|
||||
#include "utils.h"
|
||||
|
||||
// TODO: figure out a logging solution
|
||||
//#define MESSAGE_QUEUE_LOG
|
||||
|
||||
namespace {
|
||||
|
||||
const int kMinimumPartialPayloadSize = 128;
|
||||
|
||||
struct MessageHeader {
|
||||
MessageHeader(uint32_t partial_id, bool has_more_chunks, size_t size)
|
||||
: partial_id(partial_id), has_more_chunks(has_more_chunks), size(size) {}
|
||||
|
||||
uint32_t partial_id;
|
||||
bool has_more_chunks;
|
||||
size_t size;
|
||||
};
|
||||
|
||||
struct BufferMessageIterator {
|
||||
static BufferMessageIterator Begin(void* buffer, size_t bytes_used) {
|
||||
if (bytes_used == 0)
|
||||
return End();
|
||||
|
||||
return BufferMessageIterator(buffer, bytes_used);
|
||||
}
|
||||
static BufferMessageIterator End() {
|
||||
return BufferMessageIterator(nullptr, 0);
|
||||
}
|
||||
|
||||
// Start of buffer to iterate.
|
||||
uint8_t* buffer;
|
||||
// Number of bytes left in buffer to parse.
|
||||
size_t remaining_bytes;
|
||||
|
||||
BufferMessageIterator(void* buffer, size_t remaining_bytes)
|
||||
: buffer(reinterpret_cast<uint8_t*>(buffer)),
|
||||
remaining_bytes(remaining_bytes) {}
|
||||
|
||||
MessageHeader* get() const {
|
||||
return reinterpret_cast<MessageHeader*>(buffer);
|
||||
}
|
||||
MessageHeader* operator*() const { return get(); }
|
||||
MessageHeader* operator->() const { return get(); }
|
||||
|
||||
void operator++() {
|
||||
size_t next_message_offset = sizeof(MessageHeader) + get()->size;
|
||||
if (next_message_offset >= remaining_bytes) {
|
||||
assert(next_message_offset == remaining_bytes);
|
||||
buffer = nullptr;
|
||||
remaining_bytes = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
buffer = buffer + next_message_offset;
|
||||
remaining_bytes -= next_message_offset;
|
||||
}
|
||||
|
||||
void* message_data() const {
|
||||
return reinterpret_cast<void*>(buffer + sizeof(MessageHeader));
|
||||
}
|
||||
|
||||
bool operator==(const BufferMessageIterator& other) const {
|
||||
return buffer == other.buffer && remaining_bytes == other.remaining_bytes;
|
||||
}
|
||||
bool operator!=(const BufferMessageIterator& other) const {
|
||||
return !(*this == other);
|
||||
}
|
||||
};
|
||||
|
||||
enum class RepeatResult { RunAgain, Break };
|
||||
|
||||
// Run |action| an arbitrary number of times.
|
||||
void Repeat(std::function<RepeatResult()> action) {
|
||||
bool first = true;
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
int log_iteration_count = 0;
|
||||
int log_count = 0;
|
||||
#endif
|
||||
while (true) {
|
||||
if (!first) {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
if (log_iteration_count > 1000) {
|
||||
log_iteration_count = 0;
|
||||
std::cerr << "[info]: Buffer full, waiting (" << log_count++ << ")"
|
||||
<< std::endl;
|
||||
}
|
||||
++log_iteration_count;
|
||||
#endif
|
||||
// TODO: See if we can figure out a way to use condition variables
|
||||
// cross-process.
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(0));
|
||||
}
|
||||
first = false;
|
||||
|
||||
if (action() == RepeatResult::RunAgain)
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ResizableBuffer* CreateOrFindResizableBuffer(
|
||||
std::unordered_map<uint32_t, std::unique_ptr<ResizableBuffer>>&
|
||||
resizable_buffers,
|
||||
uint32_t id) {
|
||||
auto it = resizable_buffers.find(id);
|
||||
if (it != resizable_buffers.end())
|
||||
return it->second.get();
|
||||
return (resizable_buffers[id] = MakeUnique<ResizableBuffer>()).get();
|
||||
}
|
||||
|
||||
std::unique_ptr<Buffer> MakeBuffer(void* content, size_t size) {
|
||||
auto buffer = Buffer::Create(size);
|
||||
memcpy(buffer->data, content, size);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Message::Message(void* data, size_t size) : data(data), size(size) {}
|
||||
|
||||
struct MessageQueue::BufferMetadata {
|
||||
// Reset buffer.
|
||||
void reset() { total_message_bytes_ = 0; }
|
||||
|
||||
// Total number of used bytes excluding the sizeof this metadata object.
|
||||
void add_used_bytes(size_t used_bytes) { total_message_bytes_ += used_bytes; }
|
||||
|
||||
// The total number of bytes in use.
|
||||
size_t total_bytes_used_including_metadata() {
|
||||
return total_message_bytes_ + sizeof(BufferMetadata);
|
||||
}
|
||||
|
||||
// The total number of bytes currently used for messages. This does not
|
||||
// include the sizeof the buffer metadata.
|
||||
size_t total_message_bytes() { return total_message_bytes_; }
|
||||
|
||||
int next_partial_message_id = 0;
|
||||
|
||||
private:
|
||||
size_t total_message_bytes_ = 0;
|
||||
};
|
||||
|
||||
MessageQueue::MessageQueue(std::unique_ptr<Buffer> buffer, bool buffer_has_data)
|
||||
: buffer_(std::move(buffer)) {
|
||||
assert(buffer_->capacity >=
|
||||
(sizeof(BufferMetadata) + kMinimumPartialPayloadSize));
|
||||
|
||||
if (!buffer_has_data)
|
||||
new (buffer_->data) BufferMetadata();
|
||||
|
||||
local_buffer_ = Buffer::Create(buffer_->capacity - sizeof(BufferMetadata));
|
||||
memset(local_buffer_->data, 0, local_buffer_->capacity);
|
||||
}
|
||||
|
||||
void MessageQueue::Enqueue(const Message& message) {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
int count = 0;
|
||||
#endif
|
||||
uint32_t partial_id = 0;
|
||||
uint8_t* payload_data = reinterpret_cast<uint8_t*>(message.data);
|
||||
size_t payload_size = message.size;
|
||||
|
||||
Repeat([&]() {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
if (count++ > 500) {
|
||||
std::cerr << "x500 Sending partial message payload_size=" << payload_size
|
||||
<< std::endl;
|
||||
count = 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
auto lock = buffer_->WaitForExclusiveAccess();
|
||||
|
||||
// We cannot find the entire payload in the buffer. We have to send chunks
|
||||
// of it over time.
|
||||
if (payload_size >= BytesAvailableInBuffer()) {
|
||||
// There's not enough room for our minimum payload size, so try again
|
||||
// later.
|
||||
if ((sizeof(MessageHeader) + kMinimumPartialPayloadSize) >
|
||||
BytesAvailableInBuffer())
|
||||
return RepeatResult::RunAgain;
|
||||
|
||||
if (partial_id == 0) {
|
||||
// note: pre-increment so we use 1 as the initial value
|
||||
partial_id = ++metadata()->next_partial_message_id;
|
||||
}
|
||||
|
||||
size_t sent_payload_size =
|
||||
BytesAvailableInBuffer() - sizeof(MessageHeader);
|
||||
// |sent_payload_size| must always be smaller than |payload_size|. If it
|
||||
// is equal to |payload_size|, than we could have sent it as a normal,
|
||||
// non-partial message. It's also an error if it is larger than
|
||||
// payload_size (we're sending garbage data).
|
||||
assert(sent_payload_size < payload_size);
|
||||
|
||||
CopyPayloadToBuffer(partial_id, payload_data, sent_payload_size,
|
||||
true /*has_more_chunks*/);
|
||||
payload_data += sent_payload_size;
|
||||
payload_size -= sent_payload_size;
|
||||
|
||||
// Prepare for next time.
|
||||
return RepeatResult::RunAgain;
|
||||
}
|
||||
|
||||
// The entire payload fits. Send it all now.
|
||||
else {
|
||||
// Include partial message id, as there could have been previous parts of
|
||||
// this payload.
|
||||
CopyPayloadToBuffer(partial_id, payload_data, payload_size,
|
||||
false /*has_more_chunks*/);
|
||||
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
std::cerr << "Sending full message with payload_size=" << payload_size
|
||||
<< std::endl;
|
||||
#endif
|
||||
return RepeatResult::Break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<std::unique_ptr<Buffer>> MessageQueue::DequeueAll() {
|
||||
std::unordered_map<uint32_t, std::unique_ptr<ResizableBuffer>>
|
||||
resizable_buffers;
|
||||
|
||||
std::vector<std::unique_ptr<Buffer>> result;
|
||||
|
||||
while (true) {
|
||||
size_t local_buffer_size = 0;
|
||||
|
||||
// Move data from shared memory into a local buffer. Do this
|
||||
// before parsing the blocks so that other processes can begin
|
||||
// posting data as soon as possible.
|
||||
{
|
||||
std::unique_ptr<ScopedLock> lock = buffer_->WaitForExclusiveAccess();
|
||||
assert(BytesAvailableInBuffer() >= 0);
|
||||
|
||||
// note: Do not copy over buffer_ metadata.
|
||||
local_buffer_size = metadata()->total_message_bytes();
|
||||
memcpy(local_buffer_->data, first_message_in_buffer(), local_buffer_size);
|
||||
|
||||
metadata()->reset();
|
||||
}
|
||||
|
||||
// Parse blocks from shared memory.
|
||||
for (auto it = BufferMessageIterator::Begin(local_buffer_->data,
|
||||
local_buffer_size);
|
||||
it != BufferMessageIterator::End(); ++it) {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
std::cerr << "Got message with partial_id=" << it->partial_id
|
||||
<< ", payload_size=" << it->size
|
||||
<< ", has_more_chunks=" << it->has_more_chunks << std::endl;
|
||||
#endif
|
||||
|
||||
if (it->partial_id != 0) {
|
||||
auto* buf =
|
||||
CreateOrFindResizableBuffer(resizable_buffers, it->partial_id);
|
||||
buf->Append(it.message_data(), it->size);
|
||||
|
||||
if (!it->has_more_chunks) {
|
||||
result.push_back(MakeBuffer(buf->buffer, buf->size));
|
||||
resizable_buffers.erase(it->partial_id);
|
||||
}
|
||||
} else {
|
||||
// Note: we can't just return pointers to |local_buffer_| because if we
|
||||
// read a partial message we will invalidate all of the existing
|
||||
// pointers. We could jump through hoops to make it work (ie, if no
|
||||
// partial messages return pointers to local_buffer_) but it is not
|
||||
// worth the effort.
|
||||
assert(!it->has_more_chunks);
|
||||
result.push_back(MakeBuffer(it.message_data(), it->size));
|
||||
}
|
||||
}
|
||||
|
||||
// We're waiting for data to be posted to result. Delay a little so we
|
||||
// don't push the CPU so hard.
|
||||
if (!resizable_buffers.empty())
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(0));
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void MessageQueue::CopyPayloadToBuffer(uint32_t partial_id,
|
||||
void* payload,
|
||||
size_t payload_size,
|
||||
bool has_more_chunks) {
|
||||
assert(BytesAvailableInBuffer() >= (sizeof(MessageHeader) + payload_size));
|
||||
|
||||
// Copy header.
|
||||
MessageHeader header(partial_id, has_more_chunks, payload_size);
|
||||
memcpy(first_free_address_in_buffer(), &header, sizeof(MessageHeader));
|
||||
metadata()->add_used_bytes(sizeof(MessageHeader));
|
||||
// Copy payload.
|
||||
memcpy(first_free_address_in_buffer(), payload, payload_size);
|
||||
metadata()->add_used_bytes(payload_size);
|
||||
}
|
||||
|
||||
MessageQueue::BufferMetadata* MessageQueue::metadata() const {
|
||||
return reinterpret_cast<BufferMetadata*>(buffer_->data);
|
||||
}
|
||||
|
||||
size_t MessageQueue::BytesAvailableInBuffer() const {
|
||||
return buffer_->capacity - metadata()->total_bytes_used_including_metadata();
|
||||
}
|
||||
|
||||
Message* MessageQueue::first_message_in_buffer() const {
|
||||
return reinterpret_cast<Message*>(reinterpret_cast<uint8_t*>(buffer_->data) +
|
||||
sizeof(BufferMetadata));
|
||||
}
|
||||
|
||||
void* MessageQueue::first_free_address_in_buffer() const {
|
||||
if (metadata()->total_bytes_used_including_metadata() >= buffer_->capacity)
|
||||
return nullptr;
|
||||
return reinterpret_cast<void*>(
|
||||
reinterpret_cast<uint8_t*>(buffer_->data) +
|
||||
metadata()->total_bytes_used_including_metadata());
|
||||
}
|
||||
|
||||
TEST_SUITE("MessageQueue");
|
||||
|
||||
TEST_CASE("simple") {
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5),
|
||||
false /*buffer_has_data*/);
|
||||
|
||||
int data = 0;
|
||||
data = 1;
|
||||
queue.Enqueue(Message(&data, sizeof(data)));
|
||||
data = 2;
|
||||
queue.Enqueue(Message(&data, sizeof(data)));
|
||||
|
||||
int expected = 0;
|
||||
for (std::unique_ptr<Buffer>& m : queue.DequeueAll()) {
|
||||
++expected;
|
||||
|
||||
REQUIRE(m->capacity == sizeof(data));
|
||||
int* value = reinterpret_cast<int*>(m->data);
|
||||
REQUIRE(expected == *value);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("large payload") {
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5),
|
||||
false /*buffer_has_data*/);
|
||||
|
||||
// Allocate big buffer.
|
||||
size_t num_ints = kMinimumPartialPayloadSize * 100;
|
||||
int* sent_ints = reinterpret_cast<int*>(malloc(sizeof(int) * num_ints));
|
||||
for (int i = 0; i < num_ints; ++i)
|
||||
sent_ints[i] = i;
|
||||
|
||||
// Queue big buffer. Add surrounding messages to make sure they get sent
|
||||
// correctly.
|
||||
// Run in a separate thread because Enqueue will block.
|
||||
volatile bool done_sending = false;
|
||||
std::thread sender([&]() {
|
||||
int small = 5;
|
||||
queue.Enqueue(Message(&small, sizeof(small)));
|
||||
queue.Enqueue(Message(sent_ints, sizeof(int) * num_ints));
|
||||
queue.Enqueue(Message(&small, sizeof(small)));
|
||||
done_sending = true;
|
||||
});
|
||||
|
||||
// Receive sent messages.
|
||||
{
|
||||
// Keep dequeuing messages until we have three.
|
||||
std::vector<std::unique_ptr<Buffer>> messages;
|
||||
while (messages.size() != 3) {
|
||||
for (auto& message : queue.DequeueAll())
|
||||
messages.emplace_back(std::move(message));
|
||||
}
|
||||
sender.join();
|
||||
|
||||
// Small
|
||||
{
|
||||
REQUIRE(sizeof(int) == messages[0]->capacity);
|
||||
int* value = reinterpret_cast<int*>(messages[0]->data);
|
||||
REQUIRE(*value == 5);
|
||||
}
|
||||
|
||||
// Big
|
||||
{
|
||||
int* received_ints = reinterpret_cast<int*>(messages[1]->data);
|
||||
REQUIRE(received_ints != sent_ints);
|
||||
REQUIRE(messages[1]->capacity == (sizeof(int) * num_ints));
|
||||
for (int i = 0; i < num_ints; ++i) {
|
||||
REQUIRE(received_ints[i] == i);
|
||||
REQUIRE(received_ints[i] == sent_ints[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Small
|
||||
{
|
||||
REQUIRE(sizeof(int) == messages[2]->capacity);
|
||||
int* value = reinterpret_cast<int*>(messages[2]->data);
|
||||
REQUIRE(*value == 5);
|
||||
}
|
||||
}
|
||||
|
||||
free(sent_ints);
|
||||
}
|
||||
|
||||
TEST_SUITE_END();
|
@ -1,80 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "buffer.h"
|
||||
|
||||
struct ResizableBuffer;
|
||||
|
||||
struct Message {
|
||||
Message(void* data, size_t size);
|
||||
|
||||
void* data;
|
||||
size_t size;
|
||||
};
|
||||
|
||||
// A MessageQueue is a FIFO container storing messages in an arbitrary memory
|
||||
// buffer that is cross-thread and cross-process safe. This means:
|
||||
// - Multiple separate MessageQueues instantiations can point to the
|
||||
// same underlying buffer and use it at the same time.
|
||||
// - The buffer is fully relocatable, ie, it can have multiple different
|
||||
// addresses (as is the case for memory shared across processes).
|
||||
//
|
||||
// There can be multiple writers, but there can only be one reader.
|
||||
struct MessageQueue {
|
||||
// Create a new MessageQueue using |buffer| as the backing data storage.
|
||||
// This does *not* take ownership over the memory stored in |buffer|.
|
||||
//
|
||||
// If |buffer_has_data| is true, then it is assumed that |buffer| contains
|
||||
// data and has already been initialized. It is a perfectly acceptable
|
||||
// use-case to have multiple completely separate MessageQueue
|
||||
// instantiations pointing to the same memory.
|
||||
explicit MessageQueue(std::unique_ptr<Buffer> buffer, bool buffer_has_data);
|
||||
MessageQueue(const MessageQueue&) = delete;
|
||||
|
||||
// Enqueue a message to the queue. This will wait until there is room in
|
||||
// queue. If the message is too large to fit into the queue, this will
|
||||
// wait until the message has been fully sent, which may involve multiple
|
||||
// IPC roundtrips (ie, Enqueue -> DequeueAll -> Enqueue) - so this method
|
||||
// may take a long time to run.
|
||||
//
|
||||
// TODO: Consider copying message memory to a temporary buffer and running
|
||||
// enqueues on a worker thread.
|
||||
void Enqueue(const Message& message);
|
||||
|
||||
// Take all messages from the queue.
|
||||
std::vector<std::unique_ptr<Buffer>> DequeueAll();
|
||||
|
||||
private:
|
||||
struct BufferMetadata;
|
||||
|
||||
void CopyPayloadToBuffer(uint32_t partial_id,
|
||||
void* payload,
|
||||
size_t payload_size,
|
||||
bool has_more_chunks);
|
||||
|
||||
BufferMetadata* metadata() const;
|
||||
// Returns the number of bytes currently available in the buffer.
|
||||
size_t BytesAvailableInBuffer() const;
|
||||
Message* first_message_in_buffer() const;
|
||||
// First free message in the buffer.
|
||||
void* first_free_address_in_buffer() const;
|
||||
|
||||
std::unique_ptr<Buffer> buffer_;
|
||||
std::unique_ptr<Buffer> local_buffer_;
|
||||
};
|
||||
|
||||
/*
|
||||
// TODO: We convert IpcMessage <-> Message as a user-level operation.
|
||||
// MessageQueue doesn't know about IpcMessage.
|
||||
struct IpcMessage {
|
||||
std::unique_ptr<Message> ToMessage();
|
||||
void BuildFromMessage(std::unique_ptr<Message> message);
|
||||
|
||||
// Serialize/deserialize the message.
|
||||
virtual void Serialize(Writer& writer) = 0;
|
||||
virtual void Deserialize(Reader& reader) = 0;
|
||||
};
|
||||
*/
|
@ -1,100 +0,0 @@
|
||||
#include "resizable_buffer.h"
|
||||
|
||||
#include <doctest/doctest.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
|
||||
namespace {
|
||||
const size_t kInitialCapacity = 128;
|
||||
}
|
||||
|
||||
ResizableBuffer::ResizableBuffer() {
|
||||
buffer = malloc(kInitialCapacity);
|
||||
size = 0;
|
||||
capacity_ = kInitialCapacity;
|
||||
}
|
||||
|
||||
ResizableBuffer::~ResizableBuffer() {
|
||||
free(buffer);
|
||||
size = 0;
|
||||
capacity_ = 0;
|
||||
}
|
||||
|
||||
void ResizableBuffer::Append(void* content, size_t content_size) {
|
||||
assert(capacity_ >= 0);
|
||||
|
||||
size_t new_size = size + content_size;
|
||||
|
||||
// Grow buffer capacity if needed.
|
||||
if (new_size >= capacity_) {
|
||||
size_t new_capacity = capacity_ * 2;
|
||||
while (new_size >= new_capacity)
|
||||
new_capacity *= 2;
|
||||
void* new_memory = malloc(new_capacity);
|
||||
assert(size < capacity_);
|
||||
memcpy(new_memory, buffer, size);
|
||||
free(buffer);
|
||||
buffer = new_memory;
|
||||
capacity_ = new_capacity;
|
||||
}
|
||||
|
||||
// Append new content into memory.
|
||||
memcpy(reinterpret_cast<uint8_t*>(buffer) + size, content, content_size);
|
||||
size = new_size;
|
||||
}
|
||||
|
||||
void ResizableBuffer::Reset() {
|
||||
size = 0;
|
||||
}
|
||||
|
||||
TEST_SUITE("ResizableBuffer");
|
||||
|
||||
TEST_CASE("buffer starts with zero size") {
|
||||
ResizableBuffer b;
|
||||
REQUIRE(b.buffer);
|
||||
REQUIRE(b.size == 0);
|
||||
}
|
||||
|
||||
TEST_CASE("append and reset") {
|
||||
int content = 1;
|
||||
ResizableBuffer b;
|
||||
|
||||
b.Append(&content, sizeof(content));
|
||||
REQUIRE(b.size == sizeof(content));
|
||||
|
||||
b.Append(&content, sizeof(content));
|
||||
REQUIRE(b.size == (2 * sizeof(content)));
|
||||
|
||||
b.Reset();
|
||||
REQUIRE(b.size == 0);
|
||||
}
|
||||
|
||||
TEST_CASE("appended content is copied into buffer w/ resize") {
|
||||
int content = 0;
|
||||
ResizableBuffer b;
|
||||
|
||||
// go past kInitialCapacity to verify resize works too
|
||||
while (b.size < kInitialCapacity * 2) {
|
||||
b.Append(&content, sizeof(content));
|
||||
content += 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < content; ++i)
|
||||
REQUIRE(i == *(reinterpret_cast<int*>(b.buffer) + i));
|
||||
}
|
||||
|
||||
TEST_CASE("reset does not reallocate") {
|
||||
ResizableBuffer b;
|
||||
|
||||
while (b.size < kInitialCapacity)
|
||||
b.Append(&b, sizeof(b));
|
||||
|
||||
void* buffer = b.buffer;
|
||||
b.Reset();
|
||||
REQUIRE(b.buffer == buffer);
|
||||
}
|
||||
|
||||
TEST_SUITE_END();
|
@ -1,23 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
|
||||
// Points to a generic block of memory that can be resized. This class owns
|
||||
// and has the only pointer to the underlying memory buffer.
|
||||
struct ResizableBuffer {
|
||||
ResizableBuffer();
|
||||
ResizableBuffer(const ResizableBuffer&) = delete;
|
||||
~ResizableBuffer();
|
||||
|
||||
void Append(void* content, size_t content_size);
|
||||
void Reset();
|
||||
|
||||
// Buffer content.
|
||||
void* buffer;
|
||||
// Number of bytes in |buffer|. Note that the actual buffer may be larger
|
||||
// than |size|.
|
||||
size_t size;
|
||||
|
||||
private:
|
||||
size_t capacity_;
|
||||
};
|
@ -1,104 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "buffer.h"
|
||||
#include "message_queue.h"
|
||||
#include "serializer.h"
|
||||
|
||||
// TypedBidiMessageQueue provides a type-safe server/client implementation on
|
||||
// top of a couple MessageQueue instances.
|
||||
template <typename TId, typename TMessage>
|
||||
struct TypedBidiMessageQueue {
|
||||
using Serializer = std::function<void(Writer& visitor, TMessage& message)>;
|
||||
using Deserializer =
|
||||
std::function<std::unique_ptr<TMessage>(Reader& visitor)>;
|
||||
|
||||
TypedBidiMessageQueue(const std::string& name, size_t buffer_size)
|
||||
: for_server(Buffer::CreateSharedBuffer(name + "_fs", buffer_size),
|
||||
false /*buffer_has_data*/),
|
||||
for_client(Buffer::CreateSharedBuffer(name + "_fc", buffer_size),
|
||||
true /*buffer_has_data*/) {}
|
||||
|
||||
void RegisterId(TId id,
|
||||
const Serializer& serializer,
|
||||
const Deserializer& deserializer) {
|
||||
assert(serializers_.find(id) == serializers_.end() &&
|
||||
deserializers_.find(id) == deserializers_.end() &&
|
||||
"Duplicate registration");
|
||||
|
||||
serializers_[id] = serializer;
|
||||
deserializers_[id] = deserializer;
|
||||
}
|
||||
|
||||
void SendMessage(MessageQueue* destination, TId id, TMessage& message) {
|
||||
// Create writer.
|
||||
rapidjson::StringBuffer output;
|
||||
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
|
||||
writer.SetIndent(' ', 0);
|
||||
|
||||
// Serialize the message.
|
||||
assert(serializers_.find(id) != serializers_.end() &&
|
||||
"No registered serializer");
|
||||
const Serializer& serializer = serializers_.find(id)->second;
|
||||
serializer(writer, message);
|
||||
|
||||
// Send message.
|
||||
void* payload = malloc(sizeof(MessageHeader) + output.GetSize());
|
||||
reinterpret_cast<MessageHeader*>(payload)->id = id;
|
||||
memcpy(
|
||||
(void*)(reinterpret_cast<const char*>(payload) + sizeof(MessageHeader)),
|
||||
output.GetString(), output.GetSize());
|
||||
destination->Enqueue(
|
||||
Message(payload, sizeof(MessageHeader) + output.GetSize()));
|
||||
free(payload);
|
||||
}
|
||||
|
||||
// Retrieve all messages from the given |queue|.
|
||||
std::vector<std::unique_ptr<TMessage>> GetMessages(
|
||||
MessageQueue* queue) const {
|
||||
assert(queue == &for_server || queue == &for_client);
|
||||
|
||||
std::vector<std::unique_ptr<Buffer>> messages = queue->DequeueAll();
|
||||
std::vector<std::unique_ptr<TMessage>> result;
|
||||
result.reserve(messages.size());
|
||||
|
||||
for (std::unique_ptr<Buffer>& buffer : messages) {
|
||||
MessageHeader* header = reinterpret_cast<MessageHeader*>(buffer->data);
|
||||
|
||||
// Parse message content.
|
||||
rapidjson::Document document;
|
||||
document.Parse(
|
||||
reinterpret_cast<const char*>(buffer->data) + sizeof(MessageHeader),
|
||||
buffer->capacity - sizeof(MessageHeader));
|
||||
if (document.HasParseError()) {
|
||||
std::cerr << "[FATAL]: Unable to parse IPC message" << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Deserialize it.
|
||||
assert(deserializers_.find(header->id) != deserializers_.end() &&
|
||||
"No registered deserializer");
|
||||
const Deserializer& deserializer =
|
||||
deserializers_.find(header->id)->second;
|
||||
result.emplace_back(deserializer(document));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Messages which the server process should handle.
|
||||
MessageQueue for_server;
|
||||
// Messages which the client process should handle.
|
||||
MessageQueue for_client;
|
||||
|
||||
private:
|
||||
struct MessageHeader {
|
||||
TId id;
|
||||
};
|
||||
|
||||
std::unordered_map<TId, Serializer> serializers_;
|
||||
std::unordered_map<TId, Deserializer> deserializers_;
|
||||
};
|
Loading…
Reference in New Issue
Block a user