Use condition variables instead of sleeping.

While tricky to do because we have multiple queues, this reduces a lot of unnecessary delay. e2e time goes down from 10-15ms on average to 0-3ms on average. Loading from cache is also nearly instant on the cquery codebase.
This commit is contained in:
Jacob Dufault 2017-04-23 15:45:40 -07:00
parent 63908e3aa0
commit 1b2f5896dc
2 changed files with 129 additions and 55 deletions

View File

@ -80,8 +80,8 @@ struct IpcManager {
static IpcManager* instance() {
return instance_;
}
static void CreateInstance() {
instance_ = new IpcManager();
static void CreateInstance(MultiQueueWaiter* waiter) {
instance_ = new IpcManager(waiter);
}
std::unique_ptr<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>> threaded_queue_for_client_;
@ -114,9 +114,9 @@ struct IpcManager {
}
private:
IpcManager() {
threaded_queue_for_client_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>();
threaded_queue_for_server_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>();
IpcManager(MultiQueueWaiter* waiter) {
threaded_queue_for_client_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>(waiter);
threaded_queue_for_server_ = MakeUnique<ThreadedQueue<std::unique_ptr<BaseIpcMessage>>>(waiter);
}
};
@ -1073,18 +1073,20 @@ bool IndexMain_DoCreateIndexUpdate(
return true;
}
void IndexJoinIndexUpdates(Index_OnIndexedQueue* queue_on_indexed) {
bool IndexMergeIndexUpdates(Index_OnIndexedQueue* queue_on_indexed) {
optional<Index_OnIndexed> root = queue_on_indexed->TryDequeue();
if (!root)
return;
return false;
bool did_merge = false;
while (true) {
optional<Index_OnIndexed> to_join = queue_on_indexed->TryDequeue();
if (!to_join) {
queue_on_indexed->Enqueue(std::move(*root));
return;
return did_merge;
}
did_merge = true;
Timer time;
root->update.Merge(to_join->update);
time.ResetAndPrint("[indexer] Joining two querydb updates");
@ -1095,6 +1097,7 @@ void IndexMain(
IndexerConfig* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
MultiQueueWaiter* waiter,
Index_DoIndexQueue* queue_do_index,
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
@ -1112,15 +1115,20 @@ void IndexMain(
// index.
bool did_index = IndexMain_DoIndex(config, file_consumer_shared, project, queue_do_index, queue_do_id_map);
bool did_create_update = IndexMain_DoCreateIndexUpdate(queue_on_id_mapped, queue_on_indexed);
if (!did_index && !did_create_update) {
bool did_merge = false;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
IndexJoinIndexUpdates(queue_on_indexed);
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
if (!did_index && !did_create_update)
did_merge = IndexMergeIndexUpdates(queue_on_indexed);
// TODO: use CV to wakeup?
std::this_thread::sleep_for(std::chrono::milliseconds(25));
}
// We didn't do any work, so wait for a notification.
if (!did_index && !did_create_update && !did_merge)
waiter->Wait({
queue_do_index,
queue_on_id_mapped,
queue_on_indexed
});
}
}
@ -1173,9 +1181,10 @@ void IndexMain(
void QueryDbMainLoop(
bool QueryDbMainLoop(
IndexerConfig* config,
QueryDatabase* db,
MultiQueueWaiter* waiter,
Index_DoIndexQueue* queue_do_index,
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
@ -1186,8 +1195,11 @@ void QueryDbMainLoop(
CompletionManager* completion_manager) {
IpcManager* ipc = IpcManager::instance();
bool did_work = false;
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Server);
for (auto& message : messages) {
did_work = true;
std::cerr << "[querydb] Processing message " << IpcIdToString(message->method_id) << std::endl;
switch (message->method_id) {
@ -1223,7 +1235,7 @@ void QueryDbMainLoop(
std::cerr << "[querydb] Starting " << indexer_count << " indexers" << std::endl;
for (int i = 0; i < indexer_count; ++i) {
new std::thread([&]() {
IndexMain(config, file_consumer_shared, project, queue_do_index, queue_do_id_map, queue_on_id_mapped, queue_on_indexed);
IndexMain(config, file_consumer_shared, project, waiter, queue_do_index, queue_do_id_map, queue_on_id_mapped, queue_on_indexed);
});
}
@ -1738,6 +1750,7 @@ void QueryDbMainLoop(
if (!request)
break;
did_work = true;
Index_OnIdMapped response;
Timer time;
@ -1760,6 +1773,8 @@ void QueryDbMainLoop(
if (!response)
break;
did_work = true;
Timer time;
for (auto& updated_file : response->update.files_def_update) {
@ -1787,14 +1802,16 @@ void QueryDbMainLoop(
db->ApplyIndexUpdate(&response->update);
time.ResetAndPrint("[querydb] Applying index update");
}
return did_work;
}
void QueryDbMain(IndexerConfig* config) {
void QueryDbMain(IndexerConfig* config, MultiQueueWaiter* waiter) {
// Create queues.
Index_DoIndexQueue queue_do_index;
Index_DoIdMapQueue queue_do_id_map;
Index_OnIdMappedQueue queue_on_id_mapped;
Index_OnIndexedQueue queue_on_indexed;
Index_DoIndexQueue queue_do_index(waiter);
Index_DoIdMapQueue queue_do_id_map(waiter);
Index_OnIdMappedQueue queue_on_id_mapped(waiter);
Index_OnIndexedQueue queue_on_indexed(waiter);
Project project;
WorkingFiles working_files;
@ -1805,8 +1822,15 @@ void QueryDbMain(IndexerConfig* config) {
SetCurrentThreadName("querydb");
QueryDatabase db;
while (true) {
QueryDbMainLoop(config, &db, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &file_consumer_shared, &working_files, &completion_manager);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
bool did_work = QueryDbMainLoop(config, &db, waiter, &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &file_consumer_shared, &working_files, &completion_manager);
if (!did_work) {
IpcManager* ipc = IpcManager::instance();
waiter->Wait({
ipc->threaded_queue_for_server_.get(),
&queue_do_id_map,
&queue_on_indexed
});
}
}
}
@ -1978,34 +2002,42 @@ void LanguageServerStdinLoop(IndexerConfig* config, std::unordered_map<IpcId, Ti
void StdoutMainLoop(std::unordered_map<IpcId, Timer>* request_times) {
void StdoutMain(std::unordered_map<IpcId, Timer>* request_times, MultiQueueWaiter* waiter) {
SetCurrentThreadName("stdout");
IpcManager* ipc = IpcManager::instance();
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Client);
for (auto& message : messages) {
std::cerr << "[stdout] Processing message " << IpcIdToString(message->method_id) << std::endl;
while (true) {
std::vector<std::unique_ptr<BaseIpcMessage>> messages = ipc->GetMessages(IpcManager::Destination::Client);
if (messages.empty()) {
waiter->Wait({ipc->threaded_queue_for_client_.get()});
continue;
}
switch (message->method_id) {
case IpcId::Cout: {
auto msg = static_cast<Ipc_Cout*>(message.get());
for (auto& message : messages) {
std::cerr << "[stdout] Processing message " << IpcIdToString(message->method_id) << std::endl;
Timer time = (*request_times)[msg->original_ipc_id];
time.ResetAndPrint("[e2e] Running " + std::string(IpcIdToString(msg->original_ipc_id)));
switch (message->method_id) {
case IpcId::Cout: {
auto msg = static_cast<Ipc_Cout*>(message.get());
std::cout << msg->content;
std::cout.flush();
break;
}
Timer time = (*request_times)[msg->original_ipc_id];
time.ResetAndPrint("[e2e] Running " + std::string(IpcIdToString(msg->original_ipc_id)));
default: {
std::cerr << "[stdout] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
std::cout << msg->content;
std::cout.flush();
break;
}
default: {
std::cerr << "[stdout] Unhandled IPC message " << IpcIdToString(message->method_id) << std::endl;
exit(1);
}
}
}
}
}
void LanguageServerMain(IndexerConfig* config) {
void LanguageServerMain(IndexerConfig* config, MultiQueueWaiter* waiter) {
std::unordered_map<IpcId, Timer> request_times;
// Start stdin reader. Reading from stdin is a blocking operation so this
@ -2015,17 +2047,13 @@ void LanguageServerMain(IndexerConfig* config) {
});
// Start querydb thread. querydb will start indexer threads as needed.
new std::thread([&config]() {
QueryDbMain(config);
new std::thread([&]() {
QueryDbMain(config, waiter);
});
// We run a dedicated thread for writing to stdout because there can be an
// unknown number of delays when output information.
SetCurrentThreadName("stdout");
while (true) {
StdoutMainLoop(&request_times);
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
StdoutMain(&request_times, waiter);
}
@ -2079,7 +2107,8 @@ void LanguageServerMain(IndexerConfig* config) {
int main(int argc, char** argv) {
IpcManager::CreateInstance();
MultiQueueWaiter waiter;
IpcManager::CreateInstance(&waiter);
//bool loop = true;
//while (loop)
@ -2107,7 +2136,7 @@ int main(int argc, char** argv) {
else if (HasOption(options, "--language-server")) {
//std::cerr << "Running language server" << std::endl;
IndexerConfig config;
LanguageServerMain(&config);
LanguageServerMain(&config, &waiter);
return 0;
}
else {

View File

@ -3,29 +3,67 @@
#include <optional.h>
#include <algorithm>
#include <atomic>
#include <queue>
#include <mutex>
#include <condition_variable>
// TODO: cleanup includes.
struct BaseThreadQueue {
virtual bool IsEmpty() = 0;
};
struct MultiQueueWaiter {
std::mutex m;
std::condition_variable cv;
bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
for (BaseThreadQueue* queue : queues) {
if (!queue->IsEmpty())
return true;
}
return false;
}
void Wait(std::initializer_list<BaseThreadQueue*> queues) {
// We cannot have a single condition variable wait on all of the different
// mutexes, so we have a global condition variable that every queue will
// notify. After it is notified we check to see if any of the queues have
// data; if they do, we return.
//
// We repoll every 5 seconds because it's not possible to atomically check
// the state of every queue and then setup the condition variable. So, if
// Wait() is called, HasState() returns false, and then in the time after
// HasState() is called data gets posted but before we begin waiting for
// the condition variable, we will miss the notification. The timeout of 5
// means that if this happens we will delay operation for 5 seconds.
while (!HasState(queues)) {
std::unique_lock<std::mutex> l(m);
cv.wait_for(l, std::chrono::seconds(5));
}
}
};
// A threadsafe-queue. http://stackoverflow.com/a/16075550
template <class T>
class ThreadedQueue {
struct ThreadedQueue : public BaseThreadQueue {
public:
ThreadedQueue(MultiQueueWaiter* waiter) : waiter_(waiter) {}
// Add an element to the front of the queue.
void PriorityEnqueue(T&& t) {
std::lock_guard<std::mutex> lock(mutex_);
priority_.push(std::move(t));
cv_.notify_one();
waiter_->cv.notify_all();
}
// Add an element to the queue.
void Enqueue(T&& t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(t));
cv_.notify_one();
waiter_->cv.notify_all();
}
// Return all elements in the queue.
@ -45,6 +83,12 @@ public:
return result;
}
bool IsEmpty() {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
/*
// Get the "front"-element.
// If the queue is empty, wait untill an element is avaiable.
T Dequeue() {
@ -64,6 +108,7 @@ public:
queue_.pop();
return val;
}
*/
// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
@ -85,7 +130,7 @@ public:
private:
std::queue<T> priority_;
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
MultiQueueWaiter* waiter_;
};