From cd96cb9570c812014cff37674e2b03410c484f17 Mon Sep 17 00:00:00 2001 From: Fangrui Song Date: Sun, 4 Feb 2018 22:03:22 -0800 Subject: [PATCH] threaded_queue: queue -> deque IndexMergeIndexUpdates: use TryPopBack() and see --- src/clang_complete.cc | 4 +- src/command_line.cc | 2 +- src/import_pipeline.cc | 26 +++--- src/messages/cquery_freshen_index.cc | 5 +- src/messages/cquery_index_file.cc | 2 +- src/messages/initialize.cc | 2 +- src/messages/text_document_did_open.cc | 5 +- src/messages/text_document_did_save.cc | 6 +- .../workspace_did_change_watched_files.cc | 4 +- src/queue_manager.cc | 2 +- src/threaded_queue.h | 83 ++++++++++--------- 11 files changed, 76 insertions(+), 65 deletions(-) diff --git a/src/clang_complete.cc b/src/clang_complete.cc index 7f140101..da6fe90d 100644 --- a/src/clang_complete.cc +++ b/src/clang_complete.cc @@ -689,7 +689,7 @@ void ClangCompleteManager::NotifyView(const std::string& filename) { // Only reparse the file if we create a new CompletionSession. if (EnsureCompletionOrCreatePreloadSession(filename)) - parse_requests_.PriorityEnqueue(ParseRequest(filename)); + parse_requests_.PushBack(ParseRequest(filename), true); } void ClangCompleteManager::NotifyEdit(const std::string& filename) { @@ -708,7 +708,7 @@ void ClangCompleteManager::NotifySave(const std::string& filename) { // EnsureCompletionOrCreatePreloadSession(filename); - parse_requests_.PriorityEnqueue(ParseRequest(filename)); + parse_requests_.PushBack(ParseRequest(filename), true); } void ClangCompleteManager::NotifyClose(const std::string& filename) { diff --git a/src/command_line.cc b/src/command_line.cc index c252641a..6ca7236f 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -344,7 +344,7 @@ void LaunchStdinLoop(Config* config, case IpcId::CqueryDerived: case IpcId::CqueryIndexFile: case IpcId::CqueryWait: { - queue->for_querydb.Enqueue(std::move(message)); + queue->for_querydb.PushBack(std::move(message)); break; } diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc index b0588733..182b9637 100644 --- a/src/import_pipeline.cc +++ b/src/import_pipeline.cc @@ -393,7 +393,7 @@ bool IndexMain_DoParse( ImportManager* import_manager, IIndexer* indexer) { auto* queue = QueueManager::instance(); - optional request = queue->index_request.TryDequeue(); + optional request = queue->index_request.TryPopFront(); if (!request) return false; @@ -408,7 +408,7 @@ bool IndexMain_DoParse( bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) { auto* queue = QueueManager::instance(); - optional response = queue->on_id_mapped.TryDequeue(); + optional response = queue->on_id_mapped.TryPopFront(); if (!response) return false; @@ -466,14 +466,14 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) { #endif Index_OnIndexed reply(update, response->perf); - queue->on_indexed.Enqueue(std::move(reply), response->is_interactive); + queue->on_indexed.PushBack(std::move(reply), response->is_interactive); return true; } bool IndexMain_LoadPreviousIndex() { auto* queue = QueueManager::instance(); - optional response = queue->load_previous_index.TryDequeue(); + optional response = queue->load_previous_index.TryPopFront(); if (!response) return false; @@ -482,21 +482,21 @@ bool IndexMain_LoadPreviousIndex() { << "Unable to load previous index for already imported index " << response->current->path; - queue->do_id_map.Enqueue(std::move(*response)); + queue->do_id_map.PushBack(std::move(*response)); return true; } bool IndexMergeIndexUpdates() { auto* queue = QueueManager::instance(); - optional root = queue->on_indexed.TryDequeue(); + optional root = queue->on_indexed.TryPopBack(); if (!root) return false; bool did_merge = false; while (true) { - optional to_join = queue->on_indexed.TryDequeue(); + optional to_join = queue->on_indexed.TryPopBack(); if (!to_join) { - queue->on_indexed.Enqueue(std::move(*root)); + queue->on_indexed.PushFront(std::move(*root)); return did_merge; } @@ -633,7 +633,7 @@ void QueryDb_DoIdMap(QueueManager* queue, db->usr_to_file.end()) { assert(!request->load_previous); request->load_previous = true; - queue->load_previous_index.Enqueue(std::move(*request)); + queue->load_previous_index.PushBack(std::move(*request)); return; } @@ -665,7 +665,7 @@ void QueryDb_DoIdMap(QueueManager* queue, response.previous = make_map(std::move(request->previous)); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); - queue->on_id_mapped.Enqueue(std::move(response)); + queue->on_id_mapped.PushBack(std::move(response)); } void QueryDb_OnIndexed(QueueManager* queue, @@ -721,7 +721,7 @@ bool QueryDb_ImportMain(Config* config, bool did_work = false; while (true) { - optional request = queue->do_id_map.TryDequeue(); + optional request = queue->do_id_map.TryPopFront(); if (!request) break; did_work = true; @@ -729,7 +729,7 @@ bool QueryDb_ImportMain(Config* config, } while (true) { - optional response = queue->on_indexed.TryDequeue(); + optional response = queue->on_indexed.TryPopFront(); if (!response) break; did_work = true; @@ -762,7 +762,7 @@ TEST_SUITE("ImportPipeline") { const std::vector& args = {}, bool is_interactive = false, const std::string& contents = "void foo();") { - queue->index_request.Enqueue( + queue->index_request.PushBack( Index_Request(path, args, is_interactive, contents, cache_manager)); } diff --git a/src/messages/cquery_freshen_index.cc b/src/messages/cquery_freshen_index.cc index 707e4156..d731256f 100644 --- a/src/messages/cquery_freshen_index.cc +++ b/src/messages/cquery_freshen_index.cc @@ -97,8 +97,9 @@ struct CqueryFreshenIndexHandler : BaseMessageHandler { } bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr; - queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, - is_interactive, *content, ICacheManager::Make(config))); + queue->index_request.PushBack( + Index_Request(entry.filename, entry.args, is_interactive, + *content, ICacheManager::Make(config))); }); } }; diff --git a/src/messages/cquery_index_file.cc b/src/messages/cquery_index_file.cc index f9889c5b..dd7ed0fe 100644 --- a/src/messages/cquery_index_file.cc +++ b/src/messages/cquery_index_file.cc @@ -27,7 +27,7 @@ REGISTER_IPC_MESSAGE(Ipc_CqueryIndexFile); struct CqueryIndexFileHandler : BaseMessageHandler { void Run(Ipc_CqueryIndexFile* request) override { LOG_S(INFO) << "Indexing file " << request->params.path; - QueueManager::instance()->index_request.Enqueue(Index_Request( + QueueManager::instance()->index_request.PushBack(Index_Request( NormalizePath(request->params.path), request->params.args, request->params.is_interactive, request->params.contents, ICacheManager::Make(config))); } diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index 1aa203a1..c6658234 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -624,7 +624,7 @@ struct InitializeHandler : BaseMessageHandler { } bool is_interactive = working_files->GetFileByFilename(entry.filename) != nullptr; - queue->index_request.Enqueue(Index_Request( + queue->index_request.PushBack(Index_Request( entry.filename, entry.args, is_interactive, *content, ICacheManager::Make(config), request->id)); }); diff --git a/src/messages/text_document_did_open.cc b/src/messages/text_document_did_open.cc index 1e3e091d..a16c0fb7 100644 --- a/src/messages/text_document_did_open.cc +++ b/src/messages/text_document_did_open.cc @@ -56,9 +56,10 @@ struct TextDocumentDidOpenHandler // Submit new index request. const Project::Entry& entry = project->FindCompilationEntryForFile(path); - QueueManager::instance()->index_request.PriorityEnqueue( + QueueManager::instance()->index_request.PushBack( Index_Request(entry.filename, entry.args, true /*is_interactive*/, - request->params.textDocument.text, cache_manager)); + request->params.textDocument.text, cache_manager), + true /* priority */); } }; REGISTER_MESSAGE_HANDLER(TextDocumentDidOpenHandler); diff --git a/src/messages/text_document_did_save.cc b/src/messages/text_document_did_save.cc index 256b39c6..f7629287 100644 --- a/src/messages/text_document_did_save.cc +++ b/src/messages/text_document_did_save.cc @@ -49,8 +49,10 @@ struct TextDocumentDidSaveHandler LOG_S(ERROR) << "Unable to read file content after saving " << path; } else { Project::Entry entry = project->FindCompilationEntryForFile(path); - QueueManager::instance()->index_request.Enqueue(Index_Request( - entry.filename, entry.args, true /*is_interactive*/, *content, ICacheManager::Make(config))); + QueueManager::instance()->index_request.PushBack( + Index_Request(entry.filename, entry.args, true /*is_interactive*/, + *content, ICacheManager::Make(config)), + true); } clang_complete->NotifySave(path); diff --git a/src/messages/workspace_did_change_watched_files.cc b/src/messages/workspace_did_change_watched_files.cc index a5f4a311..a2a3fb05 100644 --- a/src/messages/workspace_did_change_watched_files.cc +++ b/src/messages/workspace_did_change_watched_files.cc @@ -52,7 +52,7 @@ struct WorkspaceDidChangeWatchedFilesHandler if (!content) LOG_S(ERROR) << "Unable to read file content after saving " << path; else { - QueueManager::instance()->index_request.Enqueue( + QueueManager::instance()->index_request.PushBack( Index_Request(path, entry.args, is_interactive, *content, ICacheManager::Make(config))); if (is_interactive) clang_complete->NotifySave(path); @@ -60,7 +60,7 @@ struct WorkspaceDidChangeWatchedFilesHandler break; } case lsFileChangeType::Deleted: - QueueManager::instance()->index_request.Enqueue( + QueueManager::instance()->index_request.PushBack( Index_Request(path, entry.args, is_interactive, std::string(), ICacheManager::Make(config))); break; } diff --git a/src/queue_manager.cc b/src/queue_manager.cc index c9308cad..d36a796e 100644 --- a/src/queue_manager.cc +++ b/src/queue_manager.cc @@ -73,7 +73,7 @@ void QueueManager::WriteStdout(IpcId id, lsBaseOutMessage& response) { Stdout_Request out; out.content = sstream.str(); out.id = id; - instance()->for_stdout.Enqueue(std::move(out)); + instance()->for_stdout.PushBack(std::move(out)); } QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter, diff --git a/src/threaded_queue.h b/src/threaded_queue.h index e0c2a9b2..392e056b 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -8,8 +8,8 @@ #include #include #include +#include #include -#include #include // TODO: cleanup includes. @@ -111,21 +111,25 @@ struct ThreadedQueue : public BaseThreadQueue { size_t Size() const { return total_count_; } // Add an element to the queue. - void Enqueue(T&& t, bool priority = false) { + template ::*push)(T&&)> + void Push(T&& t, bool priority) { std::lock_guard lock(mutex_); if (priority) - priority_.push(std::move(t)); + (priority_.*push)(std::move(t)); else - queue_.push(std::move(t)); + (queue_.*push)(std::move(t)); ++total_count_; waiter_->cv.notify_one(); if (waiter1_) waiter1_->cv.notify_one(); } - // Add an element to the front of the queue. - void PriorityEnqueue(T&& t) { - Enqueue(std::move(t), true); + void PushFront(T&& t, bool priority = false) { + Push<&std::deque::push_front>(std::move(t), priority); + } + + void PushBack(T&& t, bool priority = false) { + Push<&std::deque::push_back>(std::move(t), priority); } // Add a set of elements to the queue. @@ -139,9 +143,9 @@ struct ThreadedQueue : public BaseThreadQueue { for (T& element : elements) { if (priority) - priority_.push(std::move(element)); + priority_.push_back(std::move(element)); else - queue_.push(std::move(element)); + queue_.push_back(std::move(element)); } elements.clear(); @@ -158,11 +162,11 @@ struct ThreadedQueue : public BaseThreadQueue { result.reserve(priority_.size() + queue_.size()); while (!priority_.empty()) { result.emplace_back(std::move(priority_.front())); - priority_.pop(); + priority_.pop_front(); } while (!queue_.empty()) { result.emplace_back(std::move(queue_.front())); - queue_.pop(); + queue_.pop_front(); } return result; @@ -171,24 +175,16 @@ struct ThreadedQueue : public BaseThreadQueue { // Returns true if the queue is empty. This is lock-free. bool IsEmpty() { return total_count_ == 0; } - // TODO: Unify code between DequeuePlusAction with TryDequeuePlusAction. - // Probably have opt Dequeue(bool wait_for_element); - // Get the first element from the queue. Blocks until one is available. - // Executes |action| with an acquired |mutex_|. - template - T DequeuePlusAction(TAction action) { + T Dequeue() { std::unique_lock lock(mutex_); waiter_->cv.wait(lock, [&]() { return !priority_.empty() || !queue_.empty(); }); - auto execute = [&](std::queue* q) { + auto execute = [&](std::deque* q) { auto val = std::move(q->front()); - q->pop(); + q->pop_front(); --total_count_; - - action(); - return std::move(val); }; if (!priority_.empty()) @@ -196,18 +192,13 @@ struct ThreadedQueue : public BaseThreadQueue { return execute(&queue_); } - // Get the first element from the queue. Blocks until one is available. - T Dequeue() { - return DequeuePlusAction([]() {}); - } - // Get the first element from the queue without blocking. Returns a null // value if the queue is empty. - optional TryDequeueHelper(int which) { + optional TryPopFrontHelper(int which) { std::lock_guard lock(mutex_); - auto execute = [&](std::queue* q) { + auto execute = [&](std::deque* q) { auto val = std::move(q->front()); - q->pop(); + q->pop_front(); --total_count_; return std::move(val); }; @@ -218,24 +209,40 @@ struct ThreadedQueue : public BaseThreadQueue { return nullopt; } - optional TryDequeue() { - return TryDequeueHelper(3); + optional TryPopFront() { + return TryPopFrontHelper(3); } - optional TryDequeueLow() { - return TryDequeueHelper(1); + optional TryPopBack() { + std::lock_guard lock(mutex_); + auto execute = [&](std::deque* q) { + auto val = std::move(q->back()); + q->pop_back(); + --total_count_; + return std::move(val); + }; + // Reversed + if (queue_.size()) + return execute(&queue_); + if (priority_.size()) + return execute(&priority_); + return nullopt; } - optional TryDequeueHigh() { - return TryDequeueHelper(2); + optional TryPopFrontLow() { + return TryPopFrontHelper(1); + } + + optional TryPopFrontHigh() { + return TryPopFrontHelper(2); } mutable std::mutex mutex_; private: std::atomic total_count_; - std::queue priority_; - std::queue queue_; + std::deque priority_; + std::deque queue_; MultiQueueWaiter* waiter_; std::unique_ptr owned_waiter_; // TODO remove waiter1 after split of on_indexed