Fix progress not always going to zero

This commit is contained in:
Jacob Dufault 2018-01-10 19:56:47 -08:00
parent 54394ed868
commit 05b577c9fa

View File

@ -32,9 +32,24 @@ long long GetCurrentTimeInMilliseconds() {
return elapsed_milliseconds;
}
// Send indexing progress to client if reporting is enabled.
void EmitProgress(Config* config, ImportPipelineStatus* status) {
if (config->progressReportFrequencyMs >= 0) {
struct ActiveThread {
ActiveThread(Config* config, ImportPipelineStatus* status)
: config_(config), status_(status) {
if (config_->progressReportFrequencyMs < 0)
return;
++status_->num_active_threads;
}
~ActiveThread() {
if (config_->progressReportFrequencyMs < 0)
return;
--status_->num_active_threads;
EmitProgress();
}
// Send indexing progress to client if reporting is enabled.
void EmitProgress() {
auto* queue = QueueManager::instance();
Out_Progress out;
out.params.indexRequestCount = queue->index_request.Size();
@ -42,26 +57,29 @@ void EmitProgress(Config* config, ImportPipelineStatus* status) {
out.params.loadPreviousIndexCount = queue->load_previous_index.Size();
out.params.onIdMappedCount = queue->on_id_mapped.Size();
out.params.onIndexedCount = queue->on_indexed.Size();
out.params.activeThreads = status->num_active_threads;
out.params.activeThreads = status_->num_active_threads;
// Ignore this progress update if the last update was too recent.
if (config->progressReportFrequencyMs != 0) {
if (config_->progressReportFrequencyMs != 0) {
// Make sure we output a status update if queue lengths are zero.
bool has_state =
out.params.indexRequestCount != 0 || out.params.doIdMapCount != 0 ||
out.params.loadPreviousIndexCount != 0 ||
out.params.onIdMappedCount != 0 || out.params.onIndexedCount != 0 ||
out.params.activeThreads != 0;
if (!has_state ||
GetCurrentTimeInMilliseconds() < status->next_progress_output)
bool all_zero =
out.params.indexRequestCount == 0 && out.params.doIdMapCount == 0 &&
out.params.loadPreviousIndexCount == 0 &&
out.params.onIdMappedCount == 0 && out.params.onIndexedCount == 0 &&
out.params.activeThreads == 0;
if (!all_zero ||
GetCurrentTimeInMilliseconds() < status_->next_progress_output)
return;
status->next_progress_output =
GetCurrentTimeInMilliseconds() + config->progressReportFrequencyMs;
status_->next_progress_output =
GetCurrentTimeInMilliseconds() + config_->progressReportFrequencyMs;
}
QueueManager::WriteStdout(IpcId::Unknown, out);
}
}
Config* config_;
ImportPipelineStatus* status_;
};
enum class FileParseQuery { NeedsParse, DoesNotNeedParse, NoSuchFile };
@ -453,38 +471,41 @@ void Indexer_Main(Config* config,
auto indexer = IIndexer::MakeClangIndexer();
while (true) {
status->num_active_threads++;
bool did_work = false;
EmitProgress(config, status);
{
ActiveThread active_thread(config, status);
// TODO: process all off IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have
// some threads spinning on both though otherwise memory usage will get bad.
// TODO: process all off IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for better icache behavior. We need to
// have some threads spinning on both though otherwise memory usage will
// get bad.
// We need to make sure to run both IndexMain_DoParse and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
std::unique_ptr<ICacheManager> cache_manager = ICacheManager::Make(config);
bool did_parse = IndexMain_DoParse(
config, working_files, file_consumer_shared, timestamp_manager,
import_manager, cache_manager.get(), indexer.get());
// We need to make sure to run both IndexMain_DoParse and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
std::unique_ptr<ICacheManager> cache_manager =
ICacheManager::Make(config);
did_work = IndexMain_DoParse(config, working_files, file_consumer_shared,
timestamp_manager, import_manager,
cache_manager.get(), indexer.get()) ||
did_work;
bool did_create_update =
IndexMain_DoCreateIndexUpdate(timestamp_manager, cache_manager.get());
did_work = IndexMain_DoCreateIndexUpdate(timestamp_manager,
cache_manager.get()) ||
did_work;
bool did_load_previous = IndexMain_LoadPreviousIndex(cache_manager.get());
did_work = IndexMain_LoadPreviousIndex(cache_manager.get()) || did_work;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates();
status->num_active_threads--;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
if (!did_work)
did_work = IndexMergeIndexUpdates() || did_work;
}
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
if (!did_work) {
waiter->Wait(&queue->on_indexed, &queue->index_request,
&queue->on_id_mapped, &queue->load_previous_index);
}
@ -500,8 +521,7 @@ bool QueryDb_ImportMain(Config* config,
std::unique_ptr<ICacheManager> cache_manager = ICacheManager::Make(config);
auto* queue = QueueManager::instance();
status->num_active_threads++;
EmitProgress(config, status);
ActiveThread active_thread(config, status);
bool did_work = false;
@ -614,9 +634,6 @@ bool QueryDb_ImportMain(Config* config,
import_manager->DoneQueryDbImport(updated_file.path);
}
status->num_active_threads--;
EmitProgress(config, status);
return did_work;
}