From 1fd0a1be949e9616acc06bff4d83d28fb2c86be9 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Tue, 20 Mar 2018 11:55:40 -0700 Subject: [PATCH] Reduce queue lengths by running index updates as iteration loop A single translation unit can create many index updates, so give IndexMain_DoCreateIndexUpdate a chance to run a few times. This should also be faster as it is more icache friendly. --- src/import_pipeline.cc | 121 ++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 57 deletions(-) diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc index 4876bba0..3fcff636 100644 --- a/src/import_pipeline.cc +++ b/src/import_pipeline.cc @@ -452,67 +452,74 @@ bool IndexMain_DoParse( bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) { auto* queue = QueueManager::instance(); - optional response = queue->on_id_mapped.TryPopFront(); - if (!response) - return false; - Timer time; + bool did_work = false; + IterationLoop loop; + while (loop.Next()) { + optional response = queue->on_id_mapped.TryPopFront(); + if (!response) + return did_work; - IdMap* previous_id_map = nullptr; - IndexFile* previous_index = nullptr; - if (response->previous) { - previous_id_map = response->previous->ids.get(); - previous_index = response->previous->file.get(); + did_work = true; + + Timer time; + + IdMap* previous_id_map = nullptr; + IndexFile* previous_index = nullptr; + if (response->previous) { + previous_id_map = response->previous->ids.get(); + previous_index = response->previous->file.get(); + } + + // Build delta update. + IndexUpdate update = + IndexUpdate::CreateDelta(previous_id_map, response->current->ids.get(), + previous_index, response->current->file.get()); + response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); + LOG_S(INFO) << "Built index update for " << response->current->file->path + << " (is_delta=" << !!response->previous << ")"; + + // Write current index to disk if requested. + if (response->write_to_disk) { + LOG_S(INFO) << "Writing cached index to disk for " + << response->current->file->path; + time.Reset(); + response->cache_manager->WriteToCache(*response->current->file); + response->perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); + timestamp_manager->UpdateCachedModificationTime( + response->current->file->path, + response->current->file->last_modification_time); + } + + #if false + #define PRINT_SECTION(name) \ + if (response->perf.name) { \ + total += response->perf.name; \ + output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \ + } + std::stringstream output; + long long total = 0; + output << "[perf]"; + PRINT_SECTION(index_parse); + PRINT_SECTION(index_build); + PRINT_SECTION(index_save_to_disk); + PRINT_SECTION(index_load_cached); + PRINT_SECTION(querydb_id_map); + PRINT_SECTION(index_make_delta); + output << "\n total: " << FormatMicroseconds(total); + output << " path: " << response->current_index->path; + LOG_S(INFO) << output.rdbuf(); + #undef PRINT_SECTION + + if (response->is_interactive) + LOG_S(INFO) << "Applying IndexUpdate" << std::endl << update.ToString(); + #endif + + Index_OnIndexed reply(std::move(update), response->perf); + queue->on_indexed.PushBack(std::move(reply), response->is_interactive); } - // Build delta update. - IndexUpdate update = - IndexUpdate::CreateDelta(previous_id_map, response->current->ids.get(), - previous_index, response->current->file.get()); - response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); - LOG_S(INFO) << "Built index update for " << response->current->file->path - << " (is_delta=" << !!response->previous << ")"; - - // Write current index to disk if requested. - if (response->write_to_disk) { - LOG_S(INFO) << "Writing cached index to disk for " - << response->current->file->path; - time.Reset(); - response->cache_manager->WriteToCache(*response->current->file); - response->perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); - timestamp_manager->UpdateCachedModificationTime( - response->current->file->path, - response->current->file->last_modification_time); - } - -#if false -#define PRINT_SECTION(name) \ - if (response->perf.name) { \ - total += response->perf.name; \ - output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \ - } - std::stringstream output; - long long total = 0; - output << "[perf]"; - PRINT_SECTION(index_parse); - PRINT_SECTION(index_build); - PRINT_SECTION(index_save_to_disk); - PRINT_SECTION(index_load_cached); - PRINT_SECTION(querydb_id_map); - PRINT_SECTION(index_make_delta); - output << "\n total: " << FormatMicroseconds(total); - output << " path: " << response->current_index->path; - LOG_S(INFO) << output.rdbuf(); -#undef PRINT_SECTION - - if (response->is_interactive) - LOG_S(INFO) << "Applying IndexUpdate" << std::endl << update.ToString(); -#endif - - Index_OnIndexed reply(std::move(update), response->perf); - queue->on_indexed.PushBack(std::move(reply), response->is_interactive); - - return true; + return did_work; } bool IndexMain_LoadPreviousIndex() {