Cross-process support can now be disabled.

This commit is contained in:
Jacob Dufault 2017-04-16 14:49:48 -07:00
parent 5105f41f6d
commit c0fb407447
4 changed files with 145 additions and 87 deletions

View File

@ -35,7 +35,7 @@ namespace {
const int kNumIndexers = 8 - 1;
const int kMaxWorkspaceSearchResults = 1000;
const bool kUseMultipleProcesses = true;
const bool kUseMultipleProcesses = false;
@ -87,43 +87,71 @@ struct IpcManager {
static constexpr const char* kIpcLanguageClientName = "language_client";
static constexpr const int kQueueSizeBytes = 1024 * 8;
std::unique_ptr<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>> threaded_queue_;
std::unique_ptr<IpcMessageQueue> ipc_queue_;
IpcManager() {
ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes);
static IpcManager* instance_;
static IpcManager* instance() {
if (!instance_)
instance_ = new IpcManager();
return instance_;
}
std::unique_ptr<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>> threaded_queue_for_client_;
std::unique_ptr<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>> threaded_queue_for_server_;
std::unique_ptr<IpcMessageQueue> ipc_queue_;
enum class Destination {
Client, Server
};
MessageQueue* GetMessageQueue(Destination destination) {
assert(kUseMultipleProcesses);
return destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server;
}
ThreadedQueue<std::unique_ptr<BaseIpcMessage>>* GetThreadedQueue(Destination destination) {
assert(!kUseMultipleProcesses);
return destination == Destination::Client ? threaded_queue_for_client_.get() : threaded_queue_for_server_.get();
}
void SendOutMessageToClient(lsBaseOutMessage& response) {
std::ostringstream sstream;
response.Write(sstream);
Ipc_Cout out;
out.content = sstream.str();
ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out);
if (kUseMultipleProcesses) {
Ipc_Cout out;
out.content = sstream.str();
ipc_queue_->SendMessage(&ipc_queue_->for_client, Ipc_Cout::kIpcId, out);
}
else {
auto out = MakeUnique<Ipc_Cout>();
out->content = sstream.str();
GetThreadedQueue(Destination::Client)->Enqueue(std::move(out));
}
}
enum class Destination {
Client, Server
};
void SendMessageWithId(Destination destination, IpcId id, BaseIpcMessage& message) {
ipc_queue_->SendMessage(
destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server,
id,
message);
}
template <typename TMessage>
void SendMessage(Destination destination, TMessage& message) {
SendMessageWithId(destination, TMessage::kIpcId, message);
void SendMessage(Destination destination, std::unique_ptr<BaseIpcMessage> message) {
if (kUseMultipleProcesses)
ipc_queue_->SendMessage(GetMessageQueue(destination), message->method_id, *message);
else
GetThreadedQueue(destination)->Enqueue(std::move(message));
}
std::vector<std::unique_ptr<BaseIpcMessage>> GetMessages(Destination destination) {
return ipc_queue_->GetMessages(destination == Destination::Client ? &ipc_queue_->for_client : &ipc_queue_->for_server);
if (kUseMultipleProcesses)
return ipc_queue_->GetMessages(GetMessageQueue(destination));
else
return GetThreadedQueue(destination)->DequeueAll();
}
private:
IpcManager() {
if (kUseMultipleProcesses) {
ipc_queue_ = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes);
}
else {
threaded_queue_for_client_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>();
threaded_queue_for_server_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>();
}
}
template<typename T>
void RegisterId(IpcMessageQueue* t) {
t->RegisterId(T::kIpcId,
@ -164,7 +192,7 @@ struct IpcManager {
}
};
IpcManager* IpcManager::instance_ = nullptr;
@ -987,7 +1015,6 @@ void IndexMain(
void QueryDbMainLoop(
QueryDatabase* db,
IpcManager* language_client,
Index_DoIndexQueue* queue_do_index,
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
@ -995,10 +1022,11 @@ void QueryDbMainLoop(
Project* project,
WorkingFiles* working_files,
CompletionManager* completion_manager) {
IpcManager* ipc = IpcManager::instance();
std::vector<std::unique_ptr<BaseIpcMessage>> messages = language_client->GetMessages(IpcManager::Destination::Server);
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Server);
for (auto& message : messages) {
//std::cerr << "[querydb] Processing message " << static_cast<int>(message->method_id) << std::endl;
std::cerr << "[querydb] Processing message " << IpcIdToString(message->method_id) << std::endl;
switch (message->method_id) {
case IpcId::Quit: {
@ -1008,8 +1036,8 @@ void QueryDbMainLoop(
}
case IpcId::IsAlive: {
Ipc_IsAlive response;
language_client->SendMessageWithId(IpcManager::Destination::Client, response.method_id, response);
std::cerr << "[querydb] Sending IsAlive response to client" << std::endl;
ipc->SendMessage(IpcManager::Destination::Client, MakeUnique<Ipc_IsAlive>());
break;
}
@ -1051,7 +1079,7 @@ void QueryDbMainLoop(
}
case IpcId::TextDocumentDidClose: {
auto msg = static_cast<Ipc_TextDocumentDidClose*>(message.get());
std::cerr << "Closing " << msg->params.textDocument.uri.GetPath() << std::endl;
//std::cerr << "Closing " << msg->params.textDocument.uri.GetPath() << std::endl;
working_files->OnClose(msg->params);
break;
}
@ -1096,7 +1124,7 @@ void QueryDbMainLoop(
}
response.Write(std::cerr);
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1174,7 +1202,7 @@ void QueryDbMainLoop(
break;
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1210,7 +1238,7 @@ void QueryDbMainLoop(
break;
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1236,7 +1264,7 @@ void QueryDbMainLoop(
break;
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1273,7 +1301,7 @@ void QueryDbMainLoop(
break;
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1299,7 +1327,7 @@ void QueryDbMainLoop(
response.result.push_back(info);
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1393,7 +1421,7 @@ void QueryDbMainLoop(
};
}
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
@ -1433,13 +1461,12 @@ void QueryDbMainLoop(
}
std::cerr << "- Found " << response.result.size() << " results for query " << query << std::endl;
language_client->SendOutMessageToClient(response);
ipc->SendOutMessageToClient(response);
break;
}
default: {
std::cerr << "1 Unhandled IPC message with kind "
<< static_cast<int>(message->method_id) << std::endl;
std::cerr << "[querydb] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
}
}
@ -1485,7 +1512,6 @@ void QueryDbMain() {
//std::cerr << "Running QueryDb" << std::endl;
// Create queues.
IpcManager ipc;
Index_DoIndexQueue queue_do_index;
Index_DoIdMapQueue queue_do_id_map;
Index_OnIdMappedQueue queue_on_id_mapped;
@ -1506,7 +1532,7 @@ void QueryDbMain() {
// Run query db main loop.
QueryDatabase db;
while (true) {
QueryDbMainLoop(&db, &ipc, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager);
QueryDbMainLoop(&db, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
@ -1581,7 +1607,9 @@ void QueryDbMain() {
// blocks.
//
// |ipc| is connected to a server.
void LanguageServerStdinLoop(IpcManager* ipc) {
void LanguageServerStdinLoop() {
IpcManager* ipc = IpcManager::instance();
while (true) {
std::unique_ptr<BaseIpcMessage> message = MessageRegistry::instance()->ReadMessageFromStdin();
@ -1589,8 +1617,7 @@ void LanguageServerStdinLoop(IpcManager* ipc) {
if (!message)
continue;
//std::cerr << "[info]: Got message of type "
// << IpcIdToString(message->method_id) << std::endl;
std::cerr << "[stdin]: Got message \"" << IpcIdToString(message->method_id) << '"' << std::endl;
switch (message->method_id) {
// TODO: For simplicitly lets just proxy the initialize request like
// all other requests so that stdin loop thread becomes super simple.
@ -1601,9 +1628,9 @@ void LanguageServerStdinLoop(IpcManager* ipc) {
std::cerr << "Initialize in directory " << project_path
<< " with uri " << request->params.rootUri->raw_uri
<< std::endl;
Ipc_OpenProject open_project;
open_project.project_path = project_path;
ipc->SendMessageWithId(IpcManager::Destination::Server, Ipc_OpenProject::kIpcId, open_project);
auto open_project = MakeUnique<Ipc_OpenProject>();
open_project->project_path = project_path;
ipc->SendMessage(IpcManager::Destination::Server, std::move(open_project));
}
// TODO: query request->params.capabilities.textDocument and support only things
@ -1664,42 +1691,43 @@ void LanguageServerStdinLoop(IpcManager* ipc) {
case IpcId::TextDocumentDocumentSymbol:
case IpcId::TextDocumentCodeLens:
case IpcId::WorkspaceSymbol: {
//std::cerr << "Sending message " << (int)message->method_id << std::endl;
ipc->SendMessageWithId(IpcManager::Destination::Server, message->method_id, *message.get());
ipc->SendMessage(IpcManager::Destination::Server, std::move(message));
break;
}
default: {
std::cerr << "3 Unhandled IPC message with kind "
<< static_cast<int>(message->method_id) << std::endl;
std::cerr << "[stdin] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
}
}
}
}
void LanguageServerMainLoop(IpcManager* ipc) {
void LanguageServerMainLoop() {
IpcManager* ipc = IpcManager::instance();
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Client);
for (auto& message : messages) {
std::cerr << "[server] Processing message " << IpcIdToString(message->method_id) << std::endl;
switch (message->method_id) {
case IpcId::Quit: {
std::cerr << "Got quit message (exiting)" << std::endl;
exit(0);
break;
}
case IpcId::Quit: {
std::cerr << "[server] Got quit message (exiting)" << std::endl;
exit(0);
break;
}
case IpcId::Cout: {
auto msg = static_cast<Ipc_Cout*>(message.get());
std::cout << msg->content;
std::cout.flush();
break;
}
case IpcId::Cout: {
auto msg = static_cast<Ipc_Cout*>(message.get());
std::cout << msg->content;
std::cout.flush();
break;
}
default: {
std::cerr << "2 Unhandled IPC message with kind "
<< static_cast<int>(message->method_id) << std::endl;
exit(1);
}
default: {
std::cerr << "[server] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
}
}
}
}
@ -1751,10 +1779,19 @@ void LanguageServerMainLoop(IpcManager* ipc) {
bool IsQueryDbProcessRunning(IpcManager* ipc) {
bool IsQueryDbProcessRunning() {
if (!kUseMultipleProcesses)
return false;
IpcManager* ipc = IpcManager::instance();
// Discard any left-over messages from previous runs.
if (kUseMultipleProcesses)
ipc->GetMessages(IpcManager::Destination::Client);
// Emit an alive check. Sleep so the server has time to respond.
Ipc_IsAlive check_alive;
ipc->SendMessage(IpcManager::Destination::Server, check_alive);
std::cerr << "[setup] Sending IsAlive request to server" << std::endl;
ipc->SendMessage(IpcManager::Destination::Server, MakeUnique<Ipc_IsAlive>());
// TODO: Tune this value or make it configurable.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -1766,16 +1803,14 @@ bool IsQueryDbProcessRunning(IpcManager* ipc) {
return true;
}
// No response back. Clear out server messages so server doesn't respond to stale request.
ipc->GetMessages(IpcManager::Destination::Server);
return false;
}
void LanguageServerMain(std::string process_name) {
IpcManager ipc;
// Discard any left-over messages from previous runs.
ipc.GetMessages(IpcManager::Destination::Client);
bool has_server = IsQueryDbProcessRunning(&ipc);
void LanguageServerMain() {
bool has_server = IsQueryDbProcessRunning();
// No server is running. Start it in-process. If the user wants to run the
// server out of process they have to start it themselves.
@ -1784,9 +1819,9 @@ void LanguageServerMain(std::string process_name) {
}
// Run language client.
new std::thread(&LanguageServerStdinLoop, &ipc);
new std::thread(&LanguageServerStdinLoop);
while (true) {
LanguageServerMainLoop(&ipc);
LanguageServerMainLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
@ -1900,7 +1935,7 @@ int main(int argc, char** argv) {
}
else if (HasOption(options, "--language-server")) {
//std::cerr << "Running language server" << std::endl;
LanguageServerMain(argv[0]);
LanguageServerMain();
return 0;
}
else if (HasOption(options, "--querydb")) {
@ -1910,7 +1945,7 @@ int main(int argc, char** argv) {
}
else {
//std::cerr << "Running language server" << std::endl;
LanguageServerMain(argv[0]);
LanguageServerMain();
return 0;
}

View File

@ -38,8 +38,18 @@ const char* IpcIdToString(IpcId id) {
return "codeLens/resolve";
case IpcId::WorkspaceSymbol:
return "workspace/symbol";
case IpcId::Quit:
return "$quit";
case IpcId::IsAlive:
return "$isAlive";
case IpcId::OpenProject:
return "$openProject";
case IpcId::Cout:
return "$cout";
default:
assert(false);
assert(false && "missing IpcId string name");
exit(1);
}
}

View File

@ -41,7 +41,7 @@ struct PlatformMutexWin : public PlatformMutex {
HANDLE raw_mutex = INVALID_HANDLE_VALUE;
PlatformMutexWin(const std::string& name) {
//std::cerr << "[win] Creating mutex with name " << name << std::endl;
std::cerr << "[win] Creating mutex with name " << name << std::endl;
raw_mutex = CreateMutex(nullptr, false /*initial_owner*/, name.c_str());
CheckForError({ ERROR_ALREADY_EXISTS });
}
@ -72,8 +72,8 @@ struct PlatformSharedMemoryWin : public PlatformSharedMemory {
HANDLE shmem_;
PlatformSharedMemoryWin(const std::string& name, size_t capacity) {
//std::cerr << "[win] Creating shared memory with name " << name
// << " and capacity " << capacity << std::endl;
std::cerr << "[win] Creating shared memory with name " << name
<< " and capacity " << capacity << std::endl;
this->name = name;
shmem_ = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0,

View File

@ -21,6 +21,19 @@ public:
cv_.notify_one();
}
// Return all elements in the queue.
std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<T> result;
result.reserve(queue_.size());
while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front()));
queue_.pop();
}
return result;
}
// Get the "front"-element.
// If the queue is empty, wait untill an element is avaiable.
T Dequeue() {
@ -38,7 +51,7 @@ public:
// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
optional<T> TryDequeue() {
std::unique_lock<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty())
return nullopt;