From cac5dcaf8f473bc916305e1348bc59294cccca1f Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Tue, 1 Aug 2017 20:23:37 -0700 Subject: [PATCH] Simplify queue management --- src/command_line.cc | 89 ++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 25c69fb1..4f456ada 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -718,10 +718,19 @@ MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args); -using IndexProcess_ResponseQueue = ThreadedQueue; -using Index_DoIdMapQueue = ThreadedQueue; -using Index_OnIdMappedQueue = ThreadedQueue; -using Index_OnIndexedQueue = ThreadedQueue; +struct QueueManager { + using IndexProcess_ResponseQueue = ThreadedQueue; + using Index_DoIdMapQueue = ThreadedQueue; + using Index_OnIdMappedQueue = ThreadedQueue; + using Index_OnIndexedQueue = ThreadedQueue; + + IndexProcess_ResponseQueue process_response; + Index_DoIdMapQueue do_id_map; + Index_OnIdMappedQueue on_id_mapped; + Index_OnIndexedQueue on_indexed; + + QueueManager(MultiQueueWaiter* waiter) : process_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {} +}; void RegisterMessageTypes() { MessageRegistry::instance()->Register(); @@ -873,10 +882,9 @@ bool IndexMain_DoIndex(Config* config, Project* project, WorkingFiles* working_files, clang::Index* index, - IndexProcess_ResponseQueue* queue_index_response, - Index_DoIdMapQueue* queue_do_id_map) { + QueueManager* queue) { optional request = - queue_index_response->TryDequeue(); + queue->process_response.TryDequeue(); if (!request) return false; @@ -892,20 +900,20 @@ bool IndexMain_DoIndex(Config* config, // TODO: get real value for is_interactive Index_DoIdMap response(std::move(current_index), request->perf, false /*is_interactive*/); - queue_do_id_map->Enqueue(std::move(response)); + queue->do_id_map.Enqueue(std::move(response)); return true; } bool IndexMain_DoCreateIndexUpdate( - Index_OnIdMappedQueue* queue_on_id_mapped, - Index_OnIndexedQueue* queue_on_indexed) { - optional response = queue_on_id_mapped->TryDequeue(); + QueueManager* queue) { + // TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed. + optional response = queue->on_id_mapped.TryDequeue(); if (!response) return false; Timer time; - + IdMap* previous_id_map = nullptr; IndexFile* previous_index = nullptr; if (response->previous) { @@ -944,22 +952,22 @@ bool IndexMain_DoCreateIndexUpdate( #endif Index_OnIndexed reply(update, response->perf); - queue_on_indexed->Enqueue(std::move(reply)); + queue->on_indexed.Enqueue(std::move(reply)); return true; } -bool IndexMergeIndexUpdates(Index_OnIndexedQueue* queue_on_indexed) { +bool IndexMergeIndexUpdates(QueueManager* queue) { // TODO/FIXME: it looks like there is a crash here? - optional root = queue_on_indexed->TryDequeue(); + optional root = queue->on_indexed.TryDequeue(); if (!root) return false; bool did_merge = false; while (true) { - optional to_join = queue_on_indexed->TryDequeue(); + optional to_join = queue->on_indexed.TryDequeue(); if (!to_join) { - queue_on_indexed->Enqueue(std::move(*root)); + queue->on_indexed.Enqueue(std::move(*root)); return did_merge; } @@ -975,10 +983,7 @@ void IndexMain(Config* config, Project* project, WorkingFiles* working_files, MultiQueueWaiter* waiter, - IndexProcess_ResponseQueue* queue_index_response, - Index_DoIdMapQueue* queue_do_id_map, - Index_OnIdMappedQueue* queue_on_id_mapped, - Index_OnIndexedQueue* queue_on_indexed) { + QueueManager* queue) { SetCurrentThreadName("indexer"); // TODO: dispose of index after it is not used for a while. clang::Index index(1, 0); @@ -996,20 +1001,20 @@ void IndexMain(Config* config, // index. bool did_index = IndexMain_DoIndex(config, file_consumer_shared, project, working_files, - &index, queue_index_response, queue_do_id_map); + &index, queue); bool did_create_update = - IndexMain_DoCreateIndexUpdate(queue_on_id_mapped, queue_on_indexed); + IndexMain_DoCreateIndexUpdate(queue); bool did_merge = false; // Nothing to index and no index updates to create, so join some already // created index updates to reduce work on querydb thread. if (!did_index && !did_create_update) - did_merge = IndexMergeIndexUpdates(queue_on_indexed); + did_merge = IndexMergeIndexUpdates(queue); // We didn't do any work, so wait for a notification. if (!did_index && !did_create_update && !did_merge) waiter->Wait( - {queue_index_response, queue_on_id_mapped, queue_on_indexed}); + {&queue->process_response, &queue->on_id_mapped, &queue->on_indexed}); } } @@ -1031,9 +1036,9 @@ struct OutOfProcessQueryDbResponder : IQueryDbResponder { }; struct InProcessQueryDbResponder : IQueryDbResponder { - IndexProcess_ResponseQueue* queue_; + QueueManager::IndexProcess_ResponseQueue* queue_; - InProcessQueryDbResponder(IndexProcess_ResponseQueue* queue) + InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue) : queue_(queue) {} void Write(IndexProcess_Response response) override { @@ -1329,14 +1334,14 @@ struct OutOfProcessIndexer : IIndexerProcess { optional config_; std::string bin_name_; - IndexProcess_ResponseQueue* response_queue_; + QueueManager::IndexProcess_ResponseQueue* response_queue_; const int kMaxIndexRequestsUntilRestart = 25; bool enable_auto_restart_ = false; int number_of_index_requests_since_last_restart_ = 0; OutOfProcessIndexer(const std::string& bin_name, - IndexProcess_ResponseQueue* response_queue) + QueueManager::IndexProcess_ResponseQueue* response_queue) : bin_name_(bin_name), response_queue_(response_queue) { CreateIndexProcess(); } @@ -1578,10 +1583,7 @@ bool QueryDbMainLoop( QueryDatabase* db, CacheManager* db_cache, MultiQueueWaiter* waiter, - IndexProcess_ResponseQueue* queue_index_response, - Index_DoIdMapQueue* queue_do_id_map, - Index_OnIdMappedQueue* queue_on_id_mapped, - Index_OnIndexedQueue* queue_on_indexed, + QueueManager* queue, IIndexerProcess* indexer_process, Project* project, FileConsumer::SharedState* file_consumer_shared, @@ -1656,7 +1658,7 @@ bool QueryDbMainLoop( std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl; for (int i = 0; i < config->indexerCount; ++i) { new std::thread([&]() { - IndexMain(config, file_consumer_shared, project, working_files, waiter, queue_index_response, queue_do_id_map, queue_on_id_mapped, queue_on_indexed); + IndexMain(config, file_consumer_shared, project, working_files, waiter, queue); }); } @@ -2865,7 +2867,7 @@ bool QueryDbMainLoop( while (true) { - optional request = queue_do_id_map->TryDequeue(); + optional request = queue->do_id_map.TryDequeue(); if (!request) break; @@ -2881,11 +2883,11 @@ bool QueryDbMainLoop( response.previous = db_cache->UpdateAndReturnOldFile(response.current); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); - queue_on_id_mapped->Enqueue(std::move(response)); + queue->on_id_mapped.Enqueue(std::move(response)); } while (true) { - optional response = queue_on_indexed->TryDequeue(); + optional response = queue->on_indexed.TryDequeue(); if (!response) break; @@ -2918,10 +2920,7 @@ bool QueryDbMainLoop( void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) { // Create queues. - IndexProcess_ResponseQueue queue_process_response(waiter); - Index_DoIdMapQueue queue_do_id_map(waiter); - Index_OnIdMappedQueue queue_on_id_mapped(waiter); - Index_OnIndexedQueue queue_on_indexed(waiter); + QueueManager queue(waiter); Project project; WorkingFiles working_files; @@ -2934,7 +2933,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* auto signature_cache = MakeUnique(); FileConsumer::SharedState file_consumer_shared; - InProcessQueryDbResponder responder(&queue_process_response); + InProcessQueryDbResponder responder(&queue.process_response); ThreadedQueue queue_process_request; auto indexer = MakeUnique(&responder, &queue_process_request); //auto indexer = MakeUnique(bin_name, &queue_process_response); @@ -2945,7 +2944,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* CacheManager db_cache; while (true) { bool did_work = QueryDbMainLoop( - config, &db, &db_cache, waiter, &queue_process_response, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, + config, &db, &db_cache, waiter, &queue, indexer.get(), &project, &file_consumer_shared, &working_files, &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); @@ -2953,8 +2952,8 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* IpcManager* ipc = IpcManager::instance(); waiter->Wait({ ipc->threaded_queue_for_server_.get(), - &queue_do_id_map, - &queue_on_indexed + &queue.do_id_map, + &queue.on_indexed }); } }