diff --git a/src/command_line.cc b/src/command_line.cc index 772a4198..046b0b29 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -609,8 +609,6 @@ struct IndexRequest { }; MAKE_REFLECT_STRUCT(IndexRequest, path, args); -using Index_IndexProcess_Request_IndexQueue = ThreadedQueue; - struct IndexResult { std::string file_path; @@ -654,17 +652,19 @@ MAKE_REFLECT_STRUCT(IndexResult, file_path, perf); struct QueueManager { + using IndexProcess_RequestQueue = ThreadedQueue; using IndexProcess_ResponseQueue = ThreadedQueue; using Index_DoIdMapQueue = ThreadedQueue; using Index_OnIdMappedQueue = ThreadedQueue; using Index_OnIndexedQueue = ThreadedQueue; - IndexProcess_ResponseQueue process_response; + IndexProcess_RequestQueue index_request; + IndexProcess_ResponseQueue index_response; Index_DoIdMapQueue do_id_map; Index_OnIdMappedQueue on_id_mapped; Index_OnIndexedQueue on_indexed; - QueueManager(MultiQueueWaiter* waiter) : process_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {} + QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), index_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {} }; void RegisterMessageTypes() { @@ -812,171 +812,15 @@ struct IndexManager { //} // namespace -bool IndexMain_DoIndex(Config* config, - FileConsumer::SharedState* file_consumer_shared, - Project* project, - WorkingFiles* working_files, - clang::Index* index, - QueueManager* queue) { - optional request = queue->process_response.TryDequeue(); - if (!request) - return false; - - std::unique_ptr current_index = - LoadCachedIndex(config, request->file_path); - if (!current_index) { - std::cerr << "!!! Failed to load index for " + request->file_path + "\n"; - return false; - } - - assert(current_index); - - // TODO: get real value for is_interactive - Index_DoIdMap response(std::move(current_index), request->perf, - false /*is_interactive*/); - queue->do_id_map.Enqueue(std::move(response)); - - return true; -} - -bool IndexMain_DoCreateIndexUpdate( - QueueManager* queue) { - // TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed. - optional response = queue->on_id_mapped.TryDequeue(); - if (!response) - return false; - - Timer time; - - IdMap* previous_id_map = nullptr; - IndexFile* previous_index = nullptr; - if (response->previous) { - LOG_S(INFO) << "Creating delta update for " << response->previous->file->path; - previous_id_map = response->previous->ids.get(); - previous_index = response->previous->file.get(); - } - - IndexUpdate update = IndexUpdate::CreateDelta( - previous_id_map, response->current->ids.get(), - previous_index, response->current->file.get()); - response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); - -#if false -#define PRINT_SECTION(name) \ - if (response->perf.name) {\ - total += response->perf.name; \ - output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \ - } - std::stringstream output; - long long total = 0; - output << "[perf]"; - PRINT_SECTION(index_parse); - PRINT_SECTION(index_build); - PRINT_SECTION(index_save_to_disk); - PRINT_SECTION(index_load_cached); - PRINT_SECTION(querydb_id_map); - PRINT_SECTION(index_make_delta); - output << "\n total: " << FormatMicroseconds(total); - output << " path: " << response->current_index->path; - output << std::endl; - std::cerr << output.rdbuf(); -#undef PRINT_SECTION - - if (response->is_interactive) - std::cerr << "Applying IndexUpdate" << std::endl << update.ToString() << std::endl; -#endif - - Index_OnIndexed reply(update, response->perf); - queue->on_indexed.Enqueue(std::move(reply)); - - return true; -} - -bool IndexMergeIndexUpdates(QueueManager* queue) { - // TODO/FIXME: it looks like there is a crash here? - optional root = queue->on_indexed.TryDequeue(); - if (!root) - return false; - - bool did_merge = false; - while (true) { - optional to_join = queue->on_indexed.TryDequeue(); - if (!to_join) { - queue->on_indexed.Enqueue(std::move(*root)); - return did_merge; - } - - did_merge = true; - //Timer time; - root->update.Merge(to_join->update); - //time.ResetAndPrint("[indexer] Joining two querydb updates"); - } -} - -void IndexMain(Config* config, - FileConsumer::SharedState* file_consumer_shared, - Project* project, - WorkingFiles* working_files, - MultiQueueWaiter* waiter, - QueueManager* queue) { - SetCurrentThreadName("indexer"); - // TODO: dispose of index after it is not used for a while. - clang::Index index(1, 0); - - while (true) { - // TODO: process all off IndexMain_DoIndex before calling - // IndexMain_DoCreateIndexUpdate for - // better icache behavior. We need to have some threads spinning on - // both though - // otherwise memory usage will get bad. - - // We need to make sure to run both IndexMain_DoIndex and - // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any - // work. Running both also lets the user query the partially constructed - // index. - bool did_index = - IndexMain_DoIndex(config, file_consumer_shared, project, working_files, - &index, queue); - bool did_create_update = - IndexMain_DoCreateIndexUpdate(queue); - bool did_merge = false; - - // Nothing to index and no index updates to create, so join some already - // created index updates to reduce work on querydb thread. - if (!did_index && !did_create_update) - did_merge = IndexMergeIndexUpdates(queue); - - // We didn't do any work, so wait for a notification. - if (!did_index && !did_create_update && !did_merge) - waiter->Wait( - {&queue->process_response, &queue->on_id_mapped, &queue->on_indexed}); - } -} - - - -struct IQueryDbResponder { - virtual void Write(IndexResult result) = 0; -}; - -struct InProcessQueryDbResponder : IQueryDbResponder { - QueueManager::IndexProcess_ResponseQueue* queue_; - - InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue) - : queue_(queue) {} - - void Write(IndexResult result) override { - queue_->Enqueue(std::move(result)); - } -}; - std::vector DoParseFile( - Config* config, - clang::Index* index, - FileConsumer::SharedState* file_consumer_shared, - CacheLoader* cache_loader, - const std::string& path, - const std::vector& args) { + Config* config, + clang::Index* index, + FileConsumer::SharedState* file_consumer_shared, + CacheLoader* cache_loader, + const std::string& path, + const std::vector& args) { + LOG_S(INFO) << "Parsing " << path; + std::vector result; IndexFile* previous_index = cache_loader->TryLoad(path); @@ -1102,78 +946,175 @@ std::vector ParseFile( return DoParseFile(config, index, file_consumer_shared, &cache_loader, tu_path, entry.args); } +bool IndexMain_DoParse( + Config* config, + QueueManager* queues, + FileConsumer::SharedState* file_consumer_shared, + clang::Index* index) { + IndexRequest request = queues->index_request.Dequeue(); -void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexProcess_Request_IndexQueue* queue, std::atomic* busy, FileConsumer::SharedState* file_consumer_shared) { + Project::Entry entry; + entry.filename = request.path; + entry.args = request.args; + std::vector responses = ParseFile(config, index, file_consumer_shared, entry); + + for (auto response : responses) + queues->index_response.Enqueue(std::move(response)); + + return !responses.empty(); +} + +bool IndexMain_DoIndex(Config* config, + FileConsumer::SharedState* file_consumer_shared, + Project* project, + WorkingFiles* working_files, + clang::Index* index, + QueueManager* queue) { + optional request = queue->index_response.TryDequeue(); + if (!request) + return false; + + std::unique_ptr current_index = + LoadCachedIndex(config, request->file_path); + if (!current_index) { + std::cerr << "!!! Failed to load index for " + request->file_path + "\n"; + return false; + } + + assert(current_index); + + // TODO: get real value for is_interactive + Index_DoIdMap response(std::move(current_index), request->perf, + false /*is_interactive*/); + queue->do_id_map.Enqueue(std::move(response)); + + return true; +} + +bool IndexMain_DoCreateIndexUpdate( + QueueManager* queue) { + // TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed. + optional response = queue->on_id_mapped.TryDequeue(); + if (!response) + return false; + + Timer time; + + IdMap* previous_id_map = nullptr; + IndexFile* previous_index = nullptr; + if (response->previous) { + LOG_S(INFO) << "Creating delta update for " << response->previous->file->path; + previous_id_map = response->previous->ids.get(); + previous_index = response->previous->file.get(); + } + + IndexUpdate update = IndexUpdate::CreateDelta( + previous_id_map, response->current->ids.get(), + previous_index, response->current->file.get()); + response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); + +#if false +#define PRINT_SECTION(name) \ + if (response->perf.name) {\ + total += response->perf.name; \ + output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \ + } + std::stringstream output; + long long total = 0; + output << "[perf]"; + PRINT_SECTION(index_parse); + PRINT_SECTION(index_build); + PRINT_SECTION(index_save_to_disk); + PRINT_SECTION(index_load_cached); + PRINT_SECTION(querydb_id_map); + PRINT_SECTION(index_make_delta); + output << "\n total: " << FormatMicroseconds(total); + output << " path: " << response->current_index->path; + output << std::endl; + std::cerr << output.rdbuf(); +#undef PRINT_SECTION + + if (response->is_interactive) + std::cerr << "Applying IndexUpdate" << std::endl << update.ToString() << std::endl; +#endif + + Index_OnIndexed reply(update, response->perf); + queue->on_indexed.Enqueue(std::move(reply)); + + return true; +} + +bool IndexMergeIndexUpdates(QueueManager* queue) { + // TODO/FIXME: it looks like there is a crash here? + optional root = queue->on_indexed.TryDequeue(); + if (!root) + return false; + + bool did_merge = false; while (true) { - IndexRequest request = queue->DequeuePlusAction([&]() { - ++(*busy); - }); + optional to_join = queue->on_indexed.TryDequeue(); + if (!to_join) { + queue->on_indexed.Enqueue(std::move(*root)); + return did_merge; + } - clang::Index index(0, 0); - Project::Entry entry; - entry.filename = request.path; - entry.args = request.args; - std::vector responses = ParseFile(config, &index, file_consumer_shared, entry); + did_merge = true; + //Timer time; + root->update.Merge(to_join->update); + //time.ResetAndPrint("[indexer] Joining two querydb updates"); + } +} - for (const auto& response : responses) - responder->Write(response); +void IndexMain(Config* config, + FileConsumer::SharedState* file_consumer_shared, + Project* project, + WorkingFiles* working_files, + MultiQueueWaiter* waiter, + QueueManager* queue) { + SetCurrentThreadName("indexer"); + // TODO: dispose of index after it is not used for a while. + clang::Index index(1, 0); - --(*busy); + while (true) { + // TODO: process all off IndexMain_DoIndex before calling + // IndexMain_DoCreateIndexUpdate for + // better icache behavior. We need to have some threads spinning on + // both though + // otherwise memory usage will get bad. + + bool did_parse = IndexMain_DoParse(config, queue, file_consumer_shared, &index); + + // We need to make sure to run both IndexMain_DoIndex and + // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any + // work. Running both also lets the user query the partially constructed + // index. + bool did_index = + IndexMain_DoIndex(config, file_consumer_shared, project, working_files, + &index, queue); + bool did_create_update = + IndexMain_DoCreateIndexUpdate(queue); + bool did_merge = false; + + // Nothing to index and no index updates to create, so join some already + // created index updates to reduce work on querydb thread. + if (!did_parse && !did_index && !did_create_update) + did_merge = IndexMergeIndexUpdates(queue); + + // We didn't do any work, so wait for a notification. + if (!did_parse && !did_index && !did_create_update && !did_merge) + waiter->Wait( + {&queue->index_request, &queue->index_response, &queue->on_id_mapped, &queue->on_indexed}); } } -struct IIndexerProcess { - virtual void SetConfig(const Config& config) = 0; - virtual void SendMessage(IndexRequest message) = 0; -}; -struct InProcessIndexer : IIndexerProcess { - ThreadedQueue* messages_; - Index_IndexProcess_Request_IndexQueue queue_; - std::vector indexer_threads_; - std::atomic num_busy_indexers_; - Config config_; - IQueryDbResponder* responder_; - // TODO: Remove FileConsumer::SharedState support from indexer. Indexer - // always generates every index since it doesn't take a huge amount of time. - // Then the querydb process decides if the new index is worth importing. At - // some point we can "blacklist" certain files we are definately not - // interested in indexing. - FileConsumer::SharedState file_consumer_shared_; - explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue* messages) - : messages_(messages), num_busy_indexers_(0), responder_(responder) {} - void SetConfig(const Config& config) override { - config_ = config; - for (int i = 0; i < config_.indexerCount; ++i) { - indexer_threads_.push_back(std::thread([&, i]() { - SetCurrentThreadName("indexer" + std::to_string(i)); - IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_); - })); - } - } - void SendMessage(IndexRequest message) override { - // Dispatch the request so one of the indexers will pick it up. - queue_.Enqueue(std::move(message)); - } - bool TryWaitUntilIdle() { - // Wait until all indexers are done running and we have finished all of our work. - while (num_busy_indexers_ != 0 || !queue_.IsEmpty() || !messages_->IsEmpty()) { - // There are other messages that need to be processed; start the loop over. - if (!messages_->IsEmpty()) - return false; - std::cerr << "!! Trying to exit indexer; there are still " + std::to_string(num_busy_indexers_) + " indexers running\n"; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - return true; - } -}; @@ -1249,7 +1190,6 @@ bool QueryDbMainLoop( CacheManager* db_cache, MultiQueueWaiter* waiter, QueueManager* queue, - IIndexerProcess* indexer_process, Project* project, FileConsumer::SharedState* file_consumer_shared, WorkingFiles* working_files, @@ -1327,9 +1267,6 @@ bool QueryDbMainLoop( }); } - // Send config to indexer process. - indexer_process->SetConfig(*config); - Timer time; // Open up / load the project. @@ -1344,7 +1281,7 @@ bool QueryDbMainLoop( //std::cerr << "[" << i << "/" << (project->entries.size() - 1) // << "] Dispatching index request for file " << entry.filename // << std::endl; - indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); + queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); }); // We need to support multiple concurrent index processes. @@ -1407,7 +1344,7 @@ bool QueryDbMainLoop( project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) { LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1) << "] Dispatching index request for file " << entry.filename; - indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); + queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); }); break; } @@ -1665,7 +1602,7 @@ bool QueryDbMainLoop( // if so, ignore that index response. // TODO: send as priority request Project::Entry entry = project->FindCompilationEntryForFile(path); - indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); + queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); clang_complete->NotifySave(path); @@ -2596,11 +2533,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* auto signature_cache = MakeUnique(); FileConsumer::SharedState file_consumer_shared; - InProcessQueryDbResponder responder(&queue.process_response); - ThreadedQueue queue_process_request; - auto indexer = MakeUnique(&responder, &queue_process_request); - //auto indexer = MakeUnique(bin_name, &queue_process_response); - // Run query db main loop. SetCurrentThreadName("querydb"); QueryDatabase db; @@ -2608,7 +2540,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* while (true) { bool did_work = QueryDbMainLoop( config, &db, &db_cache, waiter, &queue, - indexer.get(), &project, &file_consumer_shared, &working_files, &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); if (!did_work) {