Refactor ipc behind a proxy type so we can (eventually) bypass all serialization when running fully in-process.

This commit is contained in:
Jacob Dufault 2017-04-16 12:02:29 -07:00
parent 2ebaadd696
commit 724d8cc3f4

View File

@ -33,11 +33,197 @@
namespace {
const char* kIpcLanguageClientName = "language_client";
const int kNumIndexers = 8 - 1;
const int kQueueSizeBytes = 1024 * 8;
const int kMaxWorkspaceSearchResults = 1000;
const bool kUseMultipleProcesses = true;
struct IpcManager {
// TODO: Rename TypedBidiMessageQueue to IpcTransport?
using IpcMessageQueue = TypedBidiMessageQueue<IpcId, BaseIpcMessage>;
static constexpr const char* kIpcLanguageClientName = "language_client";
static constexpr const int kQueueSizeBytes = 1024 * 8;
std::unique_ptr<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>> threaded_queue_;
std::unique_ptr<IpcMessageQueue> ipc_queue_;
IpcManager() {
ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes);
}
template <typename T>
void SendOutMessageToClient(T& response) {
std::ostringstream sstream;
response.Write(sstream);
Ipc_Cout out;
out.content = sstream.str();
ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out);
}
enum class Destination {
Client, Server
};
template <typename TId, typename TMessage>
void SendMessageWithId(Destination destination, TId id, TMessage& message) {
ipc_queue_->SendMessage(
destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server,
id,
message);
}
template <typename TMessage>
void SendMessage(Destination destination, TMessage& message) {
SendMessageWithId(destination, TMessage::kIpcId, message);
}
template <typename TMessage>
std::vector<std::unique_ptr<TMessage>> GetMessages(Destination destination) {
return ipc_queue_->GetMessages(destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server);
}
private:
template<typename T>
void RegisterId(IpcMessageQueue* t) {
t->RegisterId(T::kIpcId,
[](Writer& visitor, BaseIpcMessage& message) {
T& m = static_cast<T&>(message);
Reflect(visitor, m);
}, [](Reader& visitor) {
auto m = MakeUnique<T>();
Reflect(visitor, *m);
return m;
});
}
std::unique_ptr<IpcMessageQueue> BuildIpcMessageQueue(const std::string& name, size_t buffer_size) {
auto ipc = MakeUnique<IpcMessageQueue>(name, buffer_size);
RegisterId<Ipc_CancelRequest>(ipc.get());
RegisterId<Ipc_InitializeRequest>(ipc.get());
RegisterId<Ipc_InitializedNotification>(ipc.get());
RegisterId<Ipc_TextDocumentDidOpen>(ipc.get());
RegisterId<Ipc_TextDocumentDidChange>(ipc.get());
RegisterId<Ipc_TextDocumentDidClose>(ipc.get());
RegisterId<Ipc_TextDocumentDidSave>(ipc.get());
RegisterId<Ipc_TextDocumentRename>(ipc.get());
RegisterId<Ipc_TextDocumentComplete>(ipc.get());
RegisterId<Ipc_TextDocumentDefinition>(ipc.get());
RegisterId<Ipc_TextDocumentDocumentHighlight>(ipc.get());
RegisterId<Ipc_TextDocumentHover>(ipc.get());
RegisterId<Ipc_TextDocumentReferences>(ipc.get());
RegisterId<Ipc_TextDocumentDocumentSymbol>(ipc.get());
RegisterId<Ipc_TextDocumentCodeLens>(ipc.get());
RegisterId<Ipc_CodeLensResolve>(ipc.get());
RegisterId<Ipc_WorkspaceSymbol>(ipc.get());
RegisterId<Ipc_Quit>(ipc.get());
RegisterId<Ipc_IsAlive>(ipc.get());
RegisterId<Ipc_OpenProject>(ipc.get());
RegisterId<Ipc_Cout>(ipc.get());
return ipc;
}
};
void PushBack(NonElidedVector<lsLocation>* result, optional<lsLocation> location) {
if (location)
@ -501,6 +687,18 @@ std::vector<SymbolRef> FindSymbolsAtLocation(QueryFile* file, lsPosition positio
@ -549,67 +747,11 @@ struct Index_OnIndexed {
explicit Index_OnIndexed(IndexUpdate& update) : update(update) {}
};
// TODO: Rename TypedBidiMessageQueue to IpcTransport?
using IpcMessageQueue = TypedBidiMessageQueue<IpcId, BaseIpcMessage>;
using Index_DoIndexQueue = ThreadedQueue<Index_DoIndex>;
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
template<typename TMessage>
void SendMessage(IpcMessageQueue& t, MessageQueue* destination, TMessage& message) {
t.SendMessage(destination, TMessage::kIpcId, message);
}
template<typename T>
void SendOutMessageToClient(IpcMessageQueue* queue, T& response) {
std::ostringstream sstream;
response.Write(sstream);
Ipc_Cout out;
out.content = sstream.str();
queue->SendMessage(&queue->for_client, Ipc_Cout::kIpcId, out);
}
template<typename T>
void RegisterId(IpcMessageQueue* t) {
t->RegisterId(T::kIpcId,
[](Writer& visitor, BaseIpcMessage& message) {
T& m = static_cast<T&>(message);
Reflect(visitor, m);
}, [](Reader& visitor) {
auto m = MakeUnique<T>();
Reflect(visitor, *m);
return m;
});
}
std::unique_ptr<IpcMessageQueue> BuildIpcMessageQueue(const std::string& name, size_t buffer_size) {
auto ipc = MakeUnique<IpcMessageQueue>(name, buffer_size);
RegisterId<Ipc_CancelRequest>(ipc.get());
RegisterId<Ipc_InitializeRequest>(ipc.get());
RegisterId<Ipc_InitializedNotification>(ipc.get());
RegisterId<Ipc_TextDocumentDidOpen>(ipc.get());
RegisterId<Ipc_TextDocumentDidChange>(ipc.get());
RegisterId<Ipc_TextDocumentDidClose>(ipc.get());
RegisterId<Ipc_TextDocumentDidSave>(ipc.get());
RegisterId<Ipc_TextDocumentRename>(ipc.get());
RegisterId<Ipc_TextDocumentComplete>(ipc.get());
RegisterId<Ipc_TextDocumentDefinition>(ipc.get());
RegisterId<Ipc_TextDocumentDocumentHighlight>(ipc.get());
RegisterId<Ipc_TextDocumentHover>(ipc.get());
RegisterId<Ipc_TextDocumentReferences>(ipc.get());
RegisterId<Ipc_TextDocumentDocumentSymbol>(ipc.get());
RegisterId<Ipc_TextDocumentCodeLens>(ipc.get());
RegisterId<Ipc_CodeLensResolve>(ipc.get());
RegisterId<Ipc_WorkspaceSymbol>(ipc.get());
RegisterId<Ipc_Quit>(ipc.get());
RegisterId<Ipc_IsAlive>(ipc.get());
RegisterId<Ipc_OpenProject>(ipc.get());
RegisterId<Ipc_Cout>(ipc.get());
return ipc;
}
void RegisterMessageTypes() {
MessageRegistry::instance()->Register<Ipc_CancelRequest>();
MessageRegistry::instance()->Register<Ipc_InitializeRequest>();
@ -630,6 +772,39 @@ void RegisterMessageTypes() {
MessageRegistry::instance()->Register<Ipc_WorkspaceSymbol>();
}
bool IndexMain_DoIndex(FileConsumer::SharedState* file_consumer_shared,
Index_DoIndexQueue* queue_do_index,
Index_DoIdMapQueue* queue_do_id_map) {
@ -780,6 +955,22 @@ void IndexMain(
@ -799,7 +990,7 @@ void IndexMain(
void QueryDbMainLoop(
QueryDatabase* db,
IpcMessageQueue* language_client,
IpcManager* language_client,
Index_DoIndexQueue* queue_do_index,
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
@ -808,7 +999,7 @@ void QueryDbMainLoop(
WorkingFiles* working_files,
CompletionManager* completion_manager) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = language_client->GetMessages(&language_client->for_server);
std::vector<std::unique_ptr<BaseIpcMessage>> messages = language_client->GetMessages<BaseIpcMessage>(IpcManager::Destination::Server);
for (auto& message : messages) {
//std::cerr << "[querydb] Processing message " << static_cast<int>(message->method_id) << std::endl;
@ -821,7 +1012,7 @@ void QueryDbMainLoop(
case IpcId::IsAlive: {
Ipc_IsAlive response;
language_client->SendMessage(&language_client->for_client, response.method_id, response);
language_client->SendMessageWithId(IpcManager::Destination::Client, response.method_id, response);
break;
}
@ -908,7 +1099,7 @@ void QueryDbMainLoop(
}
response.Write(std::cerr);
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -986,7 +1177,7 @@ void QueryDbMainLoop(
break;
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1022,7 +1213,7 @@ void QueryDbMainLoop(
break;
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1048,7 +1239,7 @@ void QueryDbMainLoop(
break;
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1085,7 +1276,7 @@ void QueryDbMainLoop(
break;
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1111,7 +1302,7 @@ void QueryDbMainLoop(
response.result.push_back(info);
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1205,7 +1396,7 @@ void QueryDbMainLoop(
};
}
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1245,7 +1436,7 @@ void QueryDbMainLoop(
}
std::cerr << "- Found " << response.result.size() << " results for query " << query << std::endl;
SendOutMessageToClient(language_client, response);
language_client->SendOutMessageToClient(response);
break;
}
@ -1297,7 +1488,7 @@ void QueryDbMain() {
//std::cerr << "Running QueryDb" << std::endl;
// Create queues.
std::unique_ptr<IpcMessageQueue> ipc = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes);
IpcManager ipc;
Index_DoIndexQueue queue_do_index;
Index_DoIdMapQueue queue_do_id_map;
Index_OnIdMappedQueue queue_on_id_mapped;
@ -1318,11 +1509,73 @@ void QueryDbMain() {
// Run query db main loop.
QueryDatabase db;
while (true) {
QueryDbMainLoop(&db, ipc.get(), &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager);
QueryDbMainLoop(&db, &ipc, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
// TODO: global lock on stderr output.
// Separate thread whose only job is to read from stdin and
@ -1331,7 +1584,7 @@ void QueryDbMain() {
// blocks.
//
// |ipc| is connected to a server.
void LanguageServerStdinLoop(IpcMessageQueue* ipc) {
void LanguageServerStdinLoop(IpcManager* ipc) {
while (true) {
std::unique_ptr<BaseIpcMessage> message = MessageRegistry::instance()->ReadMessageFromStdin();
@ -1353,7 +1606,7 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) {
<< std::endl;
Ipc_OpenProject open_project;
open_project.project_path = project_path;
ipc->SendMessage(&ipc->for_server, Ipc_OpenProject::kIpcId, open_project);
ipc->SendMessageWithId(IpcManager::Destination::Server, Ipc_OpenProject::kIpcId, open_project);
}
// TODO: query request->params.capabilities.textDocument and support only things
@ -1415,7 +1668,7 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) {
case IpcId::TextDocumentCodeLens:
case IpcId::WorkspaceSymbol: {
//std::cerr << "Sending message " << (int)message->method_id << std::endl;
ipc->SendMessage(&ipc->for_server, message->method_id, *message.get());
ipc->SendMessageWithId(IpcManager::Destination::Server, message->method_id, *message.get());
break;
}
@ -1428,8 +1681,8 @@ void LanguageServerStdinLoop(IpcMessageQueue* ipc) {
}
}
void LanguageServerMainLoop(IpcMessageQueue* ipc) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(&ipc->for_client);
void LanguageServerMainLoop(IpcManager* ipc) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages<BaseIpcMessage>(IpcManager::Destination::Client);
for (auto& message : messages) {
switch (message->method_id) {
case IpcId::Quit: {
@ -1454,16 +1707,63 @@ void LanguageServerMainLoop(IpcMessageQueue* ipc) {
}
}
bool IsQueryDbProcessRunning(IpcMessageQueue* ipc) {
bool IsQueryDbProcessRunning(IpcManager* ipc) {
// Emit an alive check. Sleep so the server has time to respond.
Ipc_IsAlive check_alive;
SendMessage(*ipc, &ipc->for_server, check_alive);
ipc->SendMessage(IpcManager::Destination::Server, check_alive);
// TODO: Tune this value or make it configurable.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Check if we got an IsAlive message back.
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(&ipc->for_client);
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages<BaseIpcMessage>(IpcManager::Destination::Client);
for (auto& message : messages) {
if (IpcId::IsAlive == message->method_id)
return true;
@ -1473,12 +1773,12 @@ bool IsQueryDbProcessRunning(IpcMessageQueue* ipc) {
}
void LanguageServerMain(std::string process_name) {
std::unique_ptr<IpcMessageQueue> ipc = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes);
IpcManager ipc;
// Discard any left-over messages from previous runs.
ipc->GetMessages(&ipc->for_client);
ipc.GetMessages<BaseIpcMessage>(IpcManager::Destination::Client);
bool has_server = IsQueryDbProcessRunning(ipc.get());
bool has_server = IsQueryDbProcessRunning(&ipc);
// No server is running. Start it in-process. If the user wants to run the
// server out of process they have to start it themselves.
@ -1487,14 +1787,65 @@ void LanguageServerMain(std::string process_name) {
}
// Run language client.
new std::thread(&LanguageServerStdinLoop, ipc.get());
new std::thread(&LanguageServerStdinLoop, &ipc);
while (true) {
LanguageServerMainLoop(ipc.get());
LanguageServerMainLoop(&ipc);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
int main(int argc, char** argv) {
// TODO: Move to IndexInit(), remove clang-c include.
clang_enableStackTraces();
clang_toggleCrashRecovery(1);