From a14ddc69ac4355fdced4901eb09e5024c296f469 Mon Sep 17 00:00:00 2001 From: Fangrui Song Date: Mon, 1 Jan 2018 23:40:36 -0800 Subject: [PATCH] Split MultiQueueWaiter into {querydb,indexer,stdout}waiter to solve thundering herd problem (#217) See https://github.com/jacobdufault/cquery/pull/213#issuecomment-354706992 --- src/command_line.cc | 26 ++++++++++++++++---------- src/queue_manager.cc | 25 +++++++++++++++---------- src/queue_manager.h | 14 +++++++++++--- src/threaded_queue.h | 19 +++++++++++++++---- 4 files changed, 57 insertions(+), 27 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index e3fbd9b4..10be4cd2 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -142,7 +142,8 @@ bool QueryDbMainLoop(Config* config, void RunQueryDbThread(const std::string& bin_name, Config* config, - MultiQueueWaiter* waiter) { + MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter) { Project project; SemanticHighlightSymbolCache semantic_cache; WorkingFiles working_files; @@ -169,7 +170,7 @@ void RunQueryDbThread(const std::string& bin_name, for (MessageHandler* handler : *MessageHandler::message_handlers) { handler->config = config; handler->db = &db; - handler->waiter = waiter; + handler->waiter = indexer_waiter; handler->project = &project; handler->file_consumer_shared = &file_consumer_shared; handler->import_manager = &import_manager; @@ -189,7 +190,7 @@ void RunQueryDbThread(const std::string& bin_name, SetCurrentThreadName("querydb"); while (true) { bool did_work = QueryDbMainLoop( - config, &db, waiter, &project, &file_consumer_shared, &import_manager, + config, &db, querydb_waiter, &project, &file_consumer_shared, &import_manager, ×tamp_manager, &semantic_cache, &working_files, &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); @@ -199,7 +200,8 @@ void RunQueryDbThread(const std::string& bin_name, if (!did_work) { auto* queue = QueueManager::instance(); - waiter->Wait(&queue->on_indexed, &queue->for_querydb, &queue->do_id_map); + querydb_waiter->Wait(&queue->on_indexed, &queue->for_querydb, + &queue->do_id_map); } } } @@ -347,18 +349,20 @@ void LaunchStdoutThread(std::unordered_map* request_times, void LanguageServerMain(const std::string& bin_name, Config* config, - MultiQueueWaiter* waiter) { + MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter, + MultiQueueWaiter* stdout_waiter) { std::unordered_map request_times; LaunchStdinLoop(config, &request_times); // We run a dedicated thread for writing to stdout because there can be an // unknown number of delays when output information. - LaunchStdoutThread(&request_times, waiter); + LaunchStdoutThread(&request_times, stdout_waiter); // Start querydb which takes over this thread. The querydb will launch // indexer threads as needed. - RunQueryDbThread(bin_name, config, waiter); + RunQueryDbThread(bin_name, config, querydb_waiter, indexer_waiter); } //////////////////////////////////////////////////////////////////////////////// @@ -388,8 +392,9 @@ int main(int argc, char** argv) { loguru::g_flush_interval_ms = 0; loguru::init(argc, argv); - MultiQueueWaiter waiter; - QueueManager::CreateInstance(&waiter); + MultiQueueWaiter querydb_waiter, indexer_waiter, stdout_waiter; + QueueManager::CreateInstance(&querydb_waiter, &indexer_waiter, + &stdout_waiter); // bool loop = true; // while (loop) @@ -438,7 +443,8 @@ int main(int argc, char** argv) { print_help = false; // std::cerr << "Running language server" << std::endl; auto config = MakeUnique(); - LanguageServerMain(argv[0], config.get(), &waiter); + LanguageServerMain(argv[0], config.get(), &querydb_waiter, &indexer_waiter, + &stdout_waiter); return 0; } diff --git a/src/queue_manager.cc b/src/queue_manager.cc index c6afe982..6ccda5e7 100644 --- a/src/queue_manager.cc +++ b/src/queue_manager.cc @@ -48,8 +48,10 @@ QueueManager* QueueManager::instance() { } // static -void QueueManager::CreateInstance(MultiQueueWaiter* waiter) { - instance_ = new QueueManager(waiter); +void QueueManager::CreateInstance(MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter, + MultiQueueWaiter* stdout_waiter) { + instance_ = new QueueManager(querydb_waiter, indexer_waiter, stdout_waiter); } // static @@ -63,14 +65,17 @@ void QueueManager::WriteStdout(IpcId id, lsBaseOutMessage& response) { instance()->for_stdout.Enqueue(std::move(out)); } -QueueManager::QueueManager(MultiQueueWaiter* waiter) - : for_stdout(waiter), - for_querydb(waiter), - index_request(waiter), - do_id_map(waiter), - load_previous_index(waiter), - on_id_mapped(waiter), - on_indexed(waiter) {} +QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter, + MultiQueueWaiter* stdout_waiter) + : for_stdout(stdout_waiter), + for_querydb(querydb_waiter), + do_id_map(querydb_waiter), + index_request(indexer_waiter), + load_previous_index(indexer_waiter), + on_id_mapped(indexer_waiter), + // TODO on_indexed is shared by "querydb" and "indexer" + on_indexed(querydb_waiter, indexer_waiter) {} bool QueueManager::HasWork() { return !index_request.IsEmpty() || !do_id_map.IsEmpty() || diff --git a/src/queue_manager.h b/src/queue_manager.h index 98acd0a2..f652aa57 100644 --- a/src/queue_manager.h +++ b/src/queue_manager.h @@ -71,25 +71,33 @@ struct Index_OnIndexed { struct QueueManager { static QueueManager* instance(); - static void CreateInstance(MultiQueueWaiter* waiter); + static void CreateInstance(MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter, + MultiQueueWaiter* stdout_waiter); static void WriteStdout(IpcId id, lsBaseOutMessage& response); bool HasWork(); // Runs on stdout thread. ThreadedQueue for_stdout; + // Runs on querydb thread. ThreadedQueue> for_querydb; + ThreadedQueue do_id_map; // Runs on indexer threads. ThreadedQueue index_request; - ThreadedQueue do_id_map; ThreadedQueue load_previous_index; ThreadedQueue on_id_mapped; + + // Shared by querydb and indexer. + // TODO split on_indexed ThreadedQueue on_indexed; private: - explicit QueueManager(MultiQueueWaiter* waiter); + explicit QueueManager(MultiQueueWaiter* querydb_waiter, + MultiQueueWaiter* indexer_waiter, + MultiQueueWaiter* stdout_waiter); static QueueManager* instance_; }; diff --git a/src/threaded_queue.h b/src/threaded_queue.h index 414ec933..cb776aff 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -102,10 +102,14 @@ struct ThreadedQueue : public BaseThreadQueue { ThreadedQueue() : total_count_(0) { owned_waiter_ = MakeUnique(); waiter_ = owned_waiter_.get(); + owned_waiter1_ = MakeUnique(); + waiter1_ = owned_waiter1_.get(); } - explicit ThreadedQueue(MultiQueueWaiter* waiter) - : total_count_(0), waiter_(waiter) {} + // TODO remove waiter1 after split of on_indexed + explicit ThreadedQueue(MultiQueueWaiter* waiter, + MultiQueueWaiter* waiter1 = nullptr) + : total_count_(0), waiter_(waiter), waiter1_(waiter1) {} // Returns the number of elements in the queue. This is lock-free. size_t Size() const { return total_count_; } @@ -115,7 +119,9 @@ struct ThreadedQueue : public BaseThreadQueue { std::lock_guard lock(mutex_); priority_.push(std::move(t)); ++total_count_; - waiter_->cv.notify_all(); + waiter_->cv.notify_one(); + if (waiter1_) + waiter1_->cv.notify_one(); } // Add an element to the queue. @@ -123,7 +129,9 @@ struct ThreadedQueue : public BaseThreadQueue { std::lock_guard lock(mutex_); queue_.push(std::move(t)); ++total_count_; - waiter_->cv.notify_all(); + waiter_->cv.notify_one(); + if (waiter1_) + waiter1_->cv.notify_one(); } // Add a set of elements to the queue. @@ -227,4 +235,7 @@ struct ThreadedQueue : public BaseThreadQueue { std::queue queue_; MultiQueueWaiter* waiter_; std::unique_ptr owned_waiter_; + // TODO remove waiter1 after split of on_indexed + MultiQueueWaiter* waiter1_; + std::unique_ptr owned_waiter1_; };