From 3839d1e5ab07098b2c3996dc10cff33e12c5c691 Mon Sep 17 00:00:00 2001
From: Fangrui Song
Date: Sun, 4 Feb 2018 19:38:57 -0800
Subject: [PATCH] Decouple QueryDb_ImportMain

---
 src/command_line.cc    |  12 +--
 src/import_pipeline.cc | 184 +++++++++++++++++++++++------------------
 src/import_pipeline.h  |   4 +
 src/query.cc           |   1 -
 src/threaded_queue.h   |  52 ++++++------
 5 files changed, 139 insertions(+), 114 deletions(-)

diff --git a/src/command_line.cc b/src/command_line.cc
index 4870ccc5..c252641a 100644
--- a/src/command_line.cc
+++ b/src/command_line.cc
@@ -146,19 +146,11 @@ bool QueryDbMainLoop(Config* config,
                      CodeCompleteCache* non_global_code_complete_cache,
                      CodeCompleteCache* signature_cache) {
   auto* queue = QueueManager::instance();
-  bool did_work = false;
-
   std::vector<std::unique_ptr<BaseIpcMessage>> messages =
       queue->for_querydb.DequeueAll();
+  bool did_work = messages.size();
   for (auto& message : messages) {
-    did_work = true;
-
-    for (MessageHandler* handler : *MessageHandler::message_handlers) {
-      if (handler->GetId() == message->method_id) {
-        handler->Run(std::move(message));
-        break;
-      }
-    }
+    QueryDb_Handle(message);
     if (message) {
       LOG_S(FATAL) << "Exiting; unhandled IPC message "
                    << IpcIdToString(message->method_id);
diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc
index 2f2d2bfd..b0588733 100644
--- a/src/import_pipeline.cc
+++ b/src/import_pipeline.cc
@@ -18,7 +18,6 @@
 #include 
 #include 
-#include 
 #include 
 #include 
@@ -381,7 +380,8 @@ void ParseFile(Config* config,
                                    true /*write_to_disk*/));
   }
 
-  QueueManager::instance()->do_id_map.EnqueueAll(std::move(result));
+  QueueManager::instance()->do_id_map.EnqueueAll(std::move(result),
+                                                 request.is_interactive);
 }
 
 bool IndexMain_DoParse(
@@ -466,7 +466,7 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
 #endif
 
   Index_OnIndexed reply(update, response->perf);
-  queue->on_indexed.Enqueue(std::move(reply));
+  queue->on_indexed.Enqueue(std::move(reply), response->is_interactive);
 
   return true;
 }
@@ -609,6 +609,105 @@ void Indexer_Main(Config* config,
   }
 }
 
+void QueryDb_Handle(std::unique_ptr<BaseIpcMessage>& message) {
+  for (MessageHandler* handler : *MessageHandler::message_handlers) {
+    if (handler->GetId() == message->method_id) {
+      handler->Run(std::move(message));
+      break;
+    }
+  }
+}
+
+namespace {
+void QueryDb_DoIdMap(QueueManager* queue,
+                     QueryDatabase* db,
+                     ImportManager* import_manager,
+                     Index_DoIdMap* request) {
+  assert(request->current);
+
+  // If the request does not have previous state and we have already imported
+  // it, load the previous state from disk and rerun IdMap logic later. Do not
+  // do this if we have already attempted in the past.
+  if (!request->load_previous && !request->previous &&
+      db->usr_to_file.find(NormalizedPath(request->current->path)) !=
+          db->usr_to_file.end()) {
+    assert(!request->load_previous);
+    request->load_previous = true;
+    queue->load_previous_index.Enqueue(std::move(*request));
+    return;
+  }
+
+  // Check if the file is already being imported into querydb. If it is, drop
+  // the request.
+  //
+  // Note, we must do this *after* we have checked for the previous index,
+  // otherwise we will never actually generate the IdMap.
+  if (!import_manager->StartQueryDbImport(request->current->path)) {
+    LOG_S(INFO) << "Dropping index as it is already being imported for "
+                << request->current->path;
+    return;
+  }
+
+  Index_OnIdMapped response(request->cache_manager, request->perf,
+                            request->is_interactive, request->write_to_disk);
+  Timer time;
+
+  auto make_map = [db](std::unique_ptr<IndexFile> file)
+      -> std::unique_ptr<Index_OnIdMapped::File> {
+    if (!file)
+      return nullptr;
+
+    auto id_map = MakeUnique<IdMap>(db, file->id_cache);
+    return MakeUnique<Index_OnIdMapped::File>(std::move(file),
+                                              std::move(id_map));
+  };
+  response.current = make_map(std::move(request->current));
+  response.previous = make_map(std::move(request->previous));
+  response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
+
+  queue->on_id_mapped.Enqueue(std::move(response));
+}
+
+void QueryDb_OnIndexed(QueueManager* queue,
+                       QueryDatabase* db,
+                       ImportManager* import_manager,
+                       ImportPipelineStatus* status,
+                       SemanticHighlightSymbolCache* semantic_cache,
+                       WorkingFiles* working_files,
+                       Index_OnIndexed* response) {
+  Timer time;
+  db->ApplyIndexUpdate(&response->update);
+  time.ResetAndPrint("Applying index update for " +
+                     StringJoinMap(response->update.files_def_update,
+                                   [](const QueryFile::DefUpdate& value) {
+                                     return value.value.path;
+                                   }));
+
+  // Update indexed content, inactive lines, and semantic highlighting.
+  for (auto& updated_file : response->update.files_def_update) {
+    WorkingFile* working_file =
+        working_files->GetFileByFilename(updated_file.value.path);
+    if (working_file) {
+      // Update indexed content.
+      working_file->SetIndexContent(updated_file.file_content);
+
+      // Inactive lines.
+      EmitInactiveLines(working_file, updated_file.value.inactive_regions);
+
+      // Semantic highlighting.
+      QueryFileId file_id =
+          db->usr_to_file[NormalizedPath(working_file->filename)];
+      QueryFile* file = &db->files[file_id.id];
+      EmitSemanticHighlighting(db, semantic_cache, working_file, file);
+    }
+
+    // Mark the files as being done in querydb stage after we apply the index
+    // update.
+    import_manager->DoneQueryDbImport(updated_file.value.path);
+  }
+}
+}  // namespace
+
 bool QueryDb_ImportMain(Config* config,
                         QueryDatabase* db,
                         ImportManager* import_manager,
@@ -626,89 +725,16 @@ bool QueryDb_ImportMain(Config* config,
     if (!request)
       break;
     did_work = true;
-
-    assert(request->current);
-
-    // If the request does not have previous state and we have already imported
-    // it, load the previous state from disk and rerun IdMap logic later. Do not
-    // do this if we have already attempted in the past.
-    if (!request->load_previous && !request->previous &&
-        db->usr_to_file.find(NormalizedPath(request->current->path)) !=
-            db->usr_to_file.end()) {
-      assert(!request->load_previous);
-      request->load_previous = true;
-      queue->load_previous_index.Enqueue(std::move(*request));
-      continue;
-    }
-
-    // Check if the file is already being imported into querydb. If it is, drop
-    // the request.
-    //
-    // Note, we must do this *after* we have checked for the previous index,
-    // otherwise we will never actually generate the IdMap.
-    if (!import_manager->StartQueryDbImport(request->current->path)) {
-      LOG_S(INFO) << "Dropping index as it is already being imported for "
-                  << request->current->path;
-      continue;
-    }
-
-    Index_OnIdMapped response(request->cache_manager, request->perf, request->is_interactive,
-                              request->write_to_disk);
-    Timer time;
-
-    auto make_map = [db](std::unique_ptr<IndexFile> file)
-        -> std::unique_ptr<Index_OnIdMapped::File> {
-      if (!file)
-        return nullptr;
-
-      auto id_map = MakeUnique<IdMap>(db, file->id_cache);
-      return MakeUnique<Index_OnIdMapped::File>(std::move(file),
-                                                std::move(id_map));
-    };
-    response.current = make_map(std::move(request->current));
-    response.previous = make_map(std::move(request->previous));
-    response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
-
-    queue->on_id_mapped.Enqueue(std::move(response));
+    QueryDb_DoIdMap(queue, db, import_manager, &*request);
   }
 
   while (true) {
     optional<Index_OnIndexed> response = queue->on_indexed.TryDequeue();
     if (!response)
       break;
     did_work = true;
-
-    Timer time;
-    db->ApplyIndexUpdate(&response->update);
-    time.ResetAndPrint("Applying index update for " +
-                       StringJoinMap(response->update.files_def_update,
-                                     [](const QueryFile::DefUpdate& value) {
-                                       return value.value.path;
-                                     }));
-
-    // Update indexed content, inactive lines, and semantic highlighting.
-    for (auto& updated_file : response->update.files_def_update) {
-      WorkingFile* working_file =
-          working_files->GetFileByFilename(updated_file.value.path);
-      if (working_file) {
-        // Update indexed content.
-        working_file->SetIndexContent(updated_file.file_content);
-
-        // Inactive lines.
-        EmitInactiveLines(working_file, updated_file.value.inactive_regions);
-
-        // Semantic highlighting.
-        QueryFileId file_id =
-            db->usr_to_file[NormalizedPath(working_file->filename)];
-        QueryFile* file = &db->files[file_id.id];
-        EmitSemanticHighlighting(db, semantic_cache, working_file, file);
-      }
-
-      // Mark the files as being done in querydb stage after we apply the index
-      // update.
-      import_manager->DoneQueryDbImport(updated_file.value.path);
-    }
+    QueryDb_OnIndexed(queue, db, import_manager, status, semantic_cache,
+                      working_files, &*response);
   }
 
   return did_work;
diff --git a/src/import_pipeline.h b/src/import_pipeline.h
index 5f212b43..19770def 100644
--- a/src/import_pipeline.h
+++ b/src/import_pipeline.h
@@ -4,6 +4,7 @@
 #include 
 #include 
+#include 
 #include 
 #include 
@@ -42,6 +43,9 @@ void Indexer_Main(Config* config,
                   WorkingFiles* working_files,
                   MultiQueueWaiter* waiter);
 
+struct BaseIpcMessage;
+void QueryDb_Handle(std::unique_ptr<BaseIpcMessage>& message);
+
 bool QueryDb_ImportMain(Config* config,
                         QueryDatabase* db,
                         ImportManager* import_manager,
diff --git a/src/query.cc b/src/query.cc
index 2582a908..4f129cc5 100644
--- a/src/query.cc
+++ b/src/query.cc
@@ -863,7 +863,6 @@ void QueryDatabase::ApplyIndexUpdate(IndexUpdate* update) {
     AddRangeWithGen(&def.def_var_name, merge_update.to_add, def.gen);   \
     RemoveRangeWithGen(&def.def_var_name, merge_update.to_remove);      \
     VerifyUnique(def.def_var_name);                                     \
-    UpdateGen(this, def.def_var_name);                                  \
   }
 
   for (const std::string& filename : update->files_removed)
diff --git a/src/threaded_queue.h b/src/threaded_queue.h
index e0549cb0..e0c2a9b2 100644
--- a/src/threaded_queue.h
+++ b/src/threaded_queue.h
@@ -110,28 +110,26 @@ struct ThreadedQueue : public BaseThreadQueue {
   // Returns the number of elements in the queue. This is lock-free.
   size_t Size() const { return total_count_; }
 
-  // Add an element to the front of the queue.
-  void PriorityEnqueue(T&& t) {
+  // Add an element to the queue.
+  void Enqueue(T&& t, bool priority = false) {
     std::lock_guard<std::mutex> lock(mutex_);
-    priority_.push(std::move(t));
+    if (priority)
+      priority_.push(std::move(t));
+    else
+      queue_.push(std::move(t));
     ++total_count_;
     waiter_->cv.notify_one();
     if (waiter1_)
       waiter1_->cv.notify_one();
   }
 
-  // Add an element to the queue.
-  void Enqueue(T&& t) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    queue_.push(std::move(t));
-    ++total_count_;
-    waiter_->cv.notify_one();
-    if (waiter1_)
-      waiter1_->cv.notify_one();
+  // Add an element to the front of the queue.
+  void PriorityEnqueue(T&& t) {
+    Enqueue(std::move(t), true);
   }
 
   // Add a set of elements to the queue.
-  void EnqueueAll(std::vector<T>&& elements) {
+  void EnqueueAll(std::vector<T>&& elements, bool priority = false) {
     if (elements.empty())
       return;
 
@@ -140,7 +138,10 @@
     total_count_ += elements.size();
 
     for (T& element : elements) {
-      queue_.push(std::move(element));
+      if (priority)
+        priority_.push(std::move(element));
+      else
+        queue_.push(std::move(element));
     }
     elements.clear();
 
@@ -202,28 +203,31 @@
   // Get the first element from the queue without blocking. Returns a null
   // value if the queue is empty.
-  template <typename TAction>
-  optional<T> TryDequeuePlusAction(TAction action) {
+  optional<T> TryDequeueHelper(int which) {
     std::lock_guard<std::mutex> lock(mutex_);
-    if (priority_.empty() && queue_.empty())
-      return nullopt;
-
     auto execute = [&](std::queue<T>* q) {
       auto val = std::move(q->front());
       q->pop();
       --total_count_;
-
-      action(val);
-
       return std::move(val);
     };
 
-    if (!priority_.empty())
+    if (which & 2 && priority_.size())
      return execute(&priority_);
-    return execute(&queue_);
+    if (which & 1 && queue_.size())
+      return execute(&queue_);
+    return nullopt;
   }
 
   optional<T> TryDequeue() {
-    return TryDequeuePlusAction([](const T&) {});
+    return TryDequeueHelper(3);
+  }
+
+  optional<T> TryDequeueLow() {
+    return TryDequeueHelper(1);
+  }
+
+  optional<T> TryDequeueHigh() {
+    return TryDequeueHelper(2);
   }
 
   mutable std::mutex mutex_;
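
Usage sketch (illustrative only, not part of the patch): the threaded_queue.h
changes fold PriorityEnqueue into Enqueue(T&&, bool priority) and replace
TryDequeuePlusAction with TryDequeueHelper plus the TryDequeue, TryDequeueLow,
and TryDequeueHigh wrappers. A minimal example, assuming a ThreadedQueue<int>
constructed as elsewhere in the codebase (its constructor is untouched here):

  void Example(ThreadedQueue<int>& q) {
    q.Enqueue(1);                     // goes to the normal queue
    q.Enqueue(2, true /*priority*/);  // goes to the priority queue
    q.PriorityEnqueue(3);             // now forwards to Enqueue(..., true)

    optional<int> hi = q.TryDequeueHigh();  // 2: checks only the priority queue
    optional<int> lo = q.TryDequeueLow();   // 1: checks only the normal queue
    optional<int> any = q.TryDequeue();     // 3: priority queue first, then normal
  }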