mirror of
https://github.com/MaskRay/ccls.git
synced 2025-02-16 13:48:04 +00:00
format, doc cleanup
This commit is contained in:
parent
e7390c14f1
commit
95488507a0
@ -19,7 +19,7 @@ const int kMinimumPartialPayloadSize = 128;
|
||||
|
||||
struct MessageHeader {
|
||||
MessageHeader(uint32_t partial_id, bool has_more_chunks, size_t size)
|
||||
: partial_id(partial_id), has_more_chunks(has_more_chunks), size(size) {}
|
||||
: partial_id(partial_id), has_more_chunks(has_more_chunks), size(size) {}
|
||||
|
||||
uint32_t partial_id;
|
||||
bool has_more_chunks;
|
||||
@ -37,15 +37,14 @@ struct BufferMessageIterator {
|
||||
return BufferMessageIterator(nullptr, 0);
|
||||
}
|
||||
|
||||
|
||||
// Start of buffer to iterate.
|
||||
uint8_t* buffer;
|
||||
// Number of bytes left in buffer to parse.
|
||||
size_t remaining_bytes;
|
||||
|
||||
BufferMessageIterator(void* buffer, size_t remaining_bytes)
|
||||
: buffer(reinterpret_cast<uint8_t*>(buffer)), remaining_bytes(remaining_bytes) {}
|
||||
|
||||
: buffer(reinterpret_cast<uint8_t*>(buffer)),
|
||||
remaining_bytes(remaining_bytes) {}
|
||||
|
||||
MessageHeader* get() const {
|
||||
return reinterpret_cast<MessageHeader*>(buffer);
|
||||
@ -78,10 +77,7 @@ struct BufferMessageIterator {
|
||||
}
|
||||
};
|
||||
|
||||
enum class RepeatResult {
|
||||
RunAgain,
|
||||
Break
|
||||
};
|
||||
enum class RepeatResult { RunAgain, Break };
|
||||
|
||||
// Run |action| an arbitrary number of times.
|
||||
void Repeat(std::function<RepeatResult()> action) {
|
||||
@ -93,11 +89,13 @@ void Repeat(std::function<RepeatResult()> action) {
|
||||
if (log_iteration_count > 1000) {
|
||||
log_iteration_count = 0;
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
std::cerr << "[info]: Buffer full, waiting (" << log_count++ << ")" << std::endl;
|
||||
std::cerr << "[info]: Buffer full, waiting (" << log_count++ << ")"
|
||||
<< std::endl;
|
||||
#endif
|
||||
}
|
||||
++log_iteration_count;
|
||||
// TODO: See if we can figure out a way to use condition variables cross-process.
|
||||
// TODO: See if we can figure out a way to use condition variables
|
||||
// cross-process.
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(0));
|
||||
}
|
||||
first = false;
|
||||
@ -108,8 +106,10 @@ void Repeat(std::function<RepeatResult()> action) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ResizableBuffer* CreateOrFindResizableBuffer(std::unordered_map<int, std::unique_ptr<ResizableBuffer>>& resizable_buffers, uint32_t id) {
|
||||
ResizableBuffer* CreateOrFindResizableBuffer(
|
||||
std::unordered_map<int, std::unique_ptr<ResizableBuffer>>&
|
||||
resizable_buffers,
|
||||
uint32_t id) {
|
||||
auto it = resizable_buffers.find(id);
|
||||
if (it != resizable_buffers.end())
|
||||
return it->second.get();
|
||||
@ -127,14 +127,11 @@ std::unique_ptr<Buffer> MakeBuffer(void* content, size_t size) {
|
||||
Message::Message(void* data, size_t size) : data(data), size(size) {}
|
||||
|
||||
struct MessageQueue::BufferMetadata {
|
||||
// Total number of used bytes exluding the sizeof this metadata object.
|
||||
void add_used_bytes(size_t used_bytes) {
|
||||
total_message_bytes_ += used_bytes;
|
||||
}
|
||||
// Reset buffer.
|
||||
void reset() { total_message_bytes_ = 0; }
|
||||
|
||||
void reset() {
|
||||
total_message_bytes_ = 0;
|
||||
}
|
||||
// Total number of used bytes exluding the sizeof this metadata object.
|
||||
void add_used_bytes(size_t used_bytes) { total_message_bytes_ += used_bytes; }
|
||||
|
||||
// The total number of bytes in use.
|
||||
size_t total_bytes_used_including_metadata() {
|
||||
@ -147,14 +144,14 @@ struct MessageQueue::BufferMetadata {
|
||||
|
||||
int next_partial_message_id = 0;
|
||||
|
||||
private:
|
||||
private:
|
||||
size_t total_message_bytes_ = 0;
|
||||
};
|
||||
|
||||
MessageQueue::MessageQueue(std::unique_ptr<Buffer> buffer, bool buffer_has_data)
|
||||
: buffer_(std::move(buffer)) {
|
||||
|
||||
assert(buffer_->capacity >= (sizeof(BufferMetadata) + kMinimumPartialPayloadSize));
|
||||
: buffer_(std::move(buffer)) {
|
||||
assert(buffer_->capacity >=
|
||||
(sizeof(BufferMetadata) + kMinimumPartialPayloadSize));
|
||||
|
||||
if (!buffer_has_data)
|
||||
new (buffer_->data) BufferMetadata();
|
||||
@ -174,7 +171,8 @@ void MessageQueue::Enqueue(const Message& message) {
|
||||
Repeat([&]() {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
if (count++ > 500) {
|
||||
std::cerr << "x500 Sending partial message payload_size=" << payload_size << std::endl;
|
||||
std::cerr << "x500 Sending partial message payload_size=" << payload_size
|
||||
<< std::endl;
|
||||
count = 0;
|
||||
}
|
||||
#endif
|
||||
@ -184,8 +182,10 @@ void MessageQueue::Enqueue(const Message& message) {
|
||||
// We cannot find the entire payload in the buffer. We have to send chunks
|
||||
// of it over time.
|
||||
if (payload_size >= BytesAvailableInBuffer()) {
|
||||
// There's not enough room for our minimum payload size, so try again later.
|
||||
if ((sizeof(MessageHeader) + kMinimumPartialPayloadSize) > BytesAvailableInBuffer())
|
||||
// There's not enough room for our minimum payload size, so try again
|
||||
// later.
|
||||
if ((sizeof(MessageHeader) + kMinimumPartialPayloadSize) >
|
||||
BytesAvailableInBuffer())
|
||||
return RepeatResult::RunAgain;
|
||||
|
||||
if (partial_id == 0) {
|
||||
@ -193,13 +193,16 @@ void MessageQueue::Enqueue(const Message& message) {
|
||||
partial_id = ++metadata()->next_partial_message_id;
|
||||
}
|
||||
|
||||
size_t sent_payload_size = BytesAvailableInBuffer() - sizeof(MessageHeader);
|
||||
// |sent_payload_size| must always be smaller than |payload_size|. If it is equal to
|
||||
// |payload_size|, than we could have sent it as a normal, non-partial message. It's
|
||||
// also an error if it is larger than payload_size (we're sending garbage data).
|
||||
size_t sent_payload_size =
|
||||
BytesAvailableInBuffer() - sizeof(MessageHeader);
|
||||
// |sent_payload_size| must always be smaller than |payload_size|. If it
|
||||
// is equal to |payload_size|, than we could have sent it as a normal,
|
||||
// non-partial message. It's also an error if it is larger than
|
||||
// payload_size (we're sending garbage data).
|
||||
assert(sent_payload_size < payload_size);
|
||||
|
||||
CopyPayloadToBuffer(partial_id, payload_data, sent_payload_size, true /*has_more_chunks*/);
|
||||
CopyPayloadToBuffer(partial_id, payload_data, sent_payload_size,
|
||||
true /*has_more_chunks*/);
|
||||
payload_data += sent_payload_size;
|
||||
payload_size -= sent_payload_size;
|
||||
|
||||
@ -209,11 +212,14 @@ void MessageQueue::Enqueue(const Message& message) {
|
||||
|
||||
// The entire payload fits. Send it all now.
|
||||
else {
|
||||
// Include partial message id, as there could have been previous parts of this payload.
|
||||
CopyPayloadToBuffer(partial_id, payload_data, payload_size, false /*has_more_chunks*/);
|
||||
// Include partial message id, as there could have been previous parts of
|
||||
// this payload.
|
||||
CopyPayloadToBuffer(partial_id, payload_data, payload_size,
|
||||
false /*has_more_chunks*/);
|
||||
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
std::cerr << "Sending full message with payload_size=" << payload_size << std::endl;
|
||||
std::cerr << "Sending full message with payload_size=" << payload_size
|
||||
<< std::endl;
|
||||
#endif
|
||||
return RepeatResult::Break;
|
||||
}
|
||||
@ -225,7 +231,7 @@ std::vector<std::unique_ptr<Buffer>> MessageQueue::DequeueAll() {
|
||||
|
||||
std::vector<std::unique_ptr<Buffer>> result;
|
||||
|
||||
do {
|
||||
while (true) {
|
||||
size_t local_buffer_size = 0;
|
||||
|
||||
// Move data from shared memory into a local buffer. Do this
|
||||
@ -237,33 +243,31 @@ std::vector<std::unique_ptr<Buffer>> MessageQueue::DequeueAll() {
|
||||
|
||||
// note: Do not copy over buffer_ metadata.
|
||||
local_buffer_size = metadata()->total_message_bytes();
|
||||
memcpy(local_buffer_->data,
|
||||
first_message_in_buffer(),
|
||||
local_buffer_size);
|
||||
memcpy(local_buffer_->data, first_message_in_buffer(), local_buffer_size);
|
||||
|
||||
metadata()->reset();
|
||||
}
|
||||
|
||||
// Parse blocks from shared memory.
|
||||
for (auto it = BufferMessageIterator::Begin(local_buffer_->data, local_buffer_size);
|
||||
it != BufferMessageIterator::End();
|
||||
++it) {
|
||||
for (auto it = BufferMessageIterator::Begin(local_buffer_->data,
|
||||
local_buffer_size);
|
||||
it != BufferMessageIterator::End(); ++it) {
|
||||
#if defined(MESSAGE_QUEUE_LOG)
|
||||
std::cerr << "Got message with partial_id=" << it->partial_id << ", payload_size=" << it->size << ", has_more_chunks=" << it->has_more_chunks << std::endl;
|
||||
std::cerr << "Got message with partial_id=" << it->partial_id
|
||||
<< ", payload_size=" << it->size
|
||||
<< ", has_more_chunks=" << it->has_more_chunks << std::endl;
|
||||
#endif
|
||||
|
||||
if (it->partial_id != 0) {
|
||||
auto* buf = CreateOrFindResizableBuffer(resizable_buffers, it->partial_id);
|
||||
auto* buf =
|
||||
CreateOrFindResizableBuffer(resizable_buffers, it->partial_id);
|
||||
buf->Append(it.message_data(), it->size);
|
||||
|
||||
if (!it->has_more_chunks) {
|
||||
// We can't remove the resizable buffer yet because we need to keep the data alive.
|
||||
// We will remove it the next time this function is called.
|
||||
result.push_back(MakeBuffer(buf->buffer, buf->size));
|
||||
resizable_buffers.erase(it->partial_id);
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
// Note: we can't just return pointers to |local_buffer_| because if we
|
||||
// read a partial message we will invalidate all of the existing
|
||||
// pointers. We could jump through hoops to make it work (ie, if no
|
||||
@ -274,15 +278,21 @@ std::vector<std::unique_ptr<Buffer>> MessageQueue::DequeueAll() {
|
||||
}
|
||||
}
|
||||
|
||||
// Let other threads run. We still want to run as fast as possible, though.
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(0));
|
||||
} while (resizable_buffers.size() > 0);
|
||||
// We're waiting for data to be posted to result. Delay a little so we
|
||||
// don't push the CPU so hard.
|
||||
if (!resizable_buffers.empty())
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(0));
|
||||
else
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
void MessageQueue::CopyPayloadToBuffer(uint32_t partial_id, void* payload, size_t payload_size, bool has_more_chunks) {
|
||||
void MessageQueue::CopyPayloadToBuffer(uint32_t partial_id,
|
||||
void* payload,
|
||||
size_t payload_size,
|
||||
bool has_more_chunks) {
|
||||
assert(BytesAvailableInBuffer() >= (sizeof(MessageHeader) + payload_size));
|
||||
|
||||
// Copy header.
|
||||
@ -303,22 +313,23 @@ size_t MessageQueue::BytesAvailableInBuffer() const {
|
||||
}
|
||||
|
||||
Message* MessageQueue::first_message_in_buffer() const {
|
||||
return reinterpret_cast<Message*>(
|
||||
reinterpret_cast<uint8_t*>(buffer_->data) + sizeof(BufferMetadata));
|
||||
return reinterpret_cast<Message*>(reinterpret_cast<uint8_t*>(buffer_->data) +
|
||||
sizeof(BufferMetadata));
|
||||
}
|
||||
|
||||
void* MessageQueue::first_free_address_in_buffer() const {
|
||||
if (metadata()->total_bytes_used_including_metadata() >= buffer_->capacity)
|
||||
return nullptr;
|
||||
return reinterpret_cast<void*>(
|
||||
reinterpret_cast<uint8_t*>(buffer_->data) +
|
||||
metadata()->total_bytes_used_including_metadata());
|
||||
reinterpret_cast<uint8_t*>(buffer_->data) +
|
||||
metadata()->total_bytes_used_including_metadata());
|
||||
}
|
||||
|
||||
TEST_SUITE("MessageQueue");
|
||||
|
||||
TEST_CASE("simple") {
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5), false /*buffer_has_data*/);
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5),
|
||||
false /*buffer_has_data*/);
|
||||
|
||||
int data = 0;
|
||||
data = 1;
|
||||
@ -337,7 +348,8 @@ TEST_CASE("simple") {
|
||||
}
|
||||
|
||||
TEST_CASE("large payload") {
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5), false /*buffer_has_data*/);
|
||||
MessageQueue queue(Buffer::Create(kMinimumPartialPayloadSize * 5),
|
||||
false /*buffer_has_data*/);
|
||||
|
||||
// Allocate big buffer.
|
||||
size_t num_ints = kMinimumPartialPayloadSize * 100;
|
||||
@ -345,7 +357,8 @@ TEST_CASE("large payload") {
|
||||
for (int i = 0; i < num_ints; ++i)
|
||||
sent_ints[i] = i;
|
||||
|
||||
// Queue big buffer. Add surrounding messages to make sure they get sent correctly.
|
||||
// Queue big buffer. Add surrounding messages to make sure they get sent
|
||||
// correctly.
|
||||
// Run in a separate thread because Enqueue will block.
|
||||
volatile bool done_sending = false;
|
||||
std::thread sender([&]() {
|
||||
|
Loading…
Reference in New Issue
Block a user