mirror of
				https://github.com/MaskRay/ccls.git
				synced 2025-11-04 06:15:20 +00:00 
			
		
		
		
	partial messages work
This commit is contained in:
		
							parent
							
								
									ebd467d31b
								
							
						
					
					
						commit
						b7923b4abe
					
				@ -663,27 +663,24 @@ void LanguageServerMain(std::string process_name) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  std::thread stdio_reader(&LanguageServerStdinLoop, &client_ipc);
 | 
					  std::thread stdio_reader(&LanguageServerStdinLoop, &client_ipc);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
  // No server. Run it in-process.
 | 
					 | 
				
			||||||
  if (!has_server) {
 | 
					  if (!has_server) {
 | 
				
			||||||
 | 
					    // No server. Run it in-process.
 | 
				
			||||||
    QueryableDatabase db;
 | 
					    new std::thread([&]() {
 | 
				
			||||||
    IpcServer server_ipc("languageserver");
 | 
					      IpcServer server_ipc("languageserver");
 | 
				
			||||||
 | 
					      QueryableDatabase db;
 | 
				
			||||||
    while (true) {
 | 
					      while (true) {
 | 
				
			||||||
      QueryDbMainLoop(&server_ipc, &db);
 | 
					        QueryDbMainLoop(&server_ipc, &db);
 | 
				
			||||||
      LanguageServerMainLoop(&client_ipc);
 | 
					        // TODO: use a condition variable.
 | 
				
			||||||
      // TODO: use a condition variable.
 | 
					        std::this_thread::sleep_for(std::chrono::microseconds(0));
 | 
				
			||||||
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
 | 
					      }
 | 
				
			||||||
    }
 | 
					    });
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  else {
 | 
					  // Run language client.
 | 
				
			||||||
    while (true) {
 | 
					  while (true) {
 | 
				
			||||||
      LanguageServerMainLoop(&client_ipc);
 | 
					    LanguageServerMainLoop(&client_ipc);
 | 
				
			||||||
      // TODO: use a condition variable.
 | 
					    // TODO: use a condition variable.
 | 
				
			||||||
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
 | 
					    std::this_thread::sleep_for(std::chrono::microseconds(0));
 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -734,6 +731,10 @@ void LanguageServerMain(std::string process_name) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int main(int argc, char** argv) {
 | 
					int main(int argc, char** argv) {
 | 
				
			||||||
 | 
					  bool loop = false;
 | 
				
			||||||
 | 
					  while (loop)
 | 
				
			||||||
 | 
					    std::this_thread::sleep_for(std::chrono::milliseconds(16));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (argc == 1) {
 | 
					  if (argc == 1) {
 | 
				
			||||||
    RunTests();
 | 
					    RunTests();
 | 
				
			||||||
    return 0;
 | 
					    return 0;
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										302
									
								
								ipc.cc
									
									
									
									
									
								
							
							
						
						
									
										302
									
								
								ipc.cc
									
									
									
									
									
								
							@ -2,44 +2,9 @@
 | 
				
			|||||||
#include "serializer.h"
 | 
					#include "serializer.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace {
 | 
					namespace {
 | 
				
			||||||
  struct BufferBuilder {
 | 
					  // The absolute smallest partial payload we should send. This must be >0, ie, 1 is the
 | 
				
			||||||
    void* memory;
 | 
					  // minimum. Keep a reasonably high value so we don't send needlessly send tiny payloads.
 | 
				
			||||||
    size_t size;
 | 
					  const int kMinimumPartialPayloadSize = 128;
 | 
				
			||||||
    size_t capacity;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    BufferBuilder() {
 | 
					 | 
				
			||||||
      memory = malloc(128);
 | 
					 | 
				
			||||||
      size = 0;
 | 
					 | 
				
			||||||
      capacity = 128;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ~BufferBuilder() {
 | 
					 | 
				
			||||||
      free(memory);
 | 
					 | 
				
			||||||
      size = 0;
 | 
					 | 
				
			||||||
      capacity = 0;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    void AppendToBuffer(void* content, size_t content_size) {
 | 
					 | 
				
			||||||
      if (size + content_size > capacity) {
 | 
					 | 
				
			||||||
        // Grow memory if needed.
 | 
					 | 
				
			||||||
        size_t new_size = capacity * 2;
 | 
					 | 
				
			||||||
        while (new_size < size + content_size)
 | 
					 | 
				
			||||||
          new_size *= 2;
 | 
					 | 
				
			||||||
        void* new_memory = malloc(capacity);
 | 
					 | 
				
			||||||
        memcpy(new_memory, memory, size);
 | 
					 | 
				
			||||||
        free(memory);
 | 
					 | 
				
			||||||
        memory = new_memory;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        // Append new content into memory.
 | 
					 | 
				
			||||||
        memcpy((char*)memory + size, content, content_size);
 | 
					 | 
				
			||||||
        size += content_size;
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    void Reset() {
 | 
					 | 
				
			||||||
      size = 0;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // JSON-encoded message that is passed across shared memory.
 | 
					  // JSON-encoded message that is passed across shared memory.
 | 
				
			||||||
  //
 | 
					  //
 | 
				
			||||||
@ -49,14 +14,20 @@ namespace {
 | 
				
			|||||||
  // completely different address.
 | 
					  // completely different address.
 | 
				
			||||||
  struct JsonMessage {
 | 
					  struct JsonMessage {
 | 
				
			||||||
    IpcId ipc_id;
 | 
					    IpcId ipc_id;
 | 
				
			||||||
 | 
					    int partial_message_id;
 | 
				
			||||||
 | 
					    bool has_more_chunks;
 | 
				
			||||||
    size_t payload_size;
 | 
					    size_t payload_size;
 | 
				
			||||||
 | 
					    void* payload() {
 | 
				
			||||||
    const char* payload() {
 | 
					      return reinterpret_cast<char*>(this) + sizeof(JsonMessage);
 | 
				
			||||||
      return reinterpret_cast<const char*>(this) + sizeof(JsonMessage);
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    void SetPayload(size_t payload_size, const char* payload) {
 | 
					
 | 
				
			||||||
      char* payload_dest = reinterpret_cast<char*>(this) + sizeof(JsonMessage);
 | 
					    void Setup(IpcId ipc_id, int partial_message_id, bool has_more_chunks, size_t payload_size, const char* payload) {
 | 
				
			||||||
 | 
					      this->ipc_id = ipc_id;
 | 
				
			||||||
 | 
					      this->partial_message_id = partial_message_id;
 | 
				
			||||||
 | 
					      this->has_more_chunks = has_more_chunks;
 | 
				
			||||||
      this->payload_size = payload_size;
 | 
					      this->payload_size = payload_size;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      char* payload_dest = reinterpret_cast<char*>(this) + sizeof(JsonMessage);
 | 
				
			||||||
      memcpy(payload_dest, payload, payload_size);
 | 
					      memcpy(payload_dest, payload, payload_size);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  };
 | 
					  };
 | 
				
			||||||
@ -89,15 +60,10 @@ struct IpcDirectionalChannel::MessageBuffer {
 | 
				
			|||||||
  size_t real_buffer_size;
 | 
					  size_t real_buffer_size;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  template<typename T>
 | 
					  template<typename T>
 | 
				
			||||||
  T* Offset(size_t offset) {
 | 
					  T* Offset(size_t offset) const {
 | 
				
			||||||
    return reinterpret_cast<T*>(static_cast<char*>(real_buffer) + offset);
 | 
					    return reinterpret_cast<T*>(static_cast<char*>(real_buffer) + offset);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // Number of bytes available in buffer_start. Note that this
 | 
					 | 
				
			||||||
  // is smaller than the total buffer size, since there is some
 | 
					 | 
				
			||||||
  // metadata stored at the start of the buffer.
 | 
					 | 
				
			||||||
  size_t buffer_size;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  struct Metadata {
 | 
					  struct Metadata {
 | 
				
			||||||
    // The number of bytes that are currently used in the buffer minus the
 | 
					    // The number of bytes that are currently used in the buffer minus the
 | 
				
			||||||
    // size of this Metadata struct.
 | 
					    // size of this Metadata struct.
 | 
				
			||||||
@ -106,22 +72,26 @@ struct IpcDirectionalChannel::MessageBuffer {
 | 
				
			|||||||
    int num_outstanding_partial_messages = 0;
 | 
					    int num_outstanding_partial_messages = 0;
 | 
				
			||||||
  };
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  Metadata* metadata() {
 | 
					  Metadata* metadata() const {
 | 
				
			||||||
    return Offset<Metadata>(0);
 | 
					    return Offset<Metadata>(0);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  JsonMessage* message_at_offset(size_t offset) {
 | 
					  size_t bytes_available() const {
 | 
				
			||||||
 | 
					    return real_buffer_size - sizeof(Metadata) - metadata()->bytes_used;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  JsonMessage* message_at_offset(size_t offset) const {
 | 
				
			||||||
    return Offset<JsonMessage>(sizeof(Metadata) + offset);
 | 
					    return Offset<JsonMessage>(sizeof(Metadata) + offset);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // First json message.
 | 
					  // First json message.
 | 
				
			||||||
  JsonMessage* first_message() {
 | 
					  JsonMessage* first_message() const {
 | 
				
			||||||
    return message_at_offset(0);
 | 
					    return message_at_offset(0);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // First free, writable json message. Make sure to increase *bytes_used()
 | 
					  // First free, writable json message. Make sure to increase *bytes_used()
 | 
				
			||||||
  // by any written size.
 | 
					  // by any written size.
 | 
				
			||||||
  JsonMessage* free_message() {
 | 
					  JsonMessage* free_message() const {
 | 
				
			||||||
    return message_at_offset(metadata()->bytes_used);
 | 
					    return message_at_offset(metadata()->bytes_used);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -129,26 +99,32 @@ struct IpcDirectionalChannel::MessageBuffer {
 | 
				
			|||||||
    void* buffer;
 | 
					    void* buffer;
 | 
				
			||||||
    size_t remaining_bytes;
 | 
					    size_t remaining_bytes;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Iterator(void* buffer, size_t remaining_bytes) : remaining_bytes(remaining_bytes) {}
 | 
					    Iterator(void* buffer, size_t remaining_bytes) : buffer(buffer), remaining_bytes(remaining_bytes) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    JsonMessage* get() const {
 | 
					    JsonMessage* get() const {
 | 
				
			||||||
      assert(buffer);
 | 
					      assert(buffer);
 | 
				
			||||||
      return reinterpret_cast<JsonMessage*>(buffer);
 | 
					      return reinterpret_cast<JsonMessage*>(buffer);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    JsonMessage* operator*() const {
 | 
				
			||||||
 | 
					      return get();
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    JsonMessage* operator->() const {
 | 
					    JsonMessage* operator->() const {
 | 
				
			||||||
      return get();
 | 
					      return get();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Iterator operator++() const {
 | 
					    void operator++() {
 | 
				
			||||||
      size_t next_message_offset = sizeof(JsonMessage) + get()->payload_size;
 | 
					      size_t next_message_offset = sizeof(JsonMessage) + get()->payload_size;
 | 
				
			||||||
      if (next_message_offset >= remaining_bytes) {
 | 
					      if (next_message_offset >= remaining_bytes) {
 | 
				
			||||||
        assert(next_message_offset == remaining_bytes);
 | 
					        assert(next_message_offset == remaining_bytes);
 | 
				
			||||||
        return Iterator(nullptr, 0);
 | 
					        buffer = nullptr;
 | 
				
			||||||
 | 
					        remaining_bytes = 0;
 | 
				
			||||||
 | 
					        return;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      auto* next_message = (char*)buffer + next_message_offset;
 | 
					      buffer = (char*)buffer + next_message_offset;
 | 
				
			||||||
      return Iterator(next_message, remaining_bytes - next_message_offset);
 | 
					      remaining_bytes -= next_message_offset;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bool operator==(const Iterator& other) const {
 | 
					    bool operator==(const Iterator& other) const {
 | 
				
			||||||
@ -159,14 +135,71 @@ struct IpcDirectionalChannel::MessageBuffer {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  };
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  Iterator begin() {
 | 
					  Iterator begin() const {
 | 
				
			||||||
 | 
					    if (metadata()->bytes_used == 0)
 | 
				
			||||||
 | 
					      return end();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return Iterator(first_message(), metadata()->bytes_used);
 | 
					    return Iterator(first_message(), metadata()->bytes_used);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  Iterator end() {
 | 
					  Iterator end() const {
 | 
				
			||||||
    return Iterator(nullptr, 0);
 | 
					    return Iterator(nullptr, 0);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					struct IpcDirectionalChannel::ResizableBuffer {
 | 
				
			||||||
 | 
					  void* memory;
 | 
				
			||||||
 | 
					  size_t size;
 | 
				
			||||||
 | 
					  size_t capacity;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ResizableBuffer() {
 | 
				
			||||||
 | 
					    memory = malloc(128);
 | 
				
			||||||
 | 
					    size = 0;
 | 
				
			||||||
 | 
					    capacity = 128;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ~ResizableBuffer() {
 | 
				
			||||||
 | 
					    free(memory);
 | 
				
			||||||
 | 
					    size = 0;
 | 
				
			||||||
 | 
					    capacity = 0;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void Append(void* content, size_t content_size) {
 | 
				
			||||||
 | 
					    assert(capacity);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Grow memory if needed.
 | 
				
			||||||
 | 
					    if ((size + content_size) >= capacity) {
 | 
				
			||||||
 | 
					      size_t new_capacity = capacity * 2;
 | 
				
			||||||
 | 
					      while (new_capacity < size + content_size)
 | 
				
			||||||
 | 
					        new_capacity *= 2;
 | 
				
			||||||
 | 
					      void* new_memory = malloc(new_capacity);
 | 
				
			||||||
 | 
					      assert(size < capacity);
 | 
				
			||||||
 | 
					      memcpy(new_memory, memory, size);
 | 
				
			||||||
 | 
					      free(memory);
 | 
				
			||||||
 | 
					      memory = new_memory;
 | 
				
			||||||
 | 
					      capacity = new_capacity;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Append new content into memory.
 | 
				
			||||||
 | 
					    memcpy((char*)memory + size, content, content_size);
 | 
				
			||||||
 | 
					    size += content_size;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  void Reset() {
 | 
				
			||||||
 | 
					    size = 0;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					IpcDirectionalChannel::ResizableBuffer* IpcDirectionalChannel::CreateOrFindResizableBuffer(int id) {
 | 
				
			||||||
 | 
					  auto it = resizable_buffers.find(id);
 | 
				
			||||||
 | 
					  if (it != resizable_buffers.end())
 | 
				
			||||||
 | 
					    return it->second.get();
 | 
				
			||||||
 | 
					  return (resizable_buffers[id] = MakeUnique<ResizableBuffer>()).get();
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void IpcDirectionalChannel::RemoveResizableBuffer(int id) {
 | 
				
			||||||
 | 
					  resizable_buffers.erase(id);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) {
 | 
					IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) {
 | 
				
			||||||
  shared = CreatePlatformSharedMemory(name + "memory");
 | 
					  shared = CreatePlatformSharedMemory(name + "memory");
 | 
				
			||||||
  mutex = CreatePlatformMutex(name + "mutex");
 | 
					  mutex = CreatePlatformMutex(name + "mutex");
 | 
				
			||||||
@ -180,16 +213,37 @@ IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
IpcDirectionalChannel::~IpcDirectionalChannel() {}
 | 
					IpcDirectionalChannel::~IpcDirectionalChannel() {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO:
 | 
					enum class DispatchResult {
 | 
				
			||||||
//  We need to send partial payloads. But other payloads may appear in
 | 
					  RunAgain,
 | 
				
			||||||
//  the middle of the payload.
 | 
					  Break
 | 
				
			||||||
//
 | 
					};
 | 
				
			||||||
//  
 | 
					
 | 
				
			||||||
//  int partial_payload_id = 0
 | 
					// Run |action| an arbitrary number of times.
 | 
				
			||||||
//  int num_uncompleted_payloads = 0
 | 
					void IpcDispatch(PlatformMutex* mutex, std::function<DispatchResult()> action) {
 | 
				
			||||||
 | 
					  bool first = true;
 | 
				
			||||||
 | 
					  int log_iteration_count = 0;
 | 
				
			||||||
 | 
					  int log_count = 0;
 | 
				
			||||||
 | 
					  while (true) {
 | 
				
			||||||
 | 
					    if (!first) {
 | 
				
			||||||
 | 
					      if (log_iteration_count > 1000) {
 | 
				
			||||||
 | 
					        log_iteration_count = 0;
 | 
				
			||||||
 | 
					        std::cerr << "[info]: shmem full, waiting (" << log_count++ << ")" << std::endl; // TODO: remove
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      ++log_iteration_count;
 | 
				
			||||||
 | 
					      std::this_thread::sleep_for(std::chrono::microseconds(0));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    first = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    std::unique_ptr<PlatformScopedMutexLock> lock = CreatePlatformScopedMutexLock(mutex);
 | 
				
			||||||
 | 
					    if (action() == DispatchResult::RunAgain)
 | 
				
			||||||
 | 
					      continue;
 | 
				
			||||||
 | 
					    break;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void IpcDirectionalChannel::PushMessage(IpcMessage* message) {
 | 
					void IpcDirectionalChannel::PushMessage(IpcMessage* message) {
 | 
				
			||||||
  assert(message->ipc_id != IpcId::Invalid);
 | 
					  assert(message->ipc_id != IpcId::Invalid);
 | 
				
			||||||
 | 
					  assert(shmem_size > sizeof(JsonMessage) + kMinimumPartialPayloadSize);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  rapidjson::StringBuffer output;
 | 
					  rapidjson::StringBuffer output;
 | 
				
			||||||
  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
 | 
					  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
 | 
				
			||||||
@ -200,37 +254,61 @@ void IpcDirectionalChannel::PushMessage(IpcMessage* message) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  //std::cerr << "Sending message with id " << message->runtime_id() << " (hash " << message->hashed_runtime_id() << ")" << std::endl;
 | 
					  //std::cerr << "Sending message with id " << message->runtime_id() << " (hash " << message->hashed_runtime_id() << ")" << std::endl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  size_t payload_size = strlen(output.GetString());
 | 
					 | 
				
			||||||
  assert(payload_size < shmem_size && "Increase shared memory size, payload will never fit");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  bool first = true;
 | 
					  size_t payload_size = output.GetSize();
 | 
				
			||||||
  bool did_log = false;
 | 
					  const char* payload = output.GetString();
 | 
				
			||||||
  while (true) {
 | 
					  if (payload_size == 0)
 | 
				
			||||||
    if (!first) {
 | 
					    return;
 | 
				
			||||||
      if (!did_log) {
 | 
					
 | 
				
			||||||
        std::cerr << "[info]: shmem full, waiting" << std::endl; // TODO: remove
 | 
					  int partial_message_id = 0; // TODO
 | 
				
			||||||
        did_log = true;
 | 
					
 | 
				
			||||||
      }
 | 
					  std::cerr << "Starting dispatch of payload with size " << payload_size << std::endl;
 | 
				
			||||||
      std::this_thread::sleep_for(std::chrono::milliseconds(16));
 | 
					
 | 
				
			||||||
 | 
					  IpcDispatch(mutex.get(), [&]() {
 | 
				
			||||||
 | 
					    assert(payload_size > 0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // We cannot find the entire payload in the buffer. We
 | 
				
			||||||
 | 
					    // have to send chunks of it over time.
 | 
				
			||||||
 | 
					    if ((sizeof(JsonMessage) + payload_size) > shared_buffer->bytes_available()) {
 | 
				
			||||||
 | 
					      if ((sizeof(JsonMessage) + kMinimumPartialPayloadSize) > shared_buffer->bytes_available())
 | 
				
			||||||
 | 
					        return DispatchResult::RunAgain;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      if (partial_message_id == 0)
 | 
				
			||||||
 | 
					        partial_message_id = ++shared_buffer->metadata()->next_partial_message_id; // note: pre-increment so we 1 as initial value
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      size_t sent_payload_size = shared_buffer->bytes_available() - sizeof(JsonMessage);
 | 
				
			||||||
 | 
					      shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, true /*has_more_chunks*/, sent_payload_size, payload);
 | 
				
			||||||
 | 
					      shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + sent_payload_size;
 | 
				
			||||||
 | 
					      shared_buffer->free_message()->ipc_id = IpcId::Invalid;
 | 
				
			||||||
 | 
					      std::cerr << "Sending partial message with payload_size=" << sent_payload_size << std::endl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      // Prepare for next time.
 | 
				
			||||||
 | 
					      payload_size -= sent_payload_size;
 | 
				
			||||||
 | 
					      payload += sent_payload_size;
 | 
				
			||||||
 | 
					      return DispatchResult::RunAgain;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    first = false;
 | 
					    // The entire payload fits. Send it all now.
 | 
				
			||||||
 | 
					    else {
 | 
				
			||||||
 | 
					      // Include partial message id, as there could have been previous parts of this payload.
 | 
				
			||||||
 | 
					      shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, false /*has_more_chunks*/, payload_size, payload);
 | 
				
			||||||
 | 
					      shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + payload_size;
 | 
				
			||||||
 | 
					      shared_buffer->free_message()->ipc_id = IpcId::Invalid;
 | 
				
			||||||
 | 
					      std::cerr << "Sending full message with payload_size=" << payload_size << std::endl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::unique_ptr<PlatformScopedMutexLock> lock = CreatePlatformScopedMutexLock(mutex.get());
 | 
					      return DispatchResult::Break;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  });
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Try again later when there is room in shared memory.
 | 
					void AddIpcMessageFromJsonMessage(std::vector<std::unique_ptr<IpcMessage>>& result, IpcId ipc_id, void* payload, size_t payload_size) {
 | 
				
			||||||
    if ((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata) + sizeof(JsonMessage) + payload_size) >= shmem_size)
 | 
					  rapidjson::Document document;
 | 
				
			||||||
      continue;
 | 
					  document.Parse(reinterpret_cast<const char*>(payload), payload_size);
 | 
				
			||||||
 | 
					  bool has_error = document.HasParseError();
 | 
				
			||||||
    shared_buffer->free_message()->ipc_id = message->ipc_id;
 | 
					  auto error = document.GetParseError();
 | 
				
			||||||
    shared_buffer->free_message()->SetPayload(payload_size, output.GetString());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + shared_buffer->free_message()->payload_size;
 | 
					 | 
				
			||||||
    assert((shared_buffer->metadata()->bytes_used + sizeof(MessageBuffer::Metadata)) < shmem_size);
 | 
					 | 
				
			||||||
    assert(shared_buffer->metadata()->bytes_used >= 0);
 | 
					 | 
				
			||||||
    shared_buffer->free_message()->ipc_id = IpcId::Invalid;
 | 
					 | 
				
			||||||
    break;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  std::unique_ptr<IpcMessage> base_message = IpcRegistry::instance()->Allocate(ipc_id);
 | 
				
			||||||
 | 
					  base_message->Deserialize(document);
 | 
				
			||||||
 | 
					  result.emplace_back(std::move(base_message));
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
std::vector<std::unique_ptr<IpcMessage>> IpcDirectionalChannel::TakeMessages() {
 | 
					std::vector<std::unique_ptr<IpcMessage>> IpcDirectionalChannel::TakeMessages() {
 | 
				
			||||||
@ -250,31 +328,23 @@ std::vector<std::unique_ptr<IpcMessage>> IpcDirectionalChannel::TakeMessages() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  std::vector<std::unique_ptr<IpcMessage>> result;
 | 
					  std::vector<std::unique_ptr<IpcMessage>> result;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // TODO
 | 
					  for (JsonMessage* message : *local_buffer) {
 | 
				
			||||||
  for (auto it = local_buffer->begin(); it != local_buffer->end(); ++it) {
 | 
					    std::cerr << "Got message with payload_size=" << message->payload_size << std::endl;
 | 
				
			||||||
    // TODO: partial payload, maybe something like this:
 | 
					 | 
				
			||||||
    //
 | 
					 | 
				
			||||||
    //  if (it->partial_id != 0) {
 | 
					 | 
				
			||||||
    //    auto* buf = CreateOrFindResizableBuffer(it->partial_id);
 | 
					 | 
				
			||||||
    //    buf->Append(it->payload(), it->payload_size());
 | 
					 | 
				
			||||||
    //    if (it->is_complete) {
 | 
					 | 
				
			||||||
    //      Process(buf.payload(), buff.payload_size())
 | 
					 | 
				
			||||||
    //      RemoveResizableBuffer(it->partial_id)
 | 
					 | 
				
			||||||
    //    }
 | 
					 | 
				
			||||||
    //  }
 | 
					 | 
				
			||||||
    //  else {
 | 
					 | 
				
			||||||
    //    Process(it->payload(), it->payload_size())
 | 
					 | 
				
			||||||
    //  }
 | 
					 | 
				
			||||||
    //
 | 
					 | 
				
			||||||
    rapidjson::Document document;
 | 
					 | 
				
			||||||
    document.Parse(it->payload(), it->payload_size);
 | 
					 | 
				
			||||||
    bool has_error = document.HasParseError();
 | 
					 | 
				
			||||||
    auto error = document.GetParseError();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    std::unique_ptr<IpcMessage> base_message = IpcRegistry::instance()->Allocate(it->ipc_id);
 | 
					    if (message->partial_message_id != 0) {
 | 
				
			||||||
    base_message->Deserialize(document);
 | 
					      auto* buf = CreateOrFindResizableBuffer(message->partial_message_id);
 | 
				
			||||||
    result.emplace_back(std::move(base_message));
 | 
					      buf->Append(message->payload(), message->payload_size);
 | 
				
			||||||
 | 
					      if (!message->has_more_chunks) {
 | 
				
			||||||
 | 
					        AddIpcMessageFromJsonMessage(result, message->ipc_id, buf->memory, buf->size);
 | 
				
			||||||
 | 
					        RemoveResizableBuffer(message->partial_message_id);
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    else {
 | 
				
			||||||
 | 
					      assert(!message->has_more_chunks);
 | 
				
			||||||
 | 
					      AddIpcMessageFromJsonMessage(result, message->ipc_id, message->payload(), message->payload_size);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					  local_buffer->metadata()->bytes_used = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  return result;
 | 
					  return result;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								ipc.h
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								ipc.h
									
									
									
									
									
								
							@ -93,7 +93,11 @@ struct IpcDirectionalChannel {
 | 
				
			|||||||
  std::vector<std::unique_ptr<IpcMessage>> TakeMessages();
 | 
					  std::vector<std::unique_ptr<IpcMessage>> TakeMessages();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  struct MessageBuffer;
 | 
					  struct MessageBuffer;
 | 
				
			||||||
 | 
					  struct ResizableBuffer;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ResizableBuffer* CreateOrFindResizableBuffer(int id);
 | 
				
			||||||
 | 
					  void RemoveResizableBuffer(int id);
 | 
				
			||||||
 | 
					  std::unordered_map<int, std::unique_ptr<ResizableBuffer>> resizable_buffers;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // Pointer to process shared memory and process shared mutex.
 | 
					  // Pointer to process shared memory and process shared mutex.
 | 
				
			||||||
  std::unique_ptr<PlatformSharedMemory> shared;
 | 
					  std::unique_ptr<PlatformSharedMemory> shared;
 | 
				
			||||||
 | 
				
			|||||||
@ -14,7 +14,7 @@ struct PlatformSharedMemory {
 | 
				
			|||||||
  void* shared;
 | 
					  void* shared;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const int shmem_size = 50;// 1024 * 1024 * 32;  // number of chars/bytes (32mb)
 | 
					const int shmem_size = 200;// 1024 * 1024 * 32;  // number of chars/bytes (32mb)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name);
 | 
					std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name);
 | 
				
			||||||
std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(PlatformMutex* mutex);
 | 
					std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(PlatformMutex* mutex);
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user