Remove some additional overhead created for out of process

Jacob Dufault 2017-08-14 22:53:44 -07:00
parent db9a97d586
commit 5a451d3ad1

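This commit drops the IIndexerProcess / InProcessIndexer indirection from the querydb main loop: index requests now go straight onto QueueManager::index_request and results come back on QueueManager::index_response. The following is a minimal sketch of that data flow, using simplified stand-in types (this ThreadedQueue, IndexRequest, and IndexResult are stripped-down approximations written for illustration, not the real cquery classes; the file name and flag are made up):

    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <queue>
    #include <string>
    #include <thread>
    #include <vector>

    // Stripped-down stand-in for cquery's ThreadedQueue<T>.
    template <typename T>
    struct ThreadedQueue {
      std::mutex mutex;
      std::queue<T> items;

      void Enqueue(T value) {
        std::lock_guard<std::mutex> lock(mutex);
        items.push(std::move(value));
      }
      std::optional<T> TryDequeue() {
        std::lock_guard<std::mutex> lock(mutex);
        if (items.empty())
          return std::nullopt;
        T value = std::move(items.front());
        items.pop();
        return value;
      }
    };

    struct IndexRequest { std::string path; std::vector<std::string> args; };
    struct IndexResult { std::string file_path; };

    // After this commit the indexer reads requests and writes results directly
    // through QueueManager, with no IIndexerProcess / IQueryDbResponder layer.
    struct QueueManager {
      ThreadedQueue<IndexRequest> index_request;
      ThreadedQueue<IndexResult> index_response;
    };

    int main() {
      QueueManager queue;

      // querydb side: was indexer_process->SendMessage(IndexRequest(...)),
      // now a direct enqueue onto index_request.
      queue.index_request.Enqueue({"foo.cc", {"-std=c++14"}});

      // indexer side: roughly what IndexMain_DoParse does -- dequeue a
      // request, parse it (faked here), enqueue results onto index_response.
      std::thread indexer([&] {
        if (auto request = queue.index_request.TryDequeue())
          queue.index_response.Enqueue({request->path});
      });
      indexer.join();

      // IndexMain_DoIndex later dequeues index_response; here we just print
      // to show the round trip.
      if (auto result = queue.index_response.TryDequeue())
        std::cout << "indexed " << result->file_path << "\n";
    }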

@ -609,8 +609,6 @@ struct IndexRequest {
};
MAKE_REFLECT_STRUCT(IndexRequest, path, args);
using Index_IndexProcess_Request_IndexQueue = ThreadedQueue<IndexRequest>;
struct IndexResult {
std::string file_path;
@ -654,17 +652,19 @@ MAKE_REFLECT_STRUCT(IndexResult, file_path, perf);
struct QueueManager {
using IndexProcess_RequestQueue = ThreadedQueue<IndexRequest>;
using IndexProcess_ResponseQueue = ThreadedQueue<IndexResult>;
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
IndexProcess_ResponseQueue process_response;
IndexProcess_RequestQueue index_request;
IndexProcess_ResponseQueue index_response;
Index_DoIdMapQueue do_id_map;
Index_OnIdMappedQueue on_id_mapped;
Index_OnIndexedQueue on_indexed;
QueueManager(MultiQueueWaiter* waiter) : process_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
QueueManager(MultiQueueWaiter* waiter) : index_request(waiter), index_response(waiter), do_id_map(waiter), on_id_mapped(waiter), on_indexed(waiter) {}
};
void RegisterMessageTypes() {
@ -812,171 +812,15 @@ struct IndexManager {
//} // namespace
bool IndexMain_DoIndex(Config* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
WorkingFiles* working_files,
clang::Index* index,
QueueManager* queue) {
optional<IndexResult> request = queue->process_response.TryDequeue();
if (!request)
return false;
std::unique_ptr<IndexFile> current_index =
LoadCachedIndex(config, request->file_path);
if (!current_index) {
std::cerr << "!!! Failed to load index for " + request->file_path + "\n";
return false;
}
assert(current_index);
// TODO: get real value for is_interactive
Index_DoIdMap response(std::move(current_index), request->perf,
false /*is_interactive*/);
queue->do_id_map.Enqueue(std::move(response));
return true;
}
bool IndexMain_DoCreateIndexUpdate(
QueueManager* queue) {
// TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed.
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue();
if (!response)
return false;
Timer time;
IdMap* previous_id_map = nullptr;
IndexFile* previous_index = nullptr;
if (response->previous) {
LOG_S(INFO) << "Creating delta update for " << response->previous->file->path;
previous_id_map = response->previous->ids.get();
previous_index = response->previous->file.get();
}
IndexUpdate update = IndexUpdate::CreateDelta(
previous_id_map, response->current->ids.get(),
previous_index, response->current->file.get());
response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset();
#if false
#define PRINT_SECTION(name) \
if (response->perf.name) {\
total += response->perf.name; \
output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \
}
std::stringstream output;
long long total = 0;
output << "[perf]";
PRINT_SECTION(index_parse);
PRINT_SECTION(index_build);
PRINT_SECTION(index_save_to_disk);
PRINT_SECTION(index_load_cached);
PRINT_SECTION(querydb_id_map);
PRINT_SECTION(index_make_delta);
output << "\n total: " << FormatMicroseconds(total);
output << " path: " << response->current_index->path;
output << std::endl;
std::cerr << output.rdbuf();
#undef PRINT_SECTION
if (response->is_interactive)
std::cerr << "Applying IndexUpdate" << std::endl << update.ToString() << std::endl;
#endif
Index_OnIndexed reply(update, response->perf);
queue->on_indexed.Enqueue(std::move(reply));
return true;
}
bool IndexMergeIndexUpdates(QueueManager* queue) {
// TODO/FIXME: it looks like there is a crash here?
optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue();
if (!root)
return false;
bool did_merge = false;
while (true) {
optional<Index_OnIndexed> to_join = queue->on_indexed.TryDequeue();
if (!to_join) {
queue->on_indexed.Enqueue(std::move(*root));
return did_merge;
}
did_merge = true;
//Timer time;
root->update.Merge(to_join->update);
//time.ResetAndPrint("[indexer] Joining two querydb updates");
}
}
void IndexMain(Config* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue) {
SetCurrentThreadName("indexer");
// TODO: dispose of index after it is not used for a while.
clang::Index index(1, 0);
while (true) {
// TODO: process all of IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have
// some threads spinning on both though, otherwise memory usage will get bad.
// We need to make sure to run both IndexMain_DoIndex and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_index =
IndexMain_DoIndex(config, file_consumer_shared, project, working_files,
&index, queue);
bool did_create_update =
IndexMain_DoCreateIndexUpdate(queue);
bool did_merge = false;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
if (!did_index && !did_create_update)
did_merge = IndexMergeIndexUpdates(queue);
// We didn't do any work, so wait for a notification.
if (!did_index && !did_create_update && !did_merge)
waiter->Wait(
{&queue->process_response, &queue->on_id_mapped, &queue->on_indexed});
}
}
struct IQueryDbResponder {
virtual void Write(IndexResult result) = 0;
};
struct InProcessQueryDbResponder : IQueryDbResponder {
QueueManager::IndexProcess_ResponseQueue* queue_;
InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue)
: queue_(queue) {}
void Write(IndexResult result) override {
queue_->Enqueue(std::move(result));
}
};
std::vector<IndexResult> DoParseFile(
Config* config,
clang::Index* index,
FileConsumer::SharedState* file_consumer_shared,
CacheLoader* cache_loader,
const std::string& path,
const std::vector<std::string>& args) {
LOG_S(INFO) << "Parsing " << path;
std::vector<IndexResult> result;
IndexFile* previous_index = cache_loader->TryLoad(path);
@ -1102,78 +946,175 @@ std::vector<IndexResult> ParseFile(
return DoParseFile(config, index, file_consumer_shared, &cache_loader, tu_path, entry.args);
}
bool IndexMain_DoParse(
Config* config,
QueueManager* queues,
FileConsumer::SharedState* file_consumer_shared,
clang::Index* index) {
IndexRequest request = queues->index_request.Dequeue();
Project::Entry entry;
entry.filename = request.path;
entry.args = request.args;
std::vector<IndexResult> responses = ParseFile(config, index, file_consumer_shared, entry);
for (auto response : responses)
queues->index_response.Enqueue(std::move(response));
return !responses.empty();
}
void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexProcess_Request_IndexQueue* queue, std::atomic<int>* busy, FileConsumer::SharedState* file_consumer_shared) {
while (true) {
IndexRequest request = queue->DequeuePlusAction([&]() {
++(*busy);
});
clang::Index index(0, 0);
Project::Entry entry;
entry.filename = request.path;
entry.args = request.args;
std::vector<IndexResult> responses = ParseFile(config, &index, file_consumer_shared, entry);
for (const auto& response : responses)
responder->Write(response);
--(*busy);
}
}
bool IndexMain_DoIndex(Config* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
WorkingFiles* working_files,
clang::Index* index,
QueueManager* queue) {
optional<IndexResult> request = queue->index_response.TryDequeue();
if (!request)
return false;
std::unique_ptr<IndexFile> current_index =
LoadCachedIndex(config, request->file_path);
if (!current_index) {
std::cerr << "!!! Failed to load index for " + request->file_path + "\n";
return false;
}
assert(current_index);
// TODO: get real value for is_interactive
Index_DoIdMap response(std::move(current_index), request->perf,
false /*is_interactive*/);
queue->do_id_map.Enqueue(std::move(response));
return true;
}
bool IndexMain_DoCreateIndexUpdate(
QueueManager* queue) {
// TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed.
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue();
if (!response)
return false;
Timer time;
IdMap* previous_id_map = nullptr;
IndexFile* previous_index = nullptr;
if (response->previous) {
LOG_S(INFO) << "Creating delta update for " << response->previous->file->path;
previous_id_map = response->previous->ids.get();
previous_index = response->previous->file.get();
}
IndexUpdate update = IndexUpdate::CreateDelta(
previous_id_map, response->current->ids.get(),
previous_index, response->current->file.get());
response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset();
#if false
#define PRINT_SECTION(name) \
if (response->perf.name) {\
total += response->perf.name; \
output << " " << #name << ": " << FormatMicroseconds(response->perf.name); \
}
std::stringstream output;
long long total = 0;
output << "[perf]";
PRINT_SECTION(index_parse);
PRINT_SECTION(index_build);
PRINT_SECTION(index_save_to_disk);
PRINT_SECTION(index_load_cached);
PRINT_SECTION(querydb_id_map);
PRINT_SECTION(index_make_delta);
output << "\n total: " << FormatMicroseconds(total);
output << " path: " << response->current_index->path;
output << std::endl;
std::cerr << output.rdbuf();
#undef PRINT_SECTION
if (response->is_interactive)
std::cerr << "Applying IndexUpdate" << std::endl << update.ToString() << std::endl;
#endif
Index_OnIndexed reply(update, response->perf);
queue->on_indexed.Enqueue(std::move(reply));
return true;
}
bool IndexMergeIndexUpdates(QueueManager* queue) {
// TODO/FIXME: it looks like there is a crash here?
optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue();
if (!root)
return false;
bool did_merge = false;
while (true) {
optional<Index_OnIndexed> to_join = queue->on_indexed.TryDequeue();
if (!to_join) {
queue->on_indexed.Enqueue(std::move(*root));
return did_merge;
}
did_merge = true;
//Timer time;
root->update.Merge(to_join->update);
//time.ResetAndPrint("[indexer] Joining two querydb updates");
}
}
void IndexMain(Config* config,
FileConsumer::SharedState* file_consumer_shared,
Project* project,
WorkingFiles* working_files,
MultiQueueWaiter* waiter,
QueueManager* queue) {
SetCurrentThreadName("indexer");
// TODO: dispose of index after it is not used for a while.
clang::Index index(1, 0);
while (true) {
// TODO: process all of IndexMain_DoIndex before calling
// IndexMain_DoCreateIndexUpdate for better icache behavior. We need to have
// some threads spinning on both though, otherwise memory usage will get bad.
bool did_parse = IndexMain_DoParse(config, queue, file_consumer_shared, &index);
// We need to make sure to run both IndexMain_DoIndex and
// IndexMain_DoCreateIndexUpdate so we don't starve querydb from doing any
// work. Running both also lets the user query the partially constructed
// index.
bool did_index =
IndexMain_DoIndex(config, file_consumer_shared, project, working_files,
&index, queue);
bool did_create_update =
IndexMain_DoCreateIndexUpdate(queue);
bool did_merge = false;
// Nothing to index and no index updates to create, so join some already
// created index updates to reduce work on querydb thread.
if (!did_parse && !did_index && !did_create_update)
did_merge = IndexMergeIndexUpdates(queue);
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_index && !did_create_update && !did_merge)
waiter->Wait(
{&queue->index_request, &queue->index_response, &queue->on_id_mapped, &queue->on_indexed});
}
}
struct IIndexerProcess {
virtual void SetConfig(const Config& config) = 0;
virtual void SendMessage(IndexRequest message) = 0;
};
struct InProcessIndexer : IIndexerProcess {
ThreadedQueue<IndexRequest>* messages_;
Index_IndexProcess_Request_IndexQueue queue_;
std::vector<std::thread> indexer_threads_;
std::atomic<int> num_busy_indexers_;
Config config_;
IQueryDbResponder* responder_;
// TODO: Remove FileConsumer::SharedState support from indexer. Indexer
// always generates every index since it doesn't take a huge amount of time.
// Then the querydb process decides if the new index is worth importing. At
// some point we can "blacklist" certain files we are definitely not
// interested in indexing.
FileConsumer::SharedState file_consumer_shared_;
explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue<IndexRequest>* messages)
: messages_(messages), num_busy_indexers_(0), responder_(responder) {}
void SetConfig(const Config& config) override {
config_ = config;
for (int i = 0; i < config_.indexerCount; ++i) {
indexer_threads_.push_back(std::thread([&, i]() {
SetCurrentThreadName("indexer" + std::to_string(i));
IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_);
}));
}
}
void SendMessage(IndexRequest message) override {
// Dispatch the request so one of the indexers will pick it up.
queue_.Enqueue(std::move(message));
}
bool TryWaitUntilIdle() {
// Wait until all indexers are done running and we have finished all of our work.
while (num_busy_indexers_ != 0 || !queue_.IsEmpty() || !messages_->IsEmpty()) {
// There are other messages that need to be processed; start the loop over.
if (!messages_->IsEmpty())
return false;
std::cerr << "!! Trying to exit indexer; there are still " + std::to_string(num_busy_indexers_) + " indexers running\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return true;
}
};
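TryWaitUntilIdle above works only if the busy counter is bumped inside DequeuePlusAction, i.e. before the request is visibly gone from the queue; otherwise there would be a window where the queue looks empty and no indexer looks busy even though a request is in flight. The sketch below illustrates that hand-off with hypothetical simplified types: SimpleQueue is not the real ThreadedQueue, and the assumption that the action runs under the same lock as the pop is my reading of the code, not a documented guarantee.

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <queue>

    // Hypothetical stand-in for ThreadedQueue, only to illustrate the idea.
    template <typename T>
    struct SimpleQueue {
      std::mutex mutex;
      std::condition_variable cv;
      std::queue<T> items;

      void Enqueue(T value) {
        {
          std::lock_guard<std::mutex> lock(mutex);
          items.push(std::move(value));
        }
        cv.notify_one();
      }

      // Assumed semantics: |action| runs under the same lock as the pop, so
      // "item removed from queue" and "worker marked busy" happen atomically
      // from the point of view of an idle check.
      template <typename TAction>
      T DequeuePlusAction(TAction action) {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [&] { return !items.empty(); });
        T value = std::move(items.front());
        items.pop();
        action();
        return value;
      }

      bool IsEmpty() {
        std::lock_guard<std::mutex> lock(mutex);
        return items.empty();
      }
    };

    // Worker loop mirroring IndexThreadMain: busy is incremented inside the
    // dequeue and decremented only after the response has been written.
    template <typename TRequest>
    void WorkerLoop(SimpleQueue<TRequest>* queue, std::atomic<int>* busy) {
      while (true) {
        TRequest request = queue->DequeuePlusAction([&]() { ++(*busy); });
        // ... parse |request| and write responses here ...
        (void)request;
        --(*busy);
      }
    }

    // Idle test mirroring TryWaitUntilIdle: nothing queued and nobody busy.
    template <typename TRequest>
    bool LooksIdle(SimpleQueue<TRequest>* queue, std::atomic<int>* busy) {
      return queue->IsEmpty() && *busy == 0;
    }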
@ -1249,7 +1190,6 @@ bool QueryDbMainLoop(
CacheManager* db_cache,
MultiQueueWaiter* waiter,
QueueManager* queue,
IIndexerProcess* indexer_process,
Project* project,
FileConsumer::SharedState* file_consumer_shared,
WorkingFiles* working_files,
@ -1327,9 +1267,6 @@ bool QueryDbMainLoop(
});
}
// Send config to indexer process.
indexer_process->SetConfig(*config);
Timer time;
// Open up / load the project.
@ -1344,7 +1281,7 @@ bool QueryDbMainLoop(
//std::cerr << "[" << i << "/" << (project->entries.size() - 1)
// << "] Dispatching index request for file " << entry.filename
// << std::endl;
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args));
});
// We need to support multiple concurrent index processes.
@ -1407,7 +1344,7 @@ bool QueryDbMainLoop(
project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) {
LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1)
<< "] Dispatching index request for file " << entry.filename;
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args));
});
break;
}
@ -1665,7 +1602,7 @@ bool QueryDbMainLoop(
// if so, ignore that index response.
// TODO: send as priority request
Project::Entry entry = project->FindCompilationEntryForFile(path);
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
queue->index_request.Enqueue(IndexRequest(entry.filename, entry.args));
clang_complete->NotifySave(path);
@ -2596,11 +2533,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
auto signature_cache = MakeUnique<CodeCompleteCache>();
FileConsumer::SharedState file_consumer_shared;
InProcessQueryDbResponder responder(&queue.process_response);
ThreadedQueue<IndexRequest> queue_process_request;
auto indexer = MakeUnique<InProcessIndexer>(&responder, &queue_process_request);
//auto indexer = MakeUnique<OutOfProcessIndexer>(bin_name, &queue_process_response);
// Run query db main loop.
SetCurrentThreadName("querydb");
QueryDatabase db;
@ -2608,7 +2540,6 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
while (true) {
bool did_work = QueryDbMainLoop(
config, &db, &db_cache, waiter, &queue,
indexer.get(),
&project, &file_consumer_shared, &working_files,
&clang_complete, &include_complete, global_code_complete_cache.get(), non_global_code_complete_cache.get(), signature_cache.get());
if (!did_work) {