make ipc easier to use

This commit is contained in:
Jacob Dufault 2017-03-02 22:16:28 -08:00
parent 6ffca03d6f
commit 94383d589b
5 changed files with 93 additions and 71 deletions

18
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,18 @@
{
"files.associations": {
"*.script": "pascal",
"functional": "cpp",
"queue": "cpp",
"stack": "cpp",
"tuple": "cpp",
"xutility": "cpp",
"xlocale": "cpp"
},
"files.exclude": {
"**/.git": true,
"**/.svn": true,
"**/.hg": true,
"**/.DS_Store": true,
"third_party/": true
}
}

View File

@ -97,11 +97,10 @@ indexer.exe --index-file /work2/chrome/src/chrome/foo.cc
#include "ipc.h"
void IndexerServerMain() {
IpcMessageQueue to_server("indexer_to_server");
IpcMessageQueue to_client("indexer_to_language_client");
IpcServer ipc("language_server");
while (true) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = to_server.PopMessage();
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc.TakeMessages();
std::cout << "Server has " << messages.size() << " messages" << std::endl;
for (auto& message : messages) {
@ -109,7 +108,7 @@ void IndexerServerMain() {
case JsonMessage::Kind::IsAlive:
{
IpcMessage_IsAlive response;
to_client.PushMessage(&response);
ipc.SendToClient(0, &response); // todo: make non-blocking
break;
}
default:
@ -125,24 +124,19 @@ void IndexerServerMain() {
}
void LanguageServerMain() {
// TODO: Encapsulate this pattern (ie, we generally want bi-directional channel/queue)
// TODO: Rename IpcMessageQueue to IpcDirectionalChannel
// - As per above, introduce wrapper IpcBidirectionalChannel that has two IpcDirectionalChannel instances
IpcMessageQueue to_server("indexer_to_server");
IpcMessageQueue to_client("indexer_to_language_client");
IpcClient ipc("language_server", 0);
// Discard any left-over messages from previous runs.
to_client.PopMessage();
ipc.TakeMessages();
// Emit an alive check. Sleep so the server has time to respond.
IpcMessage_IsAlive check_alive;
to_server.PushMessage(&check_alive);
ipc.SendToServer(&check_alive);
using namespace std::chrono_literals;
std::this_thread::sleep_for(50ms); // TODO: Tune this value or make it configurable.
// Check if we got an IsAlive message back.
std::vector<std::unique_ptr<BaseIpcMessage>> messages = to_client.PopMessage();
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc.TakeMessages();
bool has_server = false;
for (auto& message : messages) {
if (message->kind == JsonMessage::Kind::IsAlive) {

45
ipc.cc
View File

@ -4,6 +4,14 @@ namespace {
JsonMessage* as_message(char* ptr) {
return reinterpret_cast<JsonMessage*>(ptr);
}
std::string NameToServerName(const std::string& name) {
return name + "_server";
}
std::string NameToClientName(const std::string& name, int client_id) {
return name + "_server_" + std::to_string(client_id);
}
}
const char* JsonMessage::payload() {
@ -52,17 +60,17 @@ void IpcMessage_CreateIndex::Deserialize(Reader& reader) {
::Deserialize(reader, "args", args);
}
IpcMessageQueue::IpcMessageQueue(const std::string& name) {
IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name) {
local_block = new char[shmem_size];
shared = CreatePlatformSharedMemory(name + "_memory");
mutex = CreatePlatformMutex(name + "_mutex");
}
IpcMessageQueue::~IpcMessageQueue() {
IpcDirectionalChannel::~IpcDirectionalChannel() {
delete[] local_block;
}
void IpcMessageQueue::PushMessage(BaseIpcMessage* message) {
void IpcDirectionalChannel::PushMessage(BaseIpcMessage* message) {
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
writer.SetFormatOptions(
@ -103,7 +111,7 @@ void IpcMessageQueue::PushMessage(BaseIpcMessage* message) {
}
std::vector<std::unique_ptr<BaseIpcMessage>> IpcMessageQueue::PopMessage() {
std::vector<std::unique_ptr<BaseIpcMessage>> IpcDirectionalChannel::TakeMessages() {
size_t remaining_bytes = 0;
// Move data from shared memory into a local buffer. Do this
// before parsing the blocks so that other processes can begin
@ -150,4 +158,33 @@ std::vector<std::unique_ptr<BaseIpcMessage>> IpcMessageQueue::PopMessage() {
}
return result;
}
IpcServer::IpcServer(const std::string& name)
: name_(name), server_(NameToServerName(name)) {}
void IpcServer::SendToClient(int client_id, BaseIpcMessage* message) {
// Find or create the client.
auto it = clients_.find(client_id);
if (it == clients_.end())
clients_[client_id] = std::make_unique<IpcDirectionalChannel>(NameToClientName(name_, client_id));
clients_[client_id]->PushMessage(message);
}
std::vector<std::unique_ptr<BaseIpcMessage>> IpcServer::TakeMessages() {
return server_.TakeMessages();
}
IpcClient::IpcClient(const std::string& name, int client_id)
: server_(NameToServerName(name)), client_(NameToClientName(name, client_id)) {}
void IpcClient::SendToServer(BaseIpcMessage* message) {
server_.PushMessage(message);
}
std::vector<std::unique_ptr<BaseIpcMessage>> IpcClient::TakeMessages() {
return client_.TakeMessages();
}

31
ipc.h
View File

@ -72,15 +72,15 @@ struct IpcMessage_CreateIndex : public BaseIpcMessage {
void Deserialize(Reader& reader) override;
};
struct IpcMessageQueue {
struct IpcDirectionalChannel {
// NOTE: We keep all pointers in terms of char* so pointer arithmetic is
// always relative to bytes.
explicit IpcMessageQueue(const std::string& name);
~IpcMessageQueue();
explicit IpcDirectionalChannel(const std::string& name);
~IpcDirectionalChannel();
void PushMessage(BaseIpcMessage* message);
std::vector<std::unique_ptr<BaseIpcMessage>> PopMessage();
std::vector<std::unique_ptr<BaseIpcMessage>> TakeMessages();
private:
JsonMessage* get_free_message() {
@ -93,4 +93,27 @@ private:
// Pointer to process-local memory.
char* local_block;
};
struct IpcServer {
IpcServer(const std::string& name);
void SendToClient(int client_id, BaseIpcMessage* message);
std::vector<std::unique_ptr<BaseIpcMessage>> TakeMessages();
private:
std::string name_;
IpcDirectionalChannel server_;
std::unordered_map<int, std::unique_ptr<IpcDirectionalChannel>> clients_;
};
struct IpcClient {
IpcClient(const std::string& name, int client_id);
void SendToServer(BaseIpcMessage* message);
std::vector<std::unique_ptr<BaseIpcMessage>> TakeMessages();
private:
IpcDirectionalChannel server_;
IpcDirectionalChannel client_;
};

View File

@ -1,50 +0,0 @@
#include <iostream>
#include <vector>
#include <memory>
#include "ipc.h"
int main525252(int argc, char** argv) {
if (argc == 2) {
IpcMessageQueue queue("myqueue");
int i = 0;
while (true) {
IpcMessage_ImportIndex m;
m.path = "foo #" + std::to_string(i);
queue.PushMessage(&m);
std::cout << "Sent " << i << std::endl;;
using namespace std::chrono_literals;
std::this_thread::sleep_for(10ms);
++i;
}
}
else {
IpcMessageQueue queue("myqueue");
while (true) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = queue.PopMessage();
std::cout << "Got " << messages.size() << " messages" << std::endl;
for (auto& message : messages) {
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
writer.SetFormatOptions(
rapidjson::PrettyFormatOptions::kFormatSingleLineArray);
writer.SetIndent(' ', 2);
message->Serialize(writer);
std::cout << " kind=" << static_cast<int>(message->kind) << ", json=" << output.GetString() << std::endl;
}
using namespace std::chrono_literals;
std::this_thread::sleep_for(5s);
}
}
return 0;
}