Decouple QueryDb_ImportMain

This commit is contained in:
Fangrui Song 2018-02-04 19:38:57 -08:00
parent a8fb2264a9
commit 3839d1e5ab
5 changed files with 139 additions and 114 deletions

View File

@ -146,19 +146,11 @@ bool QueryDbMainLoop(Config* config,
CodeCompleteCache* non_global_code_complete_cache, CodeCompleteCache* non_global_code_complete_cache,
CodeCompleteCache* signature_cache) { CodeCompleteCache* signature_cache) {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
bool did_work = false;
std::vector<std::unique_ptr<BaseIpcMessage>> messages = std::vector<std::unique_ptr<BaseIpcMessage>> messages =
queue->for_querydb.DequeueAll(); queue->for_querydb.DequeueAll();
bool did_work = messages.size();
for (auto& message : messages) { for (auto& message : messages) {
did_work = true; QueryDb_Handle(message);
for (MessageHandler* handler : *MessageHandler::message_handlers) {
if (handler->GetId() == message->method_id) {
handler->Run(std::move(message));
break;
}
}
if (message) { if (message) {
LOG_S(FATAL) << "Exiting; unhandled IPC message " LOG_S(FATAL) << "Exiting; unhandled IPC message "
<< IpcIdToString(message->method_id); << IpcIdToString(message->method_id);

View File

@ -18,7 +18,6 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -381,7 +380,8 @@ void ParseFile(Config* config,
true /*write_to_disk*/)); true /*write_to_disk*/));
} }
QueueManager::instance()->do_id_map.EnqueueAll(std::move(result)); QueueManager::instance()->do_id_map.EnqueueAll(std::move(result),
request.is_interactive);
} }
bool IndexMain_DoParse( bool IndexMain_DoParse(
@ -466,7 +466,7 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
#endif #endif
Index_OnIndexed reply(update, response->perf); Index_OnIndexed reply(update, response->perf);
queue->on_indexed.Enqueue(std::move(reply)); queue->on_indexed.Enqueue(std::move(reply), response->is_interactive);
return true; return true;
} }
@ -609,24 +609,20 @@ void Indexer_Main(Config* config,
} }
} }
bool QueryDb_ImportMain(Config* config, void QueryDb_Handle(std::unique_ptr<BaseIpcMessage>& message) {
for (MessageHandler* handler : *MessageHandler::message_handlers) {
if (handler->GetId() == message->method_id) {
handler->Run(std::move(message));
break;
}
}
}
namespace {
void QueryDb_DoIdMap(QueueManager* queue,
QueryDatabase* db, QueryDatabase* db,
ImportManager* import_manager, ImportManager* import_manager,
ImportPipelineStatus* status, Index_DoIdMap* request) {
SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files) {
auto* queue = QueueManager::instance();
ActiveThread active_thread(config, status);
bool did_work = false;
while (true) {
optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue();
if (!request)
break;
did_work = true;
assert(request->current); assert(request->current);
// If the request does not have previous state and we have already imported // If the request does not have previous state and we have already imported
@ -638,7 +634,7 @@ bool QueryDb_ImportMain(Config* config,
assert(!request->load_previous); assert(!request->load_previous);
request->load_previous = true; request->load_previous = true;
queue->load_previous_index.Enqueue(std::move(*request)); queue->load_previous_index.Enqueue(std::move(*request));
continue; return;
} }
// Check if the file is already being imported into querydb. If it is, drop // Check if the file is already being imported into querydb. If it is, drop
@ -649,11 +645,11 @@ bool QueryDb_ImportMain(Config* config,
if (!import_manager->StartQueryDbImport(request->current->path)) { if (!import_manager->StartQueryDbImport(request->current->path)) {
LOG_S(INFO) << "Dropping index as it is already being imported for " LOG_S(INFO) << "Dropping index as it is already being imported for "
<< request->current->path; << request->current->path;
continue; return;
} }
Index_OnIdMapped response(request->cache_manager, request->perf, request->is_interactive, Index_OnIdMapped response(request->cache_manager, request->perf,
request->write_to_disk); request->is_interactive, request->write_to_disk);
Timer time; Timer time;
auto make_map = [db](std::unique_ptr<IndexFile> file) auto make_map = [db](std::unique_ptr<IndexFile> file)
@ -670,15 +666,15 @@ bool QueryDb_ImportMain(Config* config,
response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
queue->on_id_mapped.Enqueue(std::move(response)); queue->on_id_mapped.Enqueue(std::move(response));
} }
while (true) {
optional<Index_OnIndexed> response = queue->on_indexed.TryDequeue();
if (!response)
break;
did_work = true;
void QueryDb_OnIndexed(QueueManager* queue,
QueryDatabase* db,
ImportManager* import_manager,
ImportPipelineStatus* status,
SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files,
Index_OnIndexed* response) {
Timer time; Timer time;
db->ApplyIndexUpdate(&response->update); db->ApplyIndexUpdate(&response->update);
time.ResetAndPrint("Applying index update for " + time.ResetAndPrint("Applying index update for " +
@ -709,6 +705,36 @@ bool QueryDb_ImportMain(Config* config,
// update. // update.
import_manager->DoneQueryDbImport(updated_file.value.path); import_manager->DoneQueryDbImport(updated_file.value.path);
} }
}
} // namespace
bool QueryDb_ImportMain(Config* config,
QueryDatabase* db,
ImportManager* import_manager,
ImportPipelineStatus* status,
SemanticHighlightSymbolCache* semantic_cache,
WorkingFiles* working_files) {
auto* queue = QueueManager::instance();
ActiveThread active_thread(config, status);
bool did_work = false;
while (true) {
optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue();
if (!request)
break;
did_work = true;
QueryDb_DoIdMap(queue, db, import_manager, &*request);
}
while (true) {
optional<Index_OnIndexed> response = queue->on_indexed.TryDequeue();
if (!response)
break;
did_work = true;
QueryDb_OnIndexed(queue, db, import_manager, status, semantic_cache,
working_files, &*response);
} }
return did_work; return did_work;

View File

@ -4,6 +4,7 @@
#include <clang-c/Index.h> #include <clang-c/Index.h>
#include <atomic> #include <atomic>
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -42,6 +43,9 @@ void Indexer_Main(Config* config,
WorkingFiles* working_files, WorkingFiles* working_files,
MultiQueueWaiter* waiter); MultiQueueWaiter* waiter);
struct BaseIpcMessage;
void QueryDb_Handle(std::unique_ptr<BaseIpcMessage>& message);
bool QueryDb_ImportMain(Config* config, bool QueryDb_ImportMain(Config* config,
QueryDatabase* db, QueryDatabase* db,
ImportManager* import_manager, ImportManager* import_manager,

View File

@ -863,7 +863,6 @@ void QueryDatabase::ApplyIndexUpdate(IndexUpdate* update) {
AddRangeWithGen(&def.def_var_name, merge_update.to_add, def.gen); \ AddRangeWithGen(&def.def_var_name, merge_update.to_add, def.gen); \
RemoveRangeWithGen(&def.def_var_name, merge_update.to_remove); \ RemoveRangeWithGen(&def.def_var_name, merge_update.to_remove); \
VerifyUnique(def.def_var_name); \ VerifyUnique(def.def_var_name); \
UpdateGen(this, def.def_var_name); \
} }
for (const std::string& filename : update->files_removed) for (const std::string& filename : update->files_removed)

View File

@ -110,19 +110,12 @@ struct ThreadedQueue : public BaseThreadQueue {
// Returns the number of elements in the queue. This is lock-free. // Returns the number of elements in the queue. This is lock-free.
size_t Size() const { return total_count_; } size_t Size() const { return total_count_; }
// Add an element to the front of the queue.
void PriorityEnqueue(T&& t) {
std::lock_guard<std::mutex> lock(mutex_);
priority_.push(std::move(t));
++total_count_;
waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
}
// Add an element to the queue. // Add an element to the queue.
void Enqueue(T&& t) { void Enqueue(T&& t, bool priority = false) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority)
priority_.push(std::move(t));
else
queue_.push(std::move(t)); queue_.push(std::move(t));
++total_count_; ++total_count_;
waiter_->cv.notify_one(); waiter_->cv.notify_one();
@ -130,8 +123,13 @@ struct ThreadedQueue : public BaseThreadQueue {
waiter1_->cv.notify_one(); waiter1_->cv.notify_one();
} }
// Add an element to the front of the queue.
void PriorityEnqueue(T&& t) {
Enqueue(std::move(t), true);
}
// Add a set of elements to the queue. // Add a set of elements to the queue.
void EnqueueAll(std::vector<T>&& elements) { void EnqueueAll(std::vector<T>&& elements, bool priority = false) {
if (elements.empty()) if (elements.empty())
return; return;
@ -140,6 +138,9 @@ struct ThreadedQueue : public BaseThreadQueue {
total_count_ += elements.size(); total_count_ += elements.size();
for (T& element : elements) { for (T& element : elements) {
if (priority)
priority_.push(std::move(element));
else
queue_.push(std::move(element)); queue_.push(std::move(element));
} }
elements.clear(); elements.clear();
@ -202,28 +203,31 @@ struct ThreadedQueue : public BaseThreadQueue {
// Get the first element from the queue without blocking. Returns a null // Get the first element from the queue without blocking. Returns a null
// value if the queue is empty. // value if the queue is empty.
template <typename TAction> optional<T> TryDequeueHelper(int which) {
optional<T> TryDequeuePlusAction(TAction action) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority_.empty() && queue_.empty())
return nullopt;
auto execute = [&](std::queue<T>* q) { auto execute = [&](std::queue<T>* q) {
auto val = std::move(q->front()); auto val = std::move(q->front());
q->pop(); q->pop();
--total_count_; --total_count_;
action(val);
return std::move(val); return std::move(val);
}; };
if (!priority_.empty()) if (which & 2 && priority_.size())
return execute(&priority_); return execute(&priority_);
if (which & 1 && queue_.size())
return execute(&queue_); return execute(&queue_);
return nullopt;
} }
optional<T> TryDequeue() { optional<T> TryDequeue() {
return TryDequeuePlusAction([](const T&) {}); return TryDequeueHelper(3);
}
optional<T> TryDequeueLow() {
return TryDequeueHelper(1);
}
optional<T> TryDequeueHigh() {
return TryDequeueHelper(2);
} }
mutable std::mutex mutex_; mutable std::mutex mutex_;