More import pipeline simplifications.

Move already-import dependency checking to a separate stage.
This commit is contained in:
Jacob Dufault 2017-08-15 00:22:13 -07:00
parent 5a451d3ad1
commit 59851c06e0
2 changed files with 108 additions and 121 deletions

View File

@ -2,6 +2,9 @@
-xc++ -xc++
-std=c++11 -std=c++11
# Loguru
-DLOGURU_WITH_STREAMS=1
# Includes # Includes
# Windows # Windows
-IC:/Users/jacob/Desktop/cquery/third_party -IC:/Users/jacob/Desktop/cquery/third_party

View File

@ -547,25 +547,24 @@ void FilterCompletionResponse(Out_TextDocumentComplete* complete_response,
struct MappedIndexFile { struct Index_Request {
std::shared_ptr<IndexFile> file; std::string path;
std::shared_ptr<IdMap> ids; std::vector<std::string> args; // TODO: make this a string that is parsed lazily.
MappedIndexFile(std::shared_ptr<IndexFile> file, std::shared_ptr<IdMap> ids) Index_Request(const std::string& path, const std::vector<std::string>& args)
: file(file), ids(ids) {} : path(path), args(args) {}
}; };
struct Index_DoIdMap { struct Index_DoIdMap {
std::shared_ptr<IndexFile> current; std::unique_ptr<IndexFile> current;
PerformanceImportFile perf; std::unique_ptr<IndexFile> previous;
bool is_interactive;
explicit Index_DoIdMap( PerformanceImportFile perf;
std::shared_ptr<IndexFile> current, bool is_interactive = false;
bool load_previous = false;
Index_DoIdMap(
std::unique_ptr<IndexFile> current,
PerformanceImportFile perf, PerformanceImportFile perf,
bool is_interactive) bool is_interactive)
: current(std::move(current)), : current(std::move(current)),
@ -574,8 +573,16 @@ struct Index_DoIdMap {
}; };
struct Index_OnIdMapped { struct Index_OnIdMapped {
std::shared_ptr<MappedIndexFile> previous; struct File {
std::shared_ptr<MappedIndexFile> current; std::unique_ptr<IndexFile> file;
std::unique_ptr<IdMap> ids;
File(std::unique_ptr<IndexFile> file, std::unique_ptr<IdMap> ids)
: file(std::move(file)), ids(std::move(ids)) {}
};
std::unique_ptr<File> previous;
std::unique_ptr<File> current;
PerformanceImportFile perf; PerformanceImportFile perf;
bool is_interactive; bool is_interactive;
@ -590,7 +597,7 @@ struct Index_OnIndexed {
IndexUpdate update; IndexUpdate update;
PerformanceImportFile perf; PerformanceImportFile perf;
explicit Index_OnIndexed( Index_OnIndexed(
IndexUpdate& update, IndexUpdate& update,
PerformanceImportFile perf) PerformanceImportFile perf)
: update(update), perf(perf) {} : update(update), perf(perf) {}
@ -601,22 +608,7 @@ struct Index_OnIndexed {
struct IndexRequest {
std::string path;
std::vector<std::string> args; // TODO: make this a string that is parsed lazily.
IndexRequest(const std::string& path, const std::vector<std::string>& args) : path(path), args(args) {}
};
MAKE_REFLECT_STRUCT(IndexRequest, path, args);
struct IndexResult {
std::string file_path;
PerformanceImportFile perf;
IndexResult(const std::string& file_path, const PerformanceImportFile& perf) : file_path(file_path), perf(perf) {}
};
MAKE_REFLECT_STRUCT(IndexResult, file_path, perf);
@ -652,19 +644,18 @@ MAKE_REFLECT_STRUCT(IndexResult, file_path, perf);
struct QueueManager { struct QueueManager {
using IndexProcess_RequestQueue = ThreadedQueue<IndexRequest>; using Index_RequestQueue = ThreadedQueue<Index_Request>;
using IndexProcess_ResponseQueue = ThreadedQueue<IndexResult>;
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>; using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>; using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>; using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
IndexProcess_RequestQueue index_request; Index_RequestQueue index_request;
IndexProcess_ResponseQueue index_response;
Index_DoIdMapQueue do_id_map; Index_DoIdMapQueue do_id_map;
Index_DoIdMapQueue load_previous_index;
Index_OnIdMappedQueue on_id_mapped; Index_OnIdMappedQueue on_id_mapped;
Index_OnIndexedQueue on_indexed; Index_OnIndexedQueue on_indexed;
QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), index_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {} QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), do_id_map(waiter), load_previous_index(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
}; };
void RegisterMessageTypes() { void RegisterMessageTypes() {
@ -758,34 +749,21 @@ struct CacheLoader {
return caches[path].get(); return caches[path].get();
} }
std::unique_ptr<IndexFile> TryTakeOrLoad(const std::string& path) {
auto it = caches.find(path);
if (it != caches.end()) {
auto result = std::move(it->second);
caches.erase(it);
return result;
}
return LoadCachedIndex(config_, path);
}
std::unordered_map<std::string, std::unique_ptr<IndexFile>> caches; std::unordered_map<std::string, std::unique_ptr<IndexFile>> caches;
Config* config_; Config* config_;
}; };
// Maintains the currently indexed file cache for the querydb process. This is
// needed for delta index updates.
struct CacheManager {
std::shared_ptr<MappedIndexFile> UpdateAndReturnOldFile(
std::shared_ptr<MappedIndexFile> entry) {
// Fetch previous value, if any.
std::shared_ptr<MappedIndexFile> previous_value;
const auto it = files_.find(entry->file->path);
if (it != files_.end()) {
previous_value = it->second;
}
// Update new value.
entry->file->ClearLargeState();
files_[entry->file->path] = entry;
// Return previous value.
return previous_value;
}
Config* config_;
std::unordered_map<std::string, std::shared_ptr<MappedIndexFile>> files_;
};
struct IndexManager { struct IndexManager {
std::unordered_set<std::string> files_being_indexed_; std::unordered_set<std::string> files_being_indexed_;
std::mutex mutex_; std::mutex mutex_;
@ -812,16 +790,14 @@ struct IndexManager {
//} // namespace //} // namespace
std::vector<IndexResult> DoParseFile( std::vector<Index_DoIdMap> DoParseFile(
Config* config, Config* config,
clang::Index* index, clang::Index* index,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
CacheLoader* cache_loader, CacheLoader* cache_loader,
const std::string& path, const std::string& path,
const std::vector<std::string>& args) { const std::vector<std::string>& args) {
LOG_S(INFO) << "Parsing " << path; std::vector<Index_DoIdMap> result;
std::vector<IndexResult> result;
IndexFile* previous_index = cache_loader->TryLoad(path); IndexFile* previous_index = cache_loader->TryLoad(path);
if (previous_index) { if (previous_index) {
@ -858,12 +834,19 @@ std::vector<IndexResult> DoParseFile(
if (!needs_reparse) { if (!needs_reparse) {
LOG_S(INFO) << "Skipping parse; no timestamp change for " << path; LOG_S(INFO) << "Skipping parse; no timestamp change for " << path;
// TODO/FIXME: real is_interactive
bool is_interactive = false;
// TODO/FIXME: real perf // TODO/FIXME: real perf
PerformanceImportFile perf; PerformanceImportFile perf;
result.push_back(IndexResult(path, perf)); result.push_back(Index_DoIdMap(cache_loader->TryTakeOrLoad(path), perf, is_interactive));
for (const std::string& dependency : previous_index->dependencies) { for (const std::string& dependency : previous_index->dependencies) {
// Only actually load the file if we haven't loaded it yet. Important
// for perf when files have lots of common dependencies.
if (!file_consumer_shared->Mark(dependency))
continue;
LOG_S(INFO) << "Emitting index result for " << dependency; LOG_S(INFO) << "Emitting index result for " << dependency;
result.push_back(IndexResult(dependency, perf)); result.push_back(Index_DoIdMap(cache_loader->TryTakeOrLoad(dependency), perf, is_interactive));
} }
return result; return result;
} }
@ -918,8 +901,10 @@ std::vector<IndexResult> DoParseFile(
WriteToCache(config, new_index->path, *new_index, new_index->file_contents_); WriteToCache(config, new_index->path, *new_index, new_index->file_contents_);
perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset();
// TODO/FIXME: real is_interactive
bool is_interactive = false;
LOG_S(INFO) << "Emitting index result for " << new_index->path; LOG_S(INFO) << "Emitting index result for " << new_index->path;
result.push_back(IndexResult(new_index->path, perf)); result.push_back(Index_DoIdMap(std::move(new_index), perf, is_interactive));
} }
return result; return result;
@ -930,7 +915,7 @@ std::vector<IndexResult> DoParseFile(
// TODO: split index files into foo.cc.json, foo.cc.timestamp, foo.cc // TODO: split index files into foo.cc.json, foo.cc.timestamp, foo.cc
std::vector<IndexResult> ParseFile( std::vector<Index_DoIdMap> ParseFile(
Config* config, Config* config,
clang::Index* index, clang::Index* index,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
@ -948,48 +933,22 @@ std::vector<IndexResult> ParseFile(
bool IndexMain_DoParse( bool IndexMain_DoParse(
Config* config, Config* config,
QueueManager* queues, QueueManager* queue,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
clang::Index* index) { clang::Index* index) {
IndexRequest request = queues->index_request.Dequeue(); Index_Request request = queue->index_request.Dequeue();
Project::Entry entry; Project::Entry entry;
entry.filename = request.path; entry.filename = request.path;
entry.args = request.args; entry.args = request.args;
std::vector<IndexResult> responses = ParseFile(config, index, file_consumer_shared, entry); std::vector<Index_DoIdMap> responses = ParseFile(config, index, file_consumer_shared, entry);
for (auto response : responses) // TODO/FIXME: bulk enqueue so we don't lock so many times
queues->index_response.Enqueue(std::move(response)); for (Index_DoIdMap& response : responses)
return !responses.empty();
}
bool IndexMain_DoIndex(Config* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
WorkingFiles* working_files,
clang::Index* index,
QueueManager* queue) {
optional<IndexResult> request = queue->index_response.TryDequeue();
if (!request)
return false;
std::unique_ptr<IndexFile> current_index =
LoadCachedIndex(config, request->file_path);
if (!current_index) {
std::cerr << "!!! Failed to load index for " + request->file_path + "\n";
return false;
}
assert(current_index);
// TODO: get real value for is_interactive
Index_DoIdMap response(std::move(current_index), request->perf,
false /*is_interactive*/);
queue->do_id_map.Enqueue(std::move(response)); queue->do_id_map.Enqueue(std::move(response));
return true; return !responses.empty();
} }
bool IndexMain_DoCreateIndexUpdate( bool IndexMain_DoCreateIndexUpdate(
@ -1045,6 +1004,18 @@ bool IndexMain_DoCreateIndexUpdate(
return true; return true;
} }
bool IndexMain_LoadPreviousIndex(Config* config, QueueManager* queue) {
optional<Index_DoIdMap> response = queue->load_previous_index.TryDequeue();
if (!response)
return false;
response->previous = LoadCachedIndex(config, response->current->path);
assert(response->previous);
queue->do_id_map.Enqueue(std::move(*response));
return true;
}
bool IndexMergeIndexUpdates(QueueManager* queue) { bool IndexMergeIndexUpdates(QueueManager* queue) {
// TODO/FIXME: it looks like there is a crash here? // TODO/FIXME: it looks like there is a crash here?
optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue(); optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue();
@ -1060,9 +1031,9 @@ bool IndexMergeIndexUpdates(QueueManager* queue) {
} }
did_merge = true; did_merge = true;
//Timer time; Timer time;
root->update.Merge(to_join->update); root->update.Merge(to_join->update);
//time.ResetAndPrint("[indexer] Joining two querydb updates"); time.ResetAndPrint("Joining two querydb updates");
} }
} }
@ -1083,28 +1054,28 @@ void IndexMain(Config* config,
// both though // both though
// otherwise memory usage will get bad. // otherwise memory usage will get bad.
bool did_parse = IndexMain_DoParse(config, queue, file_consumer_shared, &index); // We need to make sure to run both IndexMain_DoParse and
// We need to make sure to run both IndexMain_DoIndex and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed // work. Running both also lets the user query the partially constructed
// index. // index.
bool did_index = bool did_parse = IndexMain_DoParse(config, queue, file_consumer_shared, &index);
IndexMain_DoIndex(config, file_consumer_shared, project, working_files,
&index, queue);
bool did_create_update = bool did_create_update =
IndexMain_DoCreateIndexUpdate(queue); IndexMain_DoCreateIndexUpdate(queue);
bool did_merge = false;
bool did_load_previous = IndexMain_LoadPreviousIndex(config, queue);
// Nothing to index and no index updates to create, so join some already // Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread. // created index updates to reduce work on querydb thread.
if (!did_parse && !did_index && !did_create_update) bool did_merge = false;
if (!did_parse && !did_create_update && !did_load_previous)
did_merge = IndexMergeIndexUpdates(queue); did_merge = IndexMergeIndexUpdates(queue);
// We didn't do any work, so wait for a notification. // We didn't do any work, so wait for a notification.
if (!did_parse && !did_index && !did_create_update && !did_merge) if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait( waiter->Wait(
{&queue->index_request, &queue->index_response, &queue->on_id_mapped, &queue->on_indexed}); {&queue->index_request, &queue->on_id_mapped, &queue->load_previous_index, &queue->on_indexed});
}
} }
} }
@ -1187,7 +1158,6 @@ void IndexMain(Config* config,
bool QueryDbMainLoop( bool QueryDbMainLoop(
Config* config, Config* config,
QueryDatabase* db, QueryDatabase* db,
CacheManager* db_cache,
MultiQueueWaiter* waiter, MultiQueueWaiter* waiter,
QueueManager* queue, QueueManager* queue,
Project* project, Project* project,
@ -1281,7 +1251,7 @@ bool QueryDbMainLoop(
//std::cerr << "[" << i << "/" << (project->entries.size() - 1) //std::cerr << "[" << i << "/" << (project->entries.size() - 1)
// << "] Dispatching index request for file " << entry.filename // << "] Dispatching index request for file " << entry.filename
// << std::endl; // << std::endl;
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); queue->index_request.Enqueue(Index_Request(entry.filename, entry.args));
}); });
// We need to support multiple concurrent index processes. // We need to support multiple concurrent index processes.
@ -1344,7 +1314,7 @@ bool QueryDbMainLoop(
project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) { project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) {
LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1) LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1)
<< "] Dispatching index request for file " << entry.filename; << "] Dispatching index request for file " << entry.filename;
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); queue->index_request.Enqueue(Index_Request(entry.filename, entry.args));
}); });
break; break;
} }
@ -1602,7 +1572,7 @@ bool QueryDbMainLoop(
// if so, ignore that index response. // if so, ignore that index response.
// TODO: send as priority request // TODO: send as priority request
Project::Entry entry = project->FindCompilationEntryForFile(path); Project::Entry entry = project->FindCompilationEntryForFile(path);
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args)); queue->index_request.Enqueue(Index_Request(entry.filename, entry.args));
clang_complete->NotifySave(path); clang_complete->NotifySave(path);
@ -2470,17 +2440,32 @@ bool QueryDbMainLoop(
optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue(); optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue();
if (!request) if (!request)
break; break;
did_work = true; did_work = true;
// If the request does not have previous state and we have already imported
// it, load the previous state from disk and rerun IdMap logic later.
if (!request->previous &&
db->usr_to_file.find(LowerPathIfCaseInsensitive(request->current->path)) != db->usr_to_file.end()) {
assert(!request->load_previous);
request->load_previous = true;
queue->load_previous_index.Enqueue(std::move(*request));
continue;
}
Index_OnIdMapped response(request->perf, request->is_interactive); Index_OnIdMapped response(request->perf, request->is_interactive);
Timer time; Timer time;
assert(request->current); assert(request->current);
// Build IdMap for the new instance. Replace the value in the cache.
auto id_map_current = std::make_shared<IdMap>(db, request->current->id_cache); auto make_map = [db](std::unique_ptr<IndexFile> file) -> std::unique_ptr<Index_OnIdMapped::File> {
response.current = std::make_shared<MappedIndexFile>(request->current, id_map_current); if (!file)
response.previous = db_cache->UpdateAndReturnOldFile(response.current); return nullptr;
auto id_map = MakeUnique<IdMap>(db, file->id_cache);
return MakeUnique<Index_OnIdMapped::File>(std::move(file), std::move(id_map));
};
response.current = make_map(std::move(request->current));
response.previous = make_map(std::move(request->previous));
response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
queue->on_id_mapped.Enqueue(std::move(response)); queue->on_id_mapped.Enqueue(std::move(response));
@ -2536,10 +2521,9 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
// Run query db main loop. // Run query db main loop.
SetCurrentThreadName("querydb"); SetCurrentThreadName("querydb");
QueryDatabase db; QueryDatabase db;
CacheManager db_cache;
while (true) { while (true) {
bool did_work = QueryDbMainLoop( bool did_work = QueryDbMainLoop(
config, &db, &db_cache, waiter, &queue, config, &db, waiter, &queue,
&project, &file_consumer_shared, &working_files, &project, &file_consumer_shared, &working_files,
&clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get());
if (!did_work) { if (!did_work) {