From 4249fc4a388fe0a8723e9334c7027d9e67a38a29 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Fri, 7 Apr 2017 23:45:28 -0700 Subject: [PATCH] Implement new threading model for computing index updates (prepare IdMap on querydb thread) --- src/command_line.cc | 260 ++++++++++++++++++++++++------------------- src/query.cc | 106 ++++++++---------- src/query.h | 12 +- src/threaded_queue.h | 9 +- 4 files changed, 204 insertions(+), 183 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 6c76bd99..97cfe55b 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -30,7 +30,7 @@ const int kQueueSizeBytes = 1024 * 8; const int kMaxWorkspaceSearchResults = 1000; } -struct IndexTranslationUnitRequest { +struct Index_DoIndex { enum class Type { Import, Update @@ -40,18 +40,37 @@ struct IndexTranslationUnitRequest { std::vector args; Type type; - IndexTranslationUnitRequest(Type type) : type(type) {} + Index_DoIndex(Type type) : type(type) {} }; -struct IndexTranslationUnitResponse { +struct Index_DoIdMap { + std::unique_ptr previous; + std::unique_ptr current; + + explicit Index_DoIdMap(std::unique_ptr previous, + std::unique_ptr current) + : previous(std::move(previous)), + current(std::move(current)) {} +}; + +struct Index_OnIdMapped { + std::unique_ptr previous_index; + std::unique_ptr current_index; + std::unique_ptr previous_id_map; + std::unique_ptr current_id_map; +}; + +struct Index_OnIndexed { IndexUpdate update; - explicit IndexTranslationUnitResponse(IndexUpdate& update) : update(update) {} + explicit Index_OnIndexed(IndexUpdate& update) : update(update) {} }; // TODO: Rename TypedBidiMessageQueue to IpcTransport? using IpcMessageQueue = TypedBidiMessageQueue; -using IndexRequestQueue = ThreadedQueue; -using IndexResponseQueue = ThreadedQueue; +using Index_DoIndexQueue = ThreadedQueue>; +using Index_DoIdMapQueue = ThreadedQueue>; +using Index_OnIdMappedQueue = ThreadedQueue>; +using Index_OnIndexedQueue = ThreadedQueue>; template void SendMessage(IpcMessageQueue& t, MessageQueue* destination, TMessage& message) { @@ -222,22 +241,26 @@ std::string GetCachedFileName(std::string source_file) { return kCacheDirectory + source_file + ".json"; } -optional LoadCachedFile(std::string filename) { +std::unique_ptr LoadCachedFile(std::string filename) { // TODO FIXME FIXME FIXME - return nullopt; + return nullptr; std::string cache_file = GetCachedFileName(filename); std::ifstream cache; cache.open(GetCachedFileName(filename)); if (!cache.good()) - return nullopt; + return nullptr; std::string file_content = std::string( std::istreambuf_iterator(cache), std::istreambuf_iterator()); - return Deserialize(filename, file_content); + optional indexed = Deserialize(filename, file_content); + if (indexed) + return MakeUnique(indexed.value()); + + return nullptr; } void WriteToCache(std::string filename, IndexedFile& file) { @@ -250,111 +273,85 @@ void WriteToCache(std::string filename, IndexedFile& file) { cache.close(); } -void IndexMain(IndexRequestQueue* requests, IndexResponseQueue* responses) { - while (true) { - // Try to get a request. If there isn't one, sleep for a little while. - optional request = requests->TryDequeue(); - if (!request) { - // TODO: use CV to wakeup? - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - continue; - } + +bool IndexMain_DoIndex(Index_DoIndexQueue* queue_do_index, + Index_DoIdMapQueue* queue_do_id_map) { + optional> opt_index_request = queue_do_index->TryDequeue(); + if (!opt_index_request) + return false; + std::unique_ptr index_request = std::move(opt_index_request.value()); + + Timer time; + + std::unique_ptr old_index = LoadCachedFile(index_request->path); + time.ResetAndPrint("Loading cached index"); - Timer time; - - // If the index update is an import, then we will load the previous index - // into memory if we have a previous index. After that, we dispatch an - // update request to get the latest version. - if (request->type == IndexTranslationUnitRequest::Type::Import) { - request->type = IndexTranslationUnitRequest::Type::Update; - - // TODO: we're not serializing out the files cache. We only ever want to import references - // from the primary file though, so that should be ok. We need to cleanup indexer output. - optional old_index = LoadCachedFile(request->path); - if (old_index.has_value()) { - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO: We need to create IdMap on QueryDb thread. - IdMap old_id_map(nullptr, old_index->id_cache); - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - IndexUpdate update = IndexUpdate::CreateImport(old_id_map, old_index.value()); - IndexTranslationUnitResponse response(update); - responses->Enqueue(response); - time.ResetAndPrint("Loading cached index"); - requests->Enqueue(request.value()); - continue; - } - } - - assert(request->type == IndexTranslationUnitRequest::Type::Update); + // If the index update is an import, then we will load the previous index + // into memory if we have a previous index. After that, we dispatch an + // update request to get the latest version. + if (old_index && index_request->type == Index_DoIndex::Type::Import) { + auto response = MakeUnique(nullptr /*previous*/, std::move(old_index) /*current*/); + queue_do_id_map->Enqueue(std::move(response)); + index_request->type = Index_DoIndex::Type::Update; + queue_do_index->Enqueue(std::move(index_request)); + } + else { // Parse request and send a response. - std::cerr << "Parsing file " << request->path << " with args " - << Join(request->args, ", ") << std::endl; + std::cerr << "Parsing file " << index_request->path << " with args " + << Join(index_request->args, ", ") << std::endl; + + // TODO: parse should return unique_ptr. Then we can eliminate copy below. Make sure to not + // reuse moved pointer in WriteToCache if we do so. + IndexedFile current_index = Parse(index_request->path, index_request->args); - IndexedFile new_index = Parse(request->path, request->args); time.ResetAndPrint("Parsing/indexing"); - // If we have a cached index, that means it is already imported, which - // means we want to apply a delta update. - optional old_index = LoadCachedFile(request->path); - time.ResetAndPrint("Loading previous index"); - if (old_index) { - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO: We need to create IdMap on QueryDb thread. - IdMap old_id_map(nullptr, old_index->id_cache); - IdMap new_id_map(nullptr, new_index.id_cache); - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - - // Apply delta update. - IndexUpdate update = IndexUpdate::CreateDelta(old_id_map, new_id_map, old_index.value(), new_index); - IndexTranslationUnitResponse response(update); - time.ResetAndPrint("Creating delta index update/response"); - responses->Enqueue(response); - time.ResetAndPrint("Sending update to server"); - } - else { - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO: We need to create IdMap on QueryDb thread. - IdMap new_id_map(nullptr, new_index.id_cache); - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - // TODO/FIXME/TODO - - // Apply full update. - IndexUpdate update = IndexUpdate::CreateImport(new_id_map, new_index); - IndexTranslationUnitResponse response(update); - time.ResetAndPrint("Creating index update/response"); - responses->Enqueue(response); - time.ResetAndPrint("Sending update to server"); - } + auto response = MakeUnique(std::move(old_index) /*previous*/, MakeUnique(current_index) /*current*/); + queue_do_id_map->Enqueue(std::move(response)); // Cache file so we can diff it later. - WriteToCache(request->path, new_index); + WriteToCache(index_request->path, current_index); time.ResetAndPrint("Cache index update to disk"); } + + return true; +} + +bool IndexMain_DoCreateIndexUpdate(Index_OnIdMappedQueue* queue_on_id_mapped, + Index_OnIndexedQueue* queue_on_indexed) { + optional> opt_response = queue_on_id_mapped->TryDequeue(); + if (!opt_response) + return false; + std::unique_ptr response = std::move(opt_response.value()); + + Timer time; + IndexUpdate update = IndexUpdate::CreateDelta(response->previous_id_map.get(), response->current_id_map.get(), + response->previous_index.get(), response->current_index.get()); + time.ResetAndPrint("Creating delta IndexUpdate"); + auto reply = MakeUnique(update); + queue_on_indexed->Enqueue(std::move(reply)); + time.ResetAndPrint("Sending update to server"); + + return true; +} + +void IndexMain(Index_DoIndexQueue* queue_do_index, + Index_DoIdMapQueue* queue_do_id_map, + Index_OnIdMappedQueue* queue_on_id_mapped, + Index_OnIndexedQueue* queue_on_indexed) { + while (true) { + // TODO: process all off IndexMain_DoIndex before calling IndexMain_DoCreateIndexUpdate for + // better icache behavior. We need to have some threads spinning on both though + // otherwise memory usage will get bad. + + if (!IndexMain_DoIndex(queue_do_index, queue_do_id_map) && + !IndexMain_DoCreateIndexUpdate(queue_on_id_mapped, queue_on_indexed)) { + // TODO: use CV to wakeup? + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } } QueryableFile* FindFile(QueryableDatabase* db, const std::string& filename) { @@ -457,8 +454,10 @@ void AddCodeLens(std::vector* result, void QueryDbMainLoop( QueryableDatabase* db, IpcMessageQueue* language_client, - IndexRequestQueue* index_requests, - IndexResponseQueue* index_responses, + Index_DoIndexQueue* queue_do_index, + Index_DoIdMapQueue* queue_do_id_map, + Index_OnIdMappedQueue* queue_on_id_mapped, + Index_OnIndexedQueue* queue_on_indexed, Project* project, WorkingFiles* working_files, CompletionManager* completion_manager) { @@ -496,10 +495,10 @@ void QueryDbMainLoop( << "] Dispatching index request for file " << filepath << std::endl; - IndexTranslationUnitRequest request(IndexTranslationUnitRequest::Type::Import); - request.path = filepath; - request.args = entry.args; - index_requests->Enqueue(request); + auto request = MakeUnique(Index_DoIndex::Type::Import); + request->path = filepath; + request->args = entry.args; + queue_do_index->Enqueue(std::move(request)); } std::cerr << "Done" << std::endl; break; @@ -711,7 +710,7 @@ void QueryDbMainLoop( std::string query = msg->params.query; for (int i = 0; i < db->qualified_names.size(); ++i) { if (response.result.size() > kMaxWorkspaceSearchResults) { - std::cerr << "Query exceeded maximum number of responses (" << kMaxWorkspaceSearchResults << "), output may not contain all results"; + std::cerr << "Query exceeded maximum number of responses (" << kMaxWorkspaceSearchResults << "), output may not contain all results" << std::endl; break; } @@ -729,6 +728,8 @@ void QueryDbMainLoop( switch (symbol.kind) { case SymbolKind::File: { QueryableFile& def = db->files[symbol.idx]; + info.name = def.def.usr; + info.kind = lsSymbolKind::File; info.location.uri.SetPath(def.def.usr); break; } @@ -795,10 +796,32 @@ void QueryDbMainLoop( // TODO: consider rate-limiting and checking for IPC messages so we don't block // requests / we can serve partial requests. + + while (true) { - optional response = index_responses->TryDequeue(); - if (!response) + optional> opt_request = queue_do_id_map->TryDequeue(); + if (!opt_request) break; + std::unique_ptr request = std::move(opt_request.value()); + + auto response = MakeUnique(); + Timer time; + if (request->previous) { + response->previous_id_map = MakeUnique(db, request->previous->id_cache); + response->previous_index = std::move(request->previous); + } + response->current_id_map = MakeUnique(db, request->current->id_cache); + response->current_index = std::move(request->current); + time.ResetAndPrint("Create IdMap"); + + queue_on_id_mapped->Enqueue(std::move(response)); + } + + while (true) { + optional> opt_response = queue_on_indexed->TryDequeue(); + if (!opt_response) + break; + std::unique_ptr response = std::move(opt_response.value()); Timer time; db->ApplyIndexUpdate(&response->update); @@ -811,8 +834,11 @@ void QueryDbMain() { // Create queues. std::unique_ptr ipc = BuildIpcMessageQueue(kIpcLanguageClientName, kQueueSizeBytes); - IndexRequestQueue index_request_queue; - IndexResponseQueue index_response_queue; + Index_DoIndexQueue queue_do_index; + Index_DoIdMapQueue queue_do_id_map; + Index_OnIdMappedQueue queue_on_id_mapped; + Index_OnIndexedQueue queue_on_indexed; + Project project; WorkingFiles working_files; CompletionManager completion_manager(&project, &working_files); @@ -820,14 +846,14 @@ void QueryDbMain() { // Start indexer threads. for (int i = 0; i < kNumIndexers; ++i) { new std::thread([&]() { - IndexMain(&index_request_queue, &index_response_queue); + IndexMain(&queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed); }); } // Run query db main loop. QueryableDatabase db; while (true) { - QueryDbMainLoop(&db, ipc.get(), &index_request_queue, &index_response_queue, &project, &working_files, &completion_manager); + QueryDbMainLoop(&db, ipc.get(), &queue_do_index, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, &project, &working_files, &completion_manager); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } diff --git a/src/query.cc b/src/query.cc index cc7cefaf..865701b8 100644 --- a/src/query.cc +++ b/src/query.cc @@ -452,10 +452,8 @@ int GetOrAddSymbol(QueryableDatabase* query_db, SymbolKind kind, const Usr& usr) IdMap::IdMap(QueryableDatabase* query_db, const IdCache& local_ids) : local_ids(local_ids) { - assert(query_db); // TODO: remove after testing. - index_file_id = GetQueryFileIdFromUsr(query_db, local_ids.primary_file); - + cached_type_ids_.reserve(local_ids.type_id_to_usr.size()); for (const auto& entry : local_ids.type_id_to_usr) cached_type_ids_[entry.first] = GetQueryTypeIdFromUsr(query_db, entry.second); @@ -489,19 +487,16 @@ IdMap::IdMap(QueryableDatabase* query_db, const IdCache& local_ids) // static -IndexUpdate IndexUpdate::CreateImport(const IdMap& id_map, IndexedFile& file) { - // Return standard diff constructor but with an empty file so everything is - // added. - IndexedFile previous(file.path); - return IndexUpdate(id_map, id_map, previous, file); +IndexUpdate IndexUpdate::CreateDelta(const IdMap* previous_id_map, const IdMap* current_id_map, IndexedFile* previous, IndexedFile* current) { + if (!previous_id_map) { + assert(!previous); + IndexedFile previous(current->path); + return IndexUpdate(*current_id_map, *current_id_map, previous, *current); + } + return IndexUpdate(*previous_id_map, *current_id_map, *previous, *current); } -// static -IndexUpdate IndexUpdate::CreateDelta(const IdMap& current_id_map, const IdMap& previous_id_map, IndexedFile& current, IndexedFile& updated) { - return IndexUpdate(current_id_map, previous_id_map, current, updated); -} - -IndexUpdate::IndexUpdate(const IdMap& current_id_map, const IdMap& previous_id_map, IndexedFile& previous_file, IndexedFile& current_file) { +IndexUpdate::IndexUpdate(const IdMap& previous_id_map, const IdMap& current_id_map, IndexedFile& previous_file, IndexedFile& current_file) { // |query_name| is the name of the variable on the query type. // |index_name| is the name of the variable on the index type. // |type| is the type of the variable. @@ -638,7 +633,16 @@ void IndexUpdate::Merge(const IndexUpdate& update) { - +void UpdateQualifiedName(QueryableDatabase* db, int* qualified_name_index, SymbolKind kind, int symbol_index, const std::string& name) { + if (*qualified_name_index == -1) { + db->qualified_names.push_back(name); + db->symbols.push_back(SymbolIdx(kind, symbol_index)); + *qualified_name_index = db->qualified_names.size() - 1; + } + else { + db->qualified_names[*qualified_name_index] = name; + } +} @@ -654,67 +658,53 @@ void QueryableDatabase::RemoveUsrs(const std::vector& to_remove) { void QueryableDatabase::ImportOrUpdate(const std::vector& updates) { for (auto& def : updates) { auto it = usr_to_symbol.find(def.usr); - if (it == usr_to_symbol.end()) { - qualified_names.push_back(def.usr); - symbols.push_back(SymbolIdx(SymbolKind::File, files.size())); - usr_to_symbol[def.usr] = SymbolIdx(SymbolKind::File, files.size()); - files.push_back(QueryableFile(def)); - } - else { - QueryableFile& existing = files[it->second.idx]; - existing.def = def; - } + assert(it != usr_to_symbol.end()); + + QueryableFile& existing = files[it->second.idx]; + existing.def = def; + UpdateQualifiedName(this, &existing.qualified_name_idx, SymbolKind::File, it->second.idx, def.usr); } } void QueryableDatabase::ImportOrUpdate(const std::vector& updates) { for (auto& def : updates) { + if (!def.definition_extent) + continue; + auto it = usr_to_symbol.find(def.usr); - if (it == usr_to_symbol.end()) { - qualified_names.push_back(def.qualified_name); - symbols.push_back(SymbolIdx(SymbolKind::Type, types.size())); - usr_to_symbol[def.usr] = SymbolIdx(SymbolKind::Type, types.size()); - types.push_back(QueryableTypeDef(def)); - } - else { - QueryableTypeDef& existing = types[it->second.idx]; - if (def.definition_extent) - existing.def = def; - } + assert(it != usr_to_symbol.end()); + + QueryableTypeDef& existing = types[it->second.idx]; + existing.def = def; + UpdateQualifiedName(this, &existing.qualified_name_idx, SymbolKind::Type, it->second.idx, def.usr); } } void QueryableDatabase::ImportOrUpdate(const std::vector& updates) { for (auto& def : updates) { + if (!def.definition_extent) + continue; + auto it = usr_to_symbol.find(def.usr); - if (it == usr_to_symbol.end()) { - qualified_names.push_back(def.qualified_name); - symbols.push_back(SymbolIdx(SymbolKind::Func, funcs.size())); - usr_to_symbol[def.usr] = SymbolIdx(SymbolKind::Func, funcs.size()); - funcs.push_back(QueryableFuncDef(def)); - } - else { - QueryableFuncDef& existing = funcs[it->second.idx]; - if (def.definition_extent) - existing.def = def; - } + assert(it != usr_to_symbol.end()); + + QueryableFuncDef& existing = funcs[it->second.idx]; + existing.def = def; + UpdateQualifiedName(this, &existing.qualified_name_idx, SymbolKind::Func, it->second.idx, def.usr); } } void QueryableDatabase::ImportOrUpdate(const std::vector& updates) { for (auto& def : updates) { + if (!def.definition_extent) + continue; + auto it = usr_to_symbol.find(def.usr); - if (it == usr_to_symbol.end()) { - qualified_names.push_back(def.qualified_name); - symbols.push_back(SymbolIdx(SymbolKind::Var, vars.size())); - usr_to_symbol[def.usr] = SymbolIdx(SymbolKind::Var, vars.size()); - vars.push_back(QueryableVarDef(def)); - } - else { - QueryableVarDef& existing = vars[it->second.idx]; - if (def.definition_extent) - existing.def = def; - } + assert(it != usr_to_symbol.end()); + + QueryableVarDef& existing = vars[it->second.idx]; + existing.def = def; + UpdateQualifiedName(this, &existing.qualified_name_idx, SymbolKind::Var, it->second.idx, def.usr); } } diff --git a/src/query.h b/src/query.h index 1c94e54b..b3c1acf0 100644 --- a/src/query.h +++ b/src/query.h @@ -118,6 +118,7 @@ struct QueryableFile { using DefUpdate = Def; DefUpdate def; + int qualified_name_idx = -1; QueryableFile(const Usr& usr) { def.usr = usr; } QueryableFile(const Def& def) : def(def) {} @@ -134,6 +135,7 @@ struct QueryableTypeDef { std::vector derived; std::vector instantiations; std::vector uses; + int qualified_name_idx = -1; QueryableTypeDef(const Usr& usr) : def(usr) {} QueryableTypeDef(const DefUpdate& def) : def(def) {} @@ -152,6 +154,7 @@ struct QueryableFuncDef { std::vector derived; std::vector callers; std::vector uses; + int qualified_name_idx = -1; QueryableFuncDef(const Usr& usr) : def(usr) {} QueryableFuncDef(const DefUpdate& def) : def(def) {} @@ -164,6 +167,7 @@ struct QueryableVarDef { DefUpdate def; std::vector uses; + int qualified_name_idx = -1; QueryableVarDef(const Usr& usr) : def(usr) {} QueryableVarDef(const DefUpdate& def) : def(def) {} @@ -181,9 +185,9 @@ struct SymbolIdx { struct IndexUpdate { - // Creates a new IndexUpdate that will import |file|. - static IndexUpdate CreateImport(const IdMap& id_map, IndexedFile& file); - static IndexUpdate CreateDelta(const IdMap& current_id_map, const IdMap& previous_id_map, IndexedFile& current, IndexedFile& updated); + // Creates a new IndexUpdate based on the delta from previous to current. If + // no delta computation should be done just pass null for previous. + static IndexUpdate CreateDelta(const IdMap* previous_id_map, const IdMap* current_id_map, IndexedFile* previous, IndexedFile* current); // Merge |update| into this update; this can reduce overhead / index update // work can be parallelized. @@ -217,7 +221,7 @@ struct IndexUpdate { // Creates an index update assuming that |previous| is already // in the index, so only the delta between |previous| and |current| // will be applied. - IndexUpdate(const IdMap& current_id_map, const IdMap& previous_id_map, IndexedFile& previous, IndexedFile& current); + IndexUpdate(const IdMap& previous_id_map, const IdMap& current_id_map, IndexedFile& previous, IndexedFile& current); }; diff --git a/src/threaded_queue.h b/src/threaded_queue.h index c496b7ab..ac59bb9d 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -15,9 +15,9 @@ template class ThreadedQueue { public: // Add an element to the queue. - void Enqueue(T t) { + void Enqueue(T&& t) { std::lock_guard lock(mutex_); - queue_.push(t); + queue_.push(std::move(t)); cv_.notify_one(); } @@ -29,7 +29,8 @@ public: // release lock as long as the wait and reaquire it afterwards. cv_.wait(lock); } - T val = queue_.front(); + + auto val = std::move(queue_.front()); queue_.pop(); return val; } @@ -41,7 +42,7 @@ public: if (queue_.empty()) return nullopt; - T val = queue_.front(); + auto val = std::move(queue_.front()); queue_.pop(); return val; }