Split MultiQueueWaiter into {querydb,indexer,stdout}waiter to solve thundering herd problem (#217)

See https://github.com/jacobdufault/cquery/pull/213#issuecomment-354706992
This commit is contained in:
Fangrui Song 2018-01-01 23:40:36 -08:00 committed by GitHub
parent 0b53c871dc
commit a14ddc69ac
4 changed files with 57 additions and 27 deletions

View File

@ -142,7 +142,8 @@ bool QueryDbMainLoop(Config* config,
void RunQueryDbThread(const std::string& bin_name,
Config* config,
MultiQueueWaiter* waiter) {
MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter) {
Project project;
SemanticHighlightSymbolCache semantic_cache;
WorkingFiles working_files;
@ -169,7 +170,7 @@ void RunQueryDbThread(const std::string& bin_name,
for (MessageHandler* handler : *MessageHandler::message_handlers) {
handler->config = config;
handler->db = &db;
handler->waiter = waiter;
handler->waiter = indexer_waiter;
handler->project = &project;
handler->file_consumer_shared = &file_consumer_shared;
handler->import_manager = &import_manager;
@ -189,7 +190,7 @@ void RunQueryDbThread(const std::string& bin_name,
SetCurrentThreadName("querydb");
while (true) {
bool did_work = QueryDbMainLoop(
config, &db, waiter, &project, &file_consumer_shared, &import_manager,
config, &db, querydb_waiter, &project, &file_consumer_shared, &import_manager,
&timestamp_manager, &semantic_cache, &working_files, &clang_complete,
&include_complete, global_code_complete_cache.get(),
non_global_code_complete_cache.get(), signature_cache.get());
@ -199,7 +200,8 @@ void RunQueryDbThread(const std::string& bin_name,
if (!did_work) {
auto* queue = QueueManager::instance();
waiter->Wait(&queue->on_indexed, &queue->for_querydb, &queue->do_id_map);
querydb_waiter->Wait(&queue->on_indexed, &queue->for_querydb,
&queue->do_id_map);
}
}
}
@ -347,18 +349,20 @@ void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times,
void LanguageServerMain(const std::string& bin_name,
Config* config,
MultiQueueWaiter* waiter) {
MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter,
MultiQueueWaiter* stdout_waiter) {
std::unordered_map<IpcId, Timer> request_times;
LaunchStdinLoop(config, &request_times);
// We run a dedicated thread for writing to stdout because there can be an
// unknown number of delays when output information.
LaunchStdoutThread(&request_times, waiter);
LaunchStdoutThread(&request_times, stdout_waiter);
// Start querydb which takes over this thread. The querydb will launch
// indexer threads as needed.
RunQueryDbThread(bin_name, config, waiter);
RunQueryDbThread(bin_name, config, querydb_waiter, indexer_waiter);
}
////////////////////////////////////////////////////////////////////////////////
@ -388,8 +392,9 @@ int main(int argc, char** argv) {
loguru::g_flush_interval_ms = 0;
loguru::init(argc, argv);
MultiQueueWaiter waiter;
QueueManager::CreateInstance(&waiter);
MultiQueueWaiter querydb_waiter, indexer_waiter, stdout_waiter;
QueueManager::CreateInstance(&querydb_waiter, &indexer_waiter,
&stdout_waiter);
// bool loop = true;
// while (loop)
@ -438,7 +443,8 @@ int main(int argc, char** argv) {
print_help = false;
// std::cerr << "Running language server" << std::endl;
auto config = MakeUnique<Config>();
LanguageServerMain(argv[0], config.get(), &waiter);
LanguageServerMain(argv[0], config.get(), &querydb_waiter, &indexer_waiter,
&stdout_waiter);
return 0;
}

View File

@ -48,8 +48,10 @@ QueueManager* QueueManager::instance() {
}
// static
void QueueManager::CreateInstance(MultiQueueWaiter* waiter) {
instance_ = new QueueManager(waiter);
void QueueManager::CreateInstance(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter,
MultiQueueWaiter* stdout_waiter) {
instance_ = new QueueManager(querydb_waiter, indexer_waiter, stdout_waiter);
}
// static
@ -63,14 +65,17 @@ void QueueManager::WriteStdout(IpcId id, lsBaseOutMessage& response) {
instance()->for_stdout.Enqueue(std::move(out));
}
QueueManager::QueueManager(MultiQueueWaiter* waiter)
: for_stdout(waiter),
for_querydb(waiter),
index_request(waiter),
do_id_map(waiter),
load_previous_index(waiter),
on_id_mapped(waiter),
on_indexed(waiter) {}
QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter,
MultiQueueWaiter* stdout_waiter)
: for_stdout(stdout_waiter),
for_querydb(querydb_waiter),
do_id_map(querydb_waiter),
index_request(indexer_waiter),
load_previous_index(indexer_waiter),
on_id_mapped(indexer_waiter),
// TODO on_indexed is shared by "querydb" and "indexer"
on_indexed(querydb_waiter, indexer_waiter) {}
bool QueueManager::HasWork() {
return !index_request.IsEmpty() || !do_id_map.IsEmpty() ||

View File

@ -71,25 +71,33 @@ struct Index_OnIndexed {
struct QueueManager {
static QueueManager* instance();
static void CreateInstance(MultiQueueWaiter* waiter);
static void CreateInstance(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter,
MultiQueueWaiter* stdout_waiter);
static void WriteStdout(IpcId id, lsBaseOutMessage& response);
bool HasWork();
// Runs on stdout thread.
ThreadedQueue<Stdout_Request> for_stdout;
// Runs on querydb thread.
ThreadedQueue<std::unique_ptr<BaseIpcMessage>> for_querydb;
ThreadedQueue<Index_DoIdMap> do_id_map;
// Runs on indexer threads.
ThreadedQueue<Index_Request> index_request;
ThreadedQueue<Index_DoIdMap> do_id_map;
ThreadedQueue<Index_DoIdMap> load_previous_index;
ThreadedQueue<Index_OnIdMapped> on_id_mapped;
// Shared by querydb and indexer.
// TODO split on_indexed
ThreadedQueue<Index_OnIndexed> on_indexed;
private:
explicit QueueManager(MultiQueueWaiter* waiter);
explicit QueueManager(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter,
MultiQueueWaiter* stdout_waiter);
static QueueManager* instance_;
};

View File

@ -102,10 +102,14 @@ struct ThreadedQueue : public BaseThreadQueue {
ThreadedQueue() : total_count_(0) {
owned_waiter_ = MakeUnique<MultiQueueWaiter>();
waiter_ = owned_waiter_.get();
owned_waiter1_ = MakeUnique<MultiQueueWaiter>();
waiter1_ = owned_waiter1_.get();
}
explicit ThreadedQueue(MultiQueueWaiter* waiter)
: total_count_(0), waiter_(waiter) {}
// TODO remove waiter1 after split of on_indexed
explicit ThreadedQueue(MultiQueueWaiter* waiter,
MultiQueueWaiter* waiter1 = nullptr)
: total_count_(0), waiter_(waiter), waiter1_(waiter1) {}
// Returns the number of elements in the queue. This is lock-free.
size_t Size() const { return total_count_; }
@ -115,7 +119,9 @@ struct ThreadedQueue : public BaseThreadQueue {
std::lock_guard<std::mutex> lock(mutex_);
priority_.push(std::move(t));
++total_count_;
waiter_->cv.notify_all();
waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
}
// Add an element to the queue.
@ -123,7 +129,9 @@ struct ThreadedQueue : public BaseThreadQueue {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(t));
++total_count_;
waiter_->cv.notify_all();
waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
}
// Add a set of elements to the queue.
@ -227,4 +235,7 @@ struct ThreadedQueue : public BaseThreadQueue {
std::queue<T> queue_;
MultiQueueWaiter* waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_;
// TODO remove waiter1 after split of on_indexed
MultiQueueWaiter* waiter1_;
std::unique_ptr<MultiQueueWaiter> owned_waiter1_;
};