Simplify queue management

This commit is contained in:
Jacob Dufault 2017-08-01 20:23:37 -07:00
parent c89f651cd8
commit cac5dcaf8f

View File

@ -718,10 +718,19 @@ MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args);
using IndexProcess_ResponseQueue = ThreadedQueue<IndexProcess_Response::IndexResultArgs>; struct QueueManager {
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>; using IndexProcess_ResponseQueue = ThreadedQueue<IndexProcess_Response::IndexResultArgs>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>; using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>; using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
IndexProcess_ResponseQueue process_response;
Index_DoIdMapQueue do_id_map;
Index_OnIdMappedQueue on_id_mapped;
Index_OnIndexedQueue on_indexed;
QueueManager(MultiQueueWaiter* waiter) : process_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
};
void RegisterMessageTypes() { void RegisterMessageTypes() {
MessageRegistry::instance()->Register<Ipc_CancelRequest>(); MessageRegistry::instance()->Register<Ipc_CancelRequest>();
@ -873,10 +882,9 @@ bool IndexMain_DoIndex(Config* config,
Project* project, Project* project,
WorkingFiles* working_files, WorkingFiles* working_files,
clang::Index* index, clang::Index* index,
IndexProcess_ResponseQueue* queue_index_response, QueueManager* queue) {
Index_DoIdMapQueue* queue_do_id_map) {
optional<IndexProcess_Response::IndexResultArgs> request = optional<IndexProcess_Response::IndexResultArgs> request =
queue_index_response->TryDequeue(); queue->process_response.TryDequeue();
if (!request) if (!request)
return false; return false;
@ -892,20 +900,20 @@ bool IndexMain_DoIndex(Config* config,
// TODO: get real value for is_interactive // TODO: get real value for is_interactive
Index_DoIdMap response(std::move(current_index), request->perf, Index_DoIdMap response(std::move(current_index), request->perf,
false /*is_interactive*/); false /*is_interactive*/);
queue_do_id_map->Enqueue(std::move(response)); queue->do_id_map.Enqueue(std::move(response));
return true; return true;
} }
bool IndexMain_DoCreateIndexUpdate( bool IndexMain_DoCreateIndexUpdate(
Index_OnIdMappedQueue* queue_on_id_mapped, QueueManager* queue) {
Index_OnIndexedQueue* queue_on_indexed) { // TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed.
optional<Index_OnIdMapped> response = queue_on_id_mapped->TryDequeue(); optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue();
if (!response) if (!response)
return false; return false;
Timer time; Timer time;
IdMap* previous_id_map = nullptr; IdMap* previous_id_map = nullptr;
IndexFile* previous_index = nullptr; IndexFile* previous_index = nullptr;
if (response->previous) { if (response->previous) {
@ -944,22 +952,22 @@ bool IndexMain_DoCreateIndexUpdate(
#endif #endif
Index_OnIndexed reply(update, response->perf); Index_OnIndexed reply(update, response->perf);
queue_on_indexed->Enqueue(std::move(reply)); queue->on_indexed.Enqueue(std::move(reply));
return true; return true;
} }
bool IndexMergeIndexUpdates(Index_OnIndexedQueue* queue_on_indexed) { bool IndexMergeIndexUpdates(QueueManager* queue) {
// TODO/FIXME: it looks like there is a crash here? // TODO/FIXME: it looks like there is a crash here?
optional<Index_OnIndexed> root = queue_on_indexed->TryDequeue(); optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue();
if (!root) if (!root)
return false; return false;
bool did_merge = false; bool did_merge = false;
while (true) { while (true) {
optional<Index_OnIndexed> to_join = queue_on_indexed->TryDequeue(); optional<Index_OnIndexed> to_join = queue->on_indexed.TryDequeue();
if (!to_join) { if (!to_join) {
queue_on_indexed->Enqueue(std::move(*root)); queue->on_indexed.Enqueue(std::move(*root));
return did_merge; return did_merge;
} }
@ -975,10 +983,7 @@ void IndexMain(Config* config,
Project* project, Project* project,
WorkingFiles* working_files, WorkingFiles* working_files,
MultiQueueWaiter* waiter, MultiQueueWaiter* waiter,
IndexProcess_ResponseQueue* queue_index_response, QueueManager* queue) {
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
Index_OnIndexedQueue* queue_on_indexed) {
SetCurrentThreadName("indexer"); SetCurrentThreadName("indexer");
// TODO: dispose of index after it is not used for a while. // TODO: dispose of index after it is not used for a while.
clang::Index index(1, 0); clang::Index index(1, 0);
@ -996,20 +1001,20 @@ void IndexMain(Config* config,
// index. // index.
bool did_index = bool did_index =
IndexMain_DoIndex(config, file_consumer_shared, project, working_files, IndexMain_DoIndex(config, file_consumer_shared, project, working_files,
&index, queue_index_response, queue_do_id_map); &index, queue);
bool did_create_update = bool did_create_update =
IndexMain_DoCreateIndexUpdate(queue_on_id_mapped, queue_on_indexed); IndexMain_DoCreateIndexUpdate(queue);
bool did_merge = false; bool did_merge = false;
// Nothing to index and no index updates to create, so join some already // Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread. // created index updates to reduce work on querydb thread.
if (!did_index && !did_create_update) if (!did_index && !did_create_update)
did_merge = IndexMergeIndexUpdates(queue_on_indexed); did_merge = IndexMergeIndexUpdates(queue);
// We didn't do any work, so wait for a notification. // We didn't do any work, so wait for a notification.
if (!did_index && !did_create_update && !did_merge) if (!did_index && !did_create_update && !did_merge)
waiter->Wait( waiter->Wait(
{queue_index_response, queue_on_id_mapped, queue_on_indexed}); {&queue->process_response, &queue->on_id_mapped, &queue->on_indexed});
} }
} }
@ -1031,9 +1036,9 @@ struct OutOfProcessQueryDbResponder : IQueryDbResponder {
}; };
struct InProcessQueryDbResponder : IQueryDbResponder { struct InProcessQueryDbResponder : IQueryDbResponder {
IndexProcess_ResponseQueue* queue_; QueueManager::IndexProcess_ResponseQueue* queue_;
InProcessQueryDbResponder(IndexProcess_ResponseQueue* queue) InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue)
: queue_(queue) {} : queue_(queue) {}
void Write(IndexProcess_Response response) override { void Write(IndexProcess_Response response) override {
@ -1329,14 +1334,14 @@ struct OutOfProcessIndexer : IIndexerProcess {
optional<Config> config_; optional<Config> config_;
std::string bin_name_; std::string bin_name_;
IndexProcess_ResponseQueue* response_queue_; QueueManager::IndexProcess_ResponseQueue* response_queue_;
const int kMaxIndexRequestsUntilRestart = 25; const int kMaxIndexRequestsUntilRestart = 25;
bool enable_auto_restart_ = false; bool enable_auto_restart_ = false;
int number_of_index_requests_since_last_restart_ = 0; int number_of_index_requests_since_last_restart_ = 0;
OutOfProcessIndexer(const std::string& bin_name, OutOfProcessIndexer(const std::string& bin_name,
IndexProcess_ResponseQueue* response_queue) QueueManager::IndexProcess_ResponseQueue* response_queue)
: bin_name_(bin_name), response_queue_(response_queue) { : bin_name_(bin_name), response_queue_(response_queue) {
CreateIndexProcess(); CreateIndexProcess();
} }
@ -1578,10 +1583,7 @@ bool QueryDbMainLoop(
QueryDatabase* db, QueryDatabase* db,
CacheManager* db_cache, CacheManager* db_cache,
MultiQueueWaiter* waiter, MultiQueueWaiter* waiter,
IndexProcess_ResponseQueue* queue_index_response, QueueManager* queue,
Index_DoIdMapQueue* queue_do_id_map,
Index_OnIdMappedQueue* queue_on_id_mapped,
Index_OnIndexedQueue* queue_on_indexed,
IIndexerProcess* indexer_process, IIndexerProcess* indexer_process,
Project* project, Project* project,
FileConsumer::SharedState* file_consumer_shared, FileConsumer::SharedState* file_consumer_shared,
@ -1656,7 +1658,7 @@ bool QueryDbMainLoop(
std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl; std::cerr << "[querydb] Starting " << config->indexerCount << " indexers" << std::endl;
for (int i = 0; i < config->indexerCount; ++i) { for (int i = 0; i < config->indexerCount; ++i) {
new std::thread([&]() { new std::thread([&]() {
IndexMain(config, file_consumer_shared, project, working_files, waiter, queue_index_response, queue_do_id_map, queue_on_id_mapped, queue_on_indexed); IndexMain(config, file_consumer_shared, project, working_files, waiter, queue);
}); });
} }
@ -2865,7 +2867,7 @@ bool QueryDbMainLoop(
while (true) { while (true) {
optional<Index_DoIdMap> request = queue_do_id_map->TryDequeue(); optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue();
if (!request) if (!request)
break; break;
@ -2881,11 +2883,11 @@ bool QueryDbMainLoop(
response.previous = db_cache->UpdateAndReturnOldFile(response.current); response.previous = db_cache->UpdateAndReturnOldFile(response.current);
response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
queue_on_id_mapped->Enqueue(std::move(response)); queue->on_id_mapped.Enqueue(std::move(response));
} }
while (true) { while (true) {
optional<Index_OnIndexed> response = queue_on_indexed->TryDequeue(); optional<Index_OnIndexed> response = queue->on_indexed.TryDequeue();
if (!response) if (!response)
break; break;
@ -2918,10 +2920,7 @@ bool QueryDbMainLoop(
void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) { void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* waiter) {
// Create queues. // Create queues.
IndexProcess_ResponseQueue queue_process_response(waiter); QueueManager queue(waiter);
Index_DoIdMapQueue queue_do_id_map(waiter);
Index_OnIdMappedQueue queue_on_id_mapped(waiter);
Index_OnIndexedQueue queue_on_indexed(waiter);
Project project; Project project;
WorkingFiles working_files; WorkingFiles working_files;
@ -2934,7 +2933,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
auto signature_cache = MakeUnique<CodeCompleteCache>(); auto signature_cache = MakeUnique<CodeCompleteCache>();
FileConsumer::SharedState file_consumer_shared; FileConsumer::SharedState file_consumer_shared;
InProcessQueryDbResponder responder(&queue_process_response); InProcessQueryDbResponder responder(&queue.process_response);
ThreadedQueue<IndexProcess_Request> queue_process_request; ThreadedQueue<IndexProcess_Request> queue_process_request;
auto indexer = MakeUnique<InProcessIndexer>(&responder, &queue_process_request); auto indexer = MakeUnique<InProcessIndexer>(&responder, &queue_process_request);
//auto indexer = MakeUnique<OutOfProcessIndexer>(bin_name, &queue_process_response); //auto indexer = MakeUnique<OutOfProcessIndexer>(bin_name, &queue_process_response);
@ -2945,7 +2944,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
CacheManager db_cache; CacheManager db_cache;
while (true) { while (true) {
bool did_work = QueryDbMainLoop( bool did_work = QueryDbMainLoop(
config, &db, &db_cache, waiter, &queue_process_response, &queue_do_id_map, &queue_on_id_mapped, &queue_on_indexed, config, &db, &db_cache, waiter, &queue,
indexer.get(), indexer.get(),
&project, &file_consumer_shared, &working_files, &project, &file_consumer_shared, &working_files,
&clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get()); &clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get());
@ -2953,8 +2952,8 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
IpcManager* ipc = IpcManager::instance(); IpcManager* ipc = IpcManager::instance();
waiter->Wait({ waiter->Wait({
ipc->threaded_queue_for_server_.get(), ipc->threaded_queue_for_server_.get(),
&queue_do_id_map, &queue.do_id_map,
&queue_on_indexed &queue.on_indexed
}); });
} }
} }