Simplify import dedup by allowing indexer to reparse the same document concurrently. Removes a critical section simplifies the code.

Instead we dedup the imports between creating an id map and applying the update.
This commit is contained in:
Jacob Dufault 2017-09-13 21:50:36 -07:00
parent 104cfd167c
commit cd58eafd90

View File

@ -727,26 +727,26 @@ void InsertSymbolIntoResult(QueryDatabase* db, WorkingFiles* working_files, Symb
// Manages files inside of the indexing pipeline so we don't have the same file // Manages files inside of the indexing pipeline so we don't have the same file
// being imported multiple times. // being imported multiple times.
//
// NOTE: This is not thread safe and should only be used on the querydb thread.
struct ImportManager { struct ImportManager {
// Try to import the given file. Returns true if the file should be imported. // Try to import the given file into querydb. We should only ever be
bool StartImport(const std::string& path) { // importing a file into querydb once per file. Returns true if the file
std::lock_guard<std::mutex> guard(mutex_); // can be imported.
bool StartQueryDbImport(const std::string& path) {
return import_.insert(path).second; return import_.insert(path).second;
} }
// The file has been fully imported and can be imported again later on. // The file has been fully imported and can be imported again later on.
void DoneImport(const std::string& path) { void DoneQueryDbImport(const std::string& path) {
std::lock_guard<std::mutex> guard(mutex_);
import_.erase(path); import_.erase(path);
} }
// Returns true if there any any files currently being imported. // Returns true if there any any files currently being imported.
bool HasActiveImports() { bool HasActiveImports() {
std::lock_guard<std::mutex> guard(mutex_);
return !import_.empty(); return !import_.empty();
} }
std::mutex mutex_;
std::unordered_set<std::string> import_; std::unordered_set<std::string> import_;
}; };
@ -1012,15 +1012,11 @@ bool IndexMain_DoParse(
WorkingFiles* working_files, WorkingFiles* working_files,
QueueManager* queue, QueueManager* queue,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
ImportManager* import_manager,
TimestampManager* timestamp_manager, TimestampManager* timestamp_manager,
clang::Index* index) { clang::Index* index) {
bool can_import = false; optional<Index_Request> request = queue->index_request.TryDequeue();
optional<Index_Request> request = queue->index_request.TryDequeuePlusAction([&](const Index_Request& request) { if (!request)
can_import = import_manager->StartImport(request.path);
});
if (!request || !can_import)
return false; return false;
Project::Entry entry; Project::Entry entry;
@ -1028,15 +1024,6 @@ bool IndexMain_DoParse(
entry.args = request->args; entry.args = request->args;
std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry, request->contents); std::vector<Index_DoIdMap> responses = ParseFile(config, working_files, index, file_consumer_shared, timestamp_manager, request->is_interactive, entry, request->contents);
// Unmark file as imported if indexing failed.
bool found_import = false;
for (auto& response : responses) {
if (response.current->path == request->path)
found_import = true;
}
if (!found_import)
import_manager->DoneImport(request->path);
// Don't bother sending an IdMap request if there are no responses. // Don't bother sending an IdMap request if there are no responses.
if (responses.empty()) if (responses.empty())
return false; return false;
@ -1144,7 +1131,6 @@ bool IndexMergeIndexUpdates(QueueManager* queue) {
WorkThread::Result IndexMain( WorkThread::Result IndexMain(
Config* config, Config* config,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
ImportManager* import_manager,
TimestampManager* timestamp_manager, TimestampManager* timestamp_manager,
Project* project, Project* project,
WorkingFiles* working_files, WorkingFiles* working_files,
@ -1163,7 +1149,7 @@ WorkThread::Result IndexMain(
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any // IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed // work. Running both also lets the user query the partially constructed
// index. // index.
bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, import_manager, timestamp_manager, &index); bool did_parse = IndexMain_DoParse(config, working_files, queue, file_consumer_shared, timestamp_manager, &index);
bool did_create_update = bool did_create_update =
IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager); IndexMain_DoCreateIndexUpdate(config, queue, timestamp_manager);
@ -1194,6 +1180,11 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
break; break;
did_work = true; did_work = true;
// Check if the file is already being imported into querydb. If it is, drop
// the request.
if (!import_manager->StartQueryDbImport(request->current->path))
continue;
// If the request does not have previous state and we have already imported // If the request does not have previous state and we have already imported
// it, load the previous state from disk and rerun IdMap logic later. Do not // it, load the previous state from disk and rerun IdMap logic later. Do not
// do this if we have already attempted in the past. // do this if we have already attempted in the past.
@ -1251,7 +1242,7 @@ bool QueryDb_ImportMain(Config* config, QueryDatabase* db, ImportManager* import
// PERF: This will acquire a lock. If querydb ends being up being slow we // PERF: This will acquire a lock. If querydb ends being up being slow we
// could push this request to another queue which runs on an indexer. // could push this request to another queue which runs on an indexer.
import_manager->DoneImport(updated_file.path); import_manager->DoneQueryDbImport(updated_file.path);
} }
time.Reset(); time.Reset();
@ -1419,7 +1410,7 @@ bool QueryDbMainLoop(
std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl; std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl;
for (int i = 0; i < config->indexerCount; ++i) { for (int i = 0; i < config->indexerCount; ++i) {
WorkThread::StartThread("indexer" + std::to_string(i), [&]() { WorkThread::StartThread("indexer" + std::to_string(i), [&]() {
return IndexMain(config, file_consumer_shared, import_manager, timestamp_manager, project, working_files, waiter, queue); return IndexMain(config, file_consumer_shared, timestamp_manager, project, working_files, waiter, queue);
}); });
} }