Remove out-of-process indexer support.

malloc_trim does a good enough job that we don't need the out-of-process indexer anymore.
This commit is contained in:
Jacob Dufault 2017-08-14 19:07:46 -07:00
parent ee003a2cf0
commit db9a97d586

View File

@ -601,89 +601,24 @@ struct Index_OnIndexed {
// IndexRequest is all messages that can be sent from the querydb process to
// the indexer process.
struct IndexProcess_Request {
enum class Type {
kInvalid = 0,
kInitialize = 1,
kQuit = 2,
kIndex = 3
};
struct IndexRequest {
std::string path;
std::vector<std::string> args; // TODO: make this a string that is parsed lazily.
Type type = Type::kInvalid;
optional<Config> initialize_args;
struct IndexArgs {
std::string path;
std::vector<std::string> args; // TODO: make this a string that is parsed lazily.
};
optional<IndexArgs> index_args;
static IndexProcess_Request CreateInitialize(const Config& config) {
IndexProcess_Request m;
m.type = Type::kInitialize;
m.initialize_args = config;
return m;
}
static IndexProcess_Request CreateQuit() {
IndexProcess_Request m;
m.type = Type::kQuit;
return m;
}
static IndexProcess_Request CreateIndex(const std::string& path, const std::vector<std::string>& args) {
IndexProcess_Request m;
m.type = Type::kIndex;
m.index_args = IndexArgs();
m.index_args->path = path;
m.index_args->args = args;
return m;
}
IndexRequest(const std::string& path, const std::vector<std::string>& args) : path(path), args(args) {}
};
MAKE_REFLECT_TYPE_PROXY(IndexProcess_Request::Type, int);
MAKE_REFLECT_STRUCT(IndexProcess_Request::IndexArgs, path, args);
MAKE_REFLECT_STRUCT(IndexProcess_Request, type, initialize_args, index_args);
MAKE_REFLECT_STRUCT(IndexRequest, path, args);
using Index_IndexProcess_Request_IndexQueue = ThreadedQueue<IndexProcess_Request::IndexArgs>;
using Index_IndexProcess_Request_IndexQueue = ThreadedQueue<IndexRequest>;
// IndexResponse is all messages that can be sent from the indexer process to
// the querydb process.
struct IndexProcess_Response {
enum class Type {
kInvalid = 0,
kShutdown = 1,
kIndexResult = 2
};
Type type;
struct IndexResult {
std::string file_path;
PerformanceImportFile perf;
struct IndexResultArgs {
std::string file_path;
PerformanceImportFile perf;
};
optional<IndexResultArgs> index_result_args;
static IndexProcess_Response CreateShutdown() {
IndexProcess_Response response;
response.type = Type::kShutdown;
return response;
}
static IndexProcess_Response CreateIndexResult(const std::string& file_path, const PerformanceImportFile& perf) {
IndexProcess_Response response;
response.type = Type::kIndexResult;
response.index_result_args = IndexResultArgs();
response.index_result_args->file_path = file_path;
response.index_result_args->perf = perf;
return response;
}
IndexResult(const std::string& file_path, const PerformanceImportFile& perf) : file_path(file_path), perf(perf) {}
};
MAKE_REFLECT_TYPE_PROXY(IndexProcess_Response::Type, int);
MAKE_REFLECT_STRUCT(IndexProcess_Response::IndexResultArgs, file_path, perf);
MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args);
MAKE_REFLECT_STRUCT(IndexResult, file_path, perf);
@ -719,7 +654,7 @@ MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args);
struct QueueManager {
using IndexProcess_ResponseQueue = ThreadedQueue<IndexProcess_Response::IndexResultArgs>;
using IndexProcess_ResponseQueue = ThreadedQueue<IndexResult>;
using Index_DoIdMapQueue = ThreadedQueue<Index_DoIdMap>;
using Index_OnIdMappedQueue = ThreadedQueue<Index_OnIdMapped>;
using Index_OnIndexedQueue = ThreadedQueue<Index_OnIndexed>;
@ -883,8 +818,7 @@ bool IndexMain_DoIndex(Config* config,
WorkingFiles* working_files,
clang::Index* index,
QueueManager* queue) {
optional<IndexProcess_Response::IndexResultArgs> request =
queue->process_response.TryDequeue();
optional<IndexResult> request = queue->process_response.TryDequeue();
if (!request)
return false;
@ -1022,18 +956,7 @@ void IndexMain(Config* config,
struct IQueryDbResponder {
virtual void Write(IndexProcess_Response response) = 0;
};
struct OutOfProcessQueryDbResponder : IQueryDbResponder {
void Write(IndexProcess_Response response) override {
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
Reflect(writer, response);
std::cerr << "!! Wrote to querydb " + std::string(output.GetString()) + "\n";
std::cout << output.GetSize() << "\n\n" << output.GetString();
}
virtual void Write(IndexResult result) = 0;
};
struct InProcessQueryDbResponder : IQueryDbResponder {
@ -1042,76 +965,19 @@ struct InProcessQueryDbResponder : IQueryDbResponder {
InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue)
: queue_(queue) {}
void Write(IndexProcess_Response response) override {
if (response.index_result_args)
queue_->Enqueue(std::move(*response.index_result_args));
void Write(IndexResult result) override {
queue_->Enqueue(std::move(result));
}
};
// Reads one length-prefixed message from |read|.
//
// The expected framing is a decimal content length, a "\n\n" terminator, and
// then exactly that many payload bytes.
//
// |read| must yield the next byte of the source on each call, or nullopt once
// no more bytes are available. Both the header and the payload are pulled
// through |read|, so the function works for any byte source (stdin, an
// in-memory buffer, ...) and the caller can tell how much was consumed.
//
// Returns the payload, or nullopt when the source runs out before a complete
// message has been read or when the header carries a negative length.
optional<std::string> ReadContentFromSource(std::function<optional<char>()> read) {
  // Read the content length. It is terminated by two \n\n characters.
  std::string stringified_content_length;
  char last = 0;
  while (true) {
    optional<char> opt_c = read();
    if (!opt_c)
      return nullopt;

    char c = *opt_c;
    if (last == '\n' && c == '\n')
      break;

    last = c;
    stringified_content_length += c;
  }

  // atoi ignores the trailing '\n' that was accumulated with the digits.
  const int content_length = atoi(stringified_content_length.c_str());
  // A negative length would turn into a huge size_t below; treat it as an
  // unreadable message instead.
  if (content_length < 0)
    return nullopt;

  // Read content from the same source the header came from.
  std::string content;
  content.reserve(static_cast<size_t>(content_length));
  for (int i = 0; i < content_length; ++i) {
    optional<char> opt_c = read();
    if (!opt_c)
      return nullopt;  // Source ran dry in the middle of the payload.
    content += *opt_c;
  }

  return content;
}
// Reads a single request from stdin, decodes it from JSON and appends it to
// |messages|.
void PumpIndexThreadStdioReaderMain(ThreadedQueue<IndexProcess_Request>* messages) {
  // Byte source backed by stdin. A bad stdin means the parent process has
  // probably gone away; in either case there is nobody left to talk to, so
  // the process exits.
  auto read_stdin_byte = []() {
    if (!std::cin.good())
      exit(0);

    char byte = 0;
    std::cin.read(&byte, 1);
    return byte;
  };

  // Pull one framed payload off stdin.
  optional<std::string> payload = ReadContentFromSource(read_stdin_byte);
  assert(payload);

  // Decode the payload as JSON.
  rapidjson::Document json;
  json.Parse(payload->c_str(), payload->size());
  assert(!json.HasParseError());

  // Turn the JSON document into a request object.
  IndexProcess_Request request;
  Reflect(json, request);
  assert(request.type != IndexProcess_Request::Type::kInvalid);

  // Hand the request over to the consumer.
  messages->Enqueue(std::move(request));
}
std::vector<IndexProcess_Response> DoParseFile(
std::vector<IndexResult> DoParseFile(
Config* config,
clang::Index* index,
FileConsumer::SharedState* file_consumer_shared,
CacheLoader* cache_loader,
const std::string& path,
const std::vector<std::string>& args) {
std::vector<IndexProcess_Response> result;
std::vector<IndexResult> result;
IndexFile* previous_index = cache_loader->TryLoad(path);
if (previous_index) {
@ -1150,10 +1016,10 @@ std::vector<IndexProcess_Response> DoParseFile(
// TODO/FIXME: real perf
PerformanceImportFile perf;
result.push_back(IndexProcess_Response::CreateIndexResult(path, perf));
result.push_back(IndexResult(path, perf));
for (const std::string& dependency : previous_index->dependencies) {
LOG_S(INFO) << "Emitting index result for " << dependency;
result.push_back(IndexProcess_Response::CreateIndexResult(dependency, perf));
result.push_back(IndexResult(dependency, perf));
}
return result;
}
@ -1209,13 +1075,18 @@ std::vector<IndexProcess_Response> DoParseFile(
perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset();
LOG_S(INFO) << "Emitting index result for " << new_index->path;
result.push_back(IndexProcess_Response::CreateIndexResult(new_index->path, perf));
result.push_back(IndexResult(new_index->path, perf));
}
return result;
}
std::vector<IndexProcess_Response> ParseFile(
// TODO: import to CACHE_DIR/staging/foo.cc
// TODO: split index files into foo.cc.json, foo.cc.timestamp, foo.cc
std::vector<IndexResult> ParseFile(
Config* config,
clang::Index* index,
FileConsumer::SharedState* file_consumer_shared,
@ -1235,7 +1106,7 @@ std::vector<IndexProcess_Response> ParseFile(
void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexProcess_Request_IndexQueue* queue, std::atomic<int>* busy, FileConsumer::SharedState* file_consumer_shared) {
while (true) {
IndexProcess_Request::IndexArgs request = queue->DequeuePlusAction([&]() {
IndexRequest request = queue->DequeuePlusAction([&]() {
++(*busy);
});
@ -1243,7 +1114,7 @@ void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexPr
Project::Entry entry;
entry.filename = request.path;
entry.args = request.args;
std::vector<IndexProcess_Response> responses = ParseFile(config, &index, file_consumer_shared, entry);
std::vector<IndexResult> responses = ParseFile(config, &index, file_consumer_shared, entry);
for (const auto& response : responses)
responder->Write(response);
@ -1254,14 +1125,12 @@ void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexPr
struct IIndexerProcess {
virtual void Restart() = 0;
virtual void EnableAutoRestart() = 0;
virtual void SetConfig(const Config& config) = 0;
virtual void SendMessage(IndexProcess_Request message) = 0;
virtual void SendMessage(IndexRequest message) = 0;
};
struct InProcessIndexer : IIndexerProcess {
ThreadedQueue<IndexProcess_Request>* messages_;
ThreadedQueue<IndexRequest>* messages_;
Index_IndexProcess_Request_IndexQueue queue_;
std::vector<std::thread> indexer_threads_;
std::atomic<int> num_busy_indexers_;
@ -1275,39 +1144,22 @@ struct InProcessIndexer : IIndexerProcess {
// interested in indexing.
FileConsumer::SharedState file_consumer_shared_;
explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue<IndexProcess_Request>* messages)
explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue<IndexRequest>* messages)
: messages_(messages), num_busy_indexers_(0), responder_(responder) {}
void Restart() override {} // no-op
void EnableAutoRestart() override {} // no-op
void SetConfig(const Config& config) override {
SendMessage(IndexProcess_Request::CreateInitialize(config));
config_ = config;
for (int i = 0; i < config_.indexerCount; ++i) {
indexer_threads_.push_back(std::thread([&, i]() {
SetCurrentThreadName("indexer" + std::to_string(i));
IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_);
}));
}
}
void SendMessage(IndexProcess_Request message) override {
switch (message.type) {
case IndexProcess_Request::Type::kInitialize: {
config_ = *message.initialize_args;
for (int i = 0; i < config_.indexerCount; ++i) {
indexer_threads_.push_back(std::thread([&, i]() {
SetCurrentThreadName("indexer" + std::to_string(i));
IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_);
}));
}
break;
}
case IndexProcess_Request::Type::kIndex: {
// Dispatch the request so one of the indexers will pick it up.
queue_.Enqueue(std::move(*message.index_args));
break;
}
case IndexProcess_Request::Type::kInvalid:
case IndexProcess_Request::Type::kQuit: {
LOG_S(ERROR) << "Unhandled IndexProcess_Request::Type " << static_cast<int>(message.type);
break;
}
}
void SendMessage(IndexRequest message) override {
// Dispatch the request so one of the indexers will pick it up.
queue_.Enqueue(std::move(message));
}
bool TryWaitUntilIdle() {
@ -1323,194 +1175,6 @@ struct InProcessIndexer : IIndexerProcess {
}
};
// IIndexerProcess implementation that forwards requests to a child process
// started as "<bin_name_> --indexer".
//
// Requests are serialized to JSON and written to the child as the payload size
// followed by "\n\n" and then the payload. Bytes the child prints on stdout
// are accumulated per process and decoded by ParseOutput; decoded index
// results are pushed onto |response_queue_|.
struct OutOfProcessIndexer : IIndexerProcess {
// Book-keeping for one spawned child process.
struct ProcessState {
// Handle to the child process.
std::unique_ptr<TinyProcessLib::Process> process;
// Stdout bytes received from the child that have not been decoded yet.
std::string unhandled_output;
};
// NOTE(review): every lock_guard that would take this mutex is commented out
// below, so |processes_| is currently accessed without locking. If the
// stdout/stderr callbacks run on other threads this is a data race - confirm.
std::recursive_mutex processes_mutex_;
// Spawned processes keyed by the id handed out in CreateIndexProcess.
std::unordered_map<int, ProcessState> processes_;
// Id the next spawned process will get; |next_process_id_ - 1| is the most
// recently spawned one.
int next_process_id_ = 0;
// Config to (re)send to every new child; empty until SetConfig is called.
optional<Config> config_;
std::string bin_name_;
QueueManager::IndexProcess_ResponseQueue* response_queue_;
// Threshold on index requests compared against in SendMessage before the
// child process is replaced.
const int kMaxIndexRequestsUntilRestart = 25;
// NOTE(review): set by EnableAutoRestart but never read inside this struct;
// SendMessage restarts regardless of its value - confirm that is intended.
bool enable_auto_restart_ = false;
int number_of_index_requests_since_last_restart_ = 0;
// Remembers the binary name and response queue, then spawns the first child.
OutOfProcessIndexer(const std::string& bin_name,
QueueManager::IndexProcess_ResponseQueue* response_queue)
: bin_name_(bin_name), response_queue_(response_queue) {
CreateIndexProcess();
}
// Sends a quit request to the most recently spawned child, spawns a
// replacement and resets the request counter.
void Restart() override {
SendMessage(IndexProcess_Request::CreateQuit());
CreateIndexProcess();
number_of_index_requests_since_last_restart_ = 0;
}
void EnableAutoRestart() override {
enable_auto_restart_ = true;
}
// Stores |config| and sends it to the current child. May only be called once
// (asserted).
void SetConfig(const Config& config) override {
assert(!config_);
config_ = config;
SendMessage(IndexProcess_Request::CreateInitialize(*config_));
}
// Serializes |message| to JSON and writes it to the most recently spawned
// child, preceded by the payload size and "\n\n".
void SendMessage(IndexProcess_Request message) override {
// The counter is post-incremented, so the comparison sees the number of
// index requests sent before this one.
if (message.type == IndexProcess_Request::Type::kIndex &&
number_of_index_requests_since_last_restart_++ > kMaxIndexRequestsUntilRestart) {
Restart();
}
// NOTE(review): |content| is never used.
std::string content;
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
Reflect(writer, message);
std::cerr << "!!! WRITING TO INDEXER: " << output.GetString() << std::endl;
//std::lock_guard<std::recursive_mutex> processes_lock(processes_mutex_);
// NOTE(review): operator[] default-inserts a ProcessState with a null
// |process| if the id is missing, which would then be dereferenced.
processes_[next_process_id_ - 1].process->write(std::to_string(output.GetSize()) + "\n\n");
processes_[next_process_id_ - 1].process->write(output.GetString(), output.GetSize());
}
// Spawns a new child process, registers it under a fresh id and, if a config
// has already been set, sends that config to it.
void CreateIndexProcess() {
int process_id = next_process_id_++;
ProcessState state;
state.process = MakeUnique<TinyProcessLib::Process>(
bin_name_ + std::string(" --indexer"), ".",
[this, process_id](const char* bytes, size_t n) { OnStdOut(process_id, bytes, n); },
[this](const char* bytes, size_t n) { OnStdErr(bytes, n); },
true /*open_stdin*/);
//std::lock_guard<std::recursive_mutex> processes_lock(processes_mutex_);
processes_[process_id] = std::move(state);
if (config_)
SendMessage(IndexProcess_Request::CreateInitialize(*config_));
}
// Decodes as many complete responses as possible from the buffered stdout of
// process |process_id| and enqueues their index results.
void ParseOutput(int process_id) {
//std::lock_guard<std::recursive_mutex> processes_lock(processes_mutex_);
while (true) {
// Number of buffered bytes handed to ReadContentFromSource so far.
// NOTE(review): this only counts bytes pulled through the lambda below;
// confirm ReadContentFromSource reads the whole message through it,
// otherwise the substr() further down drops too little.
size_t next_idx = 0;
optional<std::string> content = ReadContentFromSource([&]() -> optional<char> {
if (next_idx >= processes_[process_id].unhandled_output.size())
return nullopt;
return processes_[process_id].unhandled_output[next_idx++];
});
// NOTE(review): this "failed to read" line is printed on every iteration,
// before |content| is checked, so it also appears on successful reads.
std::cerr << "!! querydb failed to read input; next_idx=" << next_idx << std::endl;
if (!content)
return;
// Drop the bytes that were consumed for this message.
processes_[process_id].unhandled_output = processes_[process_id].unhandled_output.substr(next_idx);
std::cerr << "!! querydb got input " << *content << " from process " << process_id << std::endl;
// Parse content.
rapidjson::Document document;
document.Parse(content->c_str(), content->size());
assert(!document.HasParseError());
// Deserialize content.
IndexProcess_Response message;
Reflect(document, message);
// Cleanup state from our side if the process exits.
// NOTE(review): the actual cleanup is commented out, so the entry stays in
// |processes_| after a shutdown message.
if (message.type == IndexProcess_Response::Type::kShutdown) {
std::cerr << "!!! Got process shutdown message !!!\n";
// Delete the process on a separate thread, since a thread cannot destroy itself.
//std::async([&, process_id]() {
// std::lock_guard<std::recursive_mutex> processes_lock(processes_mutex_);
// processes_.erase(processes_.find(process_id));
//});
return;
}
// Push message to queue.
// NOTE(review): |index_result_args| is dereferenced without checking that
// it is set.
response_queue_->Enqueue(std::move(*message.index_result_args));
}
}
// Stdout callback for process |process_id|: logs the received bytes, appends
// them to that process's pending buffer and tries to decode responses.
void OnStdOut(int process_id, const char* bytes, size_t n) {
// Copy of the received bytes, used only for the log line below.
std::string content;
for (size_t i = 0; i < n; ++i)
content += bytes[i];
std::cerr << "!!! ON STDOUT for process " + std::to_string(process_id) + " with content " + content + "\n";
{
//std::lock_guard<std::recursive_mutex> processes_lock(processes_mutex_);
for (size_t i = 0; i < n; ++i)
processes_[process_id].unhandled_output += bytes[i];
}
std::cerr << "!&&! Begin ParseOutput\n";
ParseOutput(process_id);
std::cerr << "!&&! End ParseOutput\n";
}
// Stderr callback: echoes the child's stderr bytes to our stderr with an
// "OOP [indexer]: " prefix.
void OnStdErr(const char* bytes, size_t n) {
std::string content = "OOP [indexer]: ";
for (size_t i = 0; i < n; ++i)
content += bytes[i];
std::cerr << content;
}
};
// Main function for the out-of-process indexer.
//
// Starts a thread that keeps reading requests from stdin into |messages| and
// then loops forever on the calling thread, dispatching each request to an
// InProcessIndexer whose results are written through an
// OutOfProcessQueryDbResponder. The function only leaves via exit(0).
void IndexProcessMain() {
// TODO
// querydb process is responsible for owning the buffer.
//constexpr const char* kIpcBufferName = "CqueryIpc";
//constexpr size_t kIpcBufferSize = 1024 * 8;
//MessageQueue queue(Buffer::CreateSharedBuffer(kIpcBufferName, kIpcBufferSize), true /*buffer_has_data*/);
std::cerr << "Indexer process starting\n";
OutOfProcessQueryDbResponder responder;
ThreadedQueue<IndexProcess_Request> messages;
InProcessIndexer indexer(&responder, &messages);
// Reader thread: repeatedly pumps one request from stdin into |messages|.
// It is never joined; the process ends through exit(0).
std::thread stdin_reader([&]() {
SetCurrentThreadName("IndexStdinReader");
while (true)
PumpIndexThreadStdioReaderMain(&messages);
});
SetCurrentThreadName("IndexMain");
while (true) {
IndexProcess_Request message = messages.Dequeue();
std::cerr << "!! Got message.type=" + std::to_string((int)message.type) + "\n";
// NOTE(review): there is no `continue` after SetConfig, so an initialize
// message is also passed to indexer.SendMessage() at the bottom of the
// loop - confirm the double handling is intended.
if (message.type == IndexProcess_Request::Type::kInitialize)
indexer.SetConfig(*message.initialize_args);
if (message.type == IndexProcess_Request::Type::kQuit) {
// When TryWaitUntilIdle() fails, the quit request is put back on the
// queue and retried after the other pending messages.
// NOTE(review): presumably failure means indexing work is still
// outstanding; TryWaitUntilIdle's body is not visible here - confirm.
if (!indexer.TryWaitUntilIdle()) {
// Process other messages.
assert(!messages.IsEmpty());
messages.Enqueue(std::move(message));
continue;
}
// Tell the parent we are shutting down, then terminate the process.
responder.Write(IndexProcess_Response::CreateShutdown());
assert(messages.IsEmpty());
exit(0);
}
indexer.SendMessage(message);
}
}
@ -1680,10 +1344,8 @@ bool QueryDbMainLoop(
//std::cerr << "[" << i << "/" << (project->entries.size() - 1)
// << "] Dispatching index request for file " << entry.filename
// << std::endl;
indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args));
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
});
indexer_process->Restart();
indexer_process->EnableAutoRestart();
// We need to support multiple concurrent index processes.
time.ResetAndPrint("[perf] Dispatched initial index requests");
@ -1745,7 +1407,7 @@ bool QueryDbMainLoop(
project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) {
LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1)
<< "] Dispatching index request for file " << entry.filename;
indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args));
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
});
break;
}
@ -2003,7 +1665,7 @@ bool QueryDbMainLoop(
// if so, ignore that index response.
// TODO: send as priority request
Project::Entry entry = project->FindCompilationEntryForFile(path);
indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args));
indexer_process->SendMessage(IndexRequest(entry.filename, entry.args));
clang_complete->NotifySave(path);
@ -2935,7 +2597,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter*
FileConsumer::SharedState file_consumer_shared;
InProcessQueryDbResponder responder(&queue.process_response);
ThreadedQueue<IndexProcess_Request> queue_process_request;
ThreadedQueue<IndexRequest> queue_process_request;
auto indexer = MakeUnique<InProcessIndexer>(&responder, &queue_process_request);
//auto indexer = MakeUnique<OutOfProcessIndexer>(bin_name, &queue_process_response);
@ -3289,10 +2951,6 @@ int main(int argc, char** argv) {
std::cin.get();
return 0;
}
else if (HasOption(options, "--indexer")) {
IndexProcessMain();
return 0;
}
else if (HasOption(options, "--language-server")) {
//std::cerr << "Running language server" << std::endl;
Config config;