From db9a97d5862e60f4255f4598e1b73a1eed524453 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Mon, 14 Aug 2017 19:07:46 -0700 Subject: [PATCH] Remove out of process indexer support. malloc_trim does a good enough job that we don't need it anymore. --- src/command_line.cc | 434 +++++--------------------------------------- 1 file changed, 46 insertions(+), 388 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 6d2b66ad..772a4198 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -601,89 +601,24 @@ struct Index_OnIndexed { -// IndexRequest is all messages that can be sent from the querydb process to -// the indexer process. -struct IndexProcess_Request { - enum class Type { - kInvalid = 0, - kInitialize = 1, - kQuit = 2, - kIndex = 3 - }; +struct IndexRequest { + std::string path; + std::vector args; // TODO: make this a string that is parsed lazily. - Type type = Type::kInvalid; - - optional initialize_args; - - struct IndexArgs { - std::string path; - std::vector args; // TODO: make this a string that is parsed lazily. - }; - optional index_args; - - static IndexProcess_Request CreateInitialize(const Config& config) { - IndexProcess_Request m; - m.type = Type::kInitialize; - m.initialize_args = config; - return m; - } - - static IndexProcess_Request CreateQuit() { - IndexProcess_Request m; - m.type = Type::kQuit; - return m; - } - - static IndexProcess_Request CreateIndex(const std::string& path, const std::vector& args) { - IndexProcess_Request m; - m.type = Type::kIndex; - m.index_args = IndexArgs(); - m.index_args->path = path; - m.index_args->args = args; - return m; - } + IndexRequest(const std::string& path, const std::vector& args) : path(path), args(args) {} }; -MAKE_REFLECT_TYPE_PROXY(IndexProcess_Request::Type, int); -MAKE_REFLECT_STRUCT(IndexProcess_Request::IndexArgs, path, args); -MAKE_REFLECT_STRUCT(IndexProcess_Request, type, initialize_args, index_args); +MAKE_REFLECT_STRUCT(IndexRequest, path, args); -using Index_IndexProcess_Request_IndexQueue = ThreadedQueue; +using Index_IndexProcess_Request_IndexQueue = ThreadedQueue; -// IndexResponse is all messages that can be sent from the indexer process to -// the querydb process. -struct IndexProcess_Response { - enum class Type { - kInvalid = 0, - kShutdown = 1, - kIndexResult = 2 - }; - Type type; +struct IndexResult { + std::string file_path; + PerformanceImportFile perf; - struct IndexResultArgs { - std::string file_path; - PerformanceImportFile perf; - }; - optional index_result_args; - - static IndexProcess_Response CreateShutdown() { - IndexProcess_Response response; - response.type = Type::kShutdown; - return response; - } - - static IndexProcess_Response CreateIndexResult(const std::string& file_path, const PerformanceImportFile& perf) { - IndexProcess_Response response; - response.type = Type::kIndexResult; - response.index_result_args = IndexResultArgs(); - response.index_result_args->file_path = file_path; - response.index_result_args->perf = perf; - return response; - } + IndexResult(const std::string& file_path, const PerformanceImportFile& perf) : file_path(file_path), perf(perf) {} }; -MAKE_REFLECT_TYPE_PROXY(IndexProcess_Response::Type, int); -MAKE_REFLECT_STRUCT(IndexProcess_Response::IndexResultArgs, file_path, perf); -MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args); +MAKE_REFLECT_STRUCT(IndexResult, file_path, perf); @@ -719,7 +654,7 @@ MAKE_REFLECT_STRUCT(IndexProcess_Response, type, index_result_args); struct QueueManager { - using IndexProcess_ResponseQueue = ThreadedQueue; + using IndexProcess_ResponseQueue = ThreadedQueue; using Index_DoIdMapQueue = ThreadedQueue; using Index_OnIdMappedQueue = ThreadedQueue; using Index_OnIndexedQueue = ThreadedQueue; @@ -883,8 +818,7 @@ bool IndexMain_DoIndex(Config* config, WorkingFiles* working_files, clang::Index* index, QueueManager* queue) { - optional request = - queue->process_response.TryDequeue(); + optional request = queue->process_response.TryDequeue(); if (!request) return false; @@ -1022,18 +956,7 @@ void IndexMain(Config* config, struct IQueryDbResponder { - virtual void Write(IndexProcess_Response response) = 0; -}; - -struct OutOfProcessQueryDbResponder : IQueryDbResponder { - void Write(IndexProcess_Response response) override { - rapidjson::StringBuffer output; - rapidjson::PrettyWriter writer(output); - - Reflect(writer, response); - std::cerr << "!! Wrote to querydb " + std::string(output.GetString()) + "\n"; - std::cout << output.GetSize() << "\n\n" << output.GetString(); - } + virtual void Write(IndexResult result) = 0; }; struct InProcessQueryDbResponder : IQueryDbResponder { @@ -1042,76 +965,19 @@ struct InProcessQueryDbResponder : IQueryDbResponder { InProcessQueryDbResponder(QueueManager::IndexProcess_ResponseQueue* queue) : queue_(queue) {} - void Write(IndexProcess_Response response) override { - if (response.index_result_args) - queue_->Enqueue(std::move(*response.index_result_args)); + void Write(IndexResult result) override { + queue_->Enqueue(std::move(result)); } }; -optional ReadContentFromSource(std::function()> read) { - // Read the content length. It is terminated by two \n\n characters. - std::string stringified_content_length; - char last = 0; - while (true) { - optional opt_c = read(); - if (!opt_c) - return nullopt; - char c = *opt_c; - if (last == '\n' && c == '\n') - break; - last = c; - stringified_content_length += c; - } - int content_length = atoi(stringified_content_length.c_str()); - - // Read content. - std::string content; - content.reserve(content_length); - for (size_t i = 0; i < content_length; ++i) { - char c; - std::cin.read(&c, 1); - content += c; - } - - return content; -} - -void PumpIndexThreadStdioReaderMain(ThreadedQueue* messages) { - // Read content. - optional content = ReadContentFromSource([]() { - // Bad stdin means parent process has probably exited. Either way, indexer - // process can no longer be communicated with so exit. - if (!std::cin.good()) - exit(0); - - char c = 0; - std::cin.read(&c, 1); - return c; - }); - assert(content); - - // Parse content. - rapidjson::Document document; - document.Parse(content->c_str(), content->size()); - assert(!document.HasParseError()); - - // Deserialize content. - IndexProcess_Request message; - Reflect(document, message); - assert(message.type != IndexProcess_Request::Type::kInvalid); - - // Push message to queue. - messages->Enqueue(std::move(message)); -} - -std::vector DoParseFile( +std::vector DoParseFile( Config* config, clang::Index* index, FileConsumer::SharedState* file_consumer_shared, CacheLoader* cache_loader, const std::string& path, const std::vector& args) { - std::vector result; + std::vector result; IndexFile* previous_index = cache_loader->TryLoad(path); if (previous_index) { @@ -1150,10 +1016,10 @@ std::vector DoParseFile( // TODO/FIXME: real perf PerformanceImportFile perf; - result.push_back(IndexProcess_Response::CreateIndexResult(path, perf)); + result.push_back(IndexResult(path, perf)); for (const std::string& dependency : previous_index->dependencies) { LOG_S(INFO) << "Emitting index result for " << dependency; - result.push_back(IndexProcess_Response::CreateIndexResult(dependency, perf)); + result.push_back(IndexResult(dependency, perf)); } return result; } @@ -1209,13 +1075,18 @@ std::vector DoParseFile( perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); LOG_S(INFO) << "Emitting index result for " << new_index->path; - result.push_back(IndexProcess_Response::CreateIndexResult(new_index->path, perf)); + result.push_back(IndexResult(new_index->path, perf)); } return result; } -std::vector ParseFile( + +// TODO: import to CACHE_DIR/staging/foo.cc +// TODO: split index files into foo.cc.json, foo.cc.timestamp, foo.cc + + +std::vector ParseFile( Config* config, clang::Index* index, FileConsumer::SharedState* file_consumer_shared, @@ -1235,7 +1106,7 @@ std::vector ParseFile( void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexProcess_Request_IndexQueue* queue, std::atomic* busy, FileConsumer::SharedState* file_consumer_shared) { while (true) { - IndexProcess_Request::IndexArgs request = queue->DequeuePlusAction([&]() { + IndexRequest request = queue->DequeuePlusAction([&]() { ++(*busy); }); @@ -1243,7 +1114,7 @@ void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexPr Project::Entry entry; entry.filename = request.path; entry.args = request.args; - std::vector responses = ParseFile(config, &index, file_consumer_shared, entry); + std::vector responses = ParseFile(config, &index, file_consumer_shared, entry); for (const auto& response : responses) responder->Write(response); @@ -1254,14 +1125,12 @@ void IndexThreadMain(Config* config, IQueryDbResponder* responder, Index_IndexPr struct IIndexerProcess { - virtual void Restart() = 0; - virtual void EnableAutoRestart() = 0; virtual void SetConfig(const Config& config) = 0; - virtual void SendMessage(IndexProcess_Request message) = 0; + virtual void SendMessage(IndexRequest message) = 0; }; struct InProcessIndexer : IIndexerProcess { - ThreadedQueue* messages_; + ThreadedQueue* messages_; Index_IndexProcess_Request_IndexQueue queue_; std::vector indexer_threads_; std::atomic num_busy_indexers_; @@ -1275,39 +1144,22 @@ struct InProcessIndexer : IIndexerProcess { // interested in indexing. FileConsumer::SharedState file_consumer_shared_; - explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue* messages) + explicit InProcessIndexer(IQueryDbResponder* responder, ThreadedQueue* messages) : messages_(messages), num_busy_indexers_(0), responder_(responder) {} - void Restart() override {} // no-op - void EnableAutoRestart() override {} // no-op - void SetConfig(const Config& config) override { - SendMessage(IndexProcess_Request::CreateInitialize(config)); + config_ = config; + for (int i = 0; i < config_.indexerCount; ++i) { + indexer_threads_.push_back(std::thread([&, i]() { + SetCurrentThreadName("indexer" + std::to_string(i)); + IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_); + })); + } } - void SendMessage(IndexProcess_Request message) override { - switch (message.type) { - case IndexProcess_Request::Type::kInitialize: { - config_ = *message.initialize_args; - for (int i = 0; i < config_.indexerCount; ++i) { - indexer_threads_.push_back(std::thread([&, i]() { - SetCurrentThreadName("indexer" + std::to_string(i)); - IndexThreadMain(&config_, responder_, &queue_, &num_busy_indexers_, &file_consumer_shared_); - })); - } - break; - } - case IndexProcess_Request::Type::kIndex: { - // Dispatch the request so one of the indexers will pick it up. - queue_.Enqueue(std::move(*message.index_args)); - break; - } - case IndexProcess_Request::Type::kInvalid: - case IndexProcess_Request::Type::kQuit: { - LOG_S(ERROR) << "Unhandled IndexProcess_Request::Type " << static_cast(message.type); - break; - } - } + void SendMessage(IndexRequest message) override { + // Dispatch the request so one of the indexers will pick it up. + queue_.Enqueue(std::move(message)); } bool TryWaitUntilIdle() { @@ -1323,194 +1175,6 @@ struct InProcessIndexer : IIndexerProcess { } }; -struct OutOfProcessIndexer : IIndexerProcess { - struct ProcessState { - std::unique_ptr process; - std::string unhandled_output; - }; - std::recursive_mutex processes_mutex_; - std::unordered_map processes_; - int next_process_id_ = 0; - - optional config_; - - std::string bin_name_; - QueueManager::IndexProcess_ResponseQueue* response_queue_; - - const int kMaxIndexRequestsUntilRestart = 25; - bool enable_auto_restart_ = false; - int number_of_index_requests_since_last_restart_ = 0; - - OutOfProcessIndexer(const std::string& bin_name, - QueueManager::IndexProcess_ResponseQueue* response_queue) - : bin_name_(bin_name), response_queue_(response_queue) { - CreateIndexProcess(); - } - - void Restart() override { - SendMessage(IndexProcess_Request::CreateQuit()); - CreateIndexProcess(); - number_of_index_requests_since_last_restart_ = 0; - } - - void EnableAutoRestart() override { - enable_auto_restart_ = true; - } - - void SetConfig(const Config& config) override { - assert(!config_); - config_ = config; - SendMessage(IndexProcess_Request::CreateInitialize(*config_)); - } - - void SendMessage(IndexProcess_Request message) override { - if (message.type == IndexProcess_Request::Type::kIndex && - number_of_index_requests_since_last_restart_++ > kMaxIndexRequestsUntilRestart) { - Restart(); - } - - std::string content; - - rapidjson::StringBuffer output; - rapidjson::PrettyWriter writer(output); - - Reflect(writer, message); - std::cerr << "!!! WRITING TO INDEXER: " << output.GetString() << std::endl; - - //std::lock_guard processes_lock(processes_mutex_); - processes_[next_process_id_ - 1].process->write(std::to_string(output.GetSize()) + "\n\n"); - processes_[next_process_id_ - 1].process->write(output.GetString(), output.GetSize()); - } - - void CreateIndexProcess() { - int process_id = next_process_id_++; - ProcessState state; - state.process = MakeUnique( - bin_name_ + std::string(" --indexer"), ".", - [this, process_id](const char* bytes, size_t n) { OnStdOut(process_id, bytes, n); }, - [this](const char* bytes, size_t n) { OnStdErr(bytes, n); }, - true /*open_stdin*/); - - //std::lock_guard processes_lock(processes_mutex_); - processes_[process_id] = std::move(state); - - if (config_) - SendMessage(IndexProcess_Request::CreateInitialize(*config_)); - } - - void ParseOutput(int process_id) { - //std::lock_guard processes_lock(processes_mutex_); - while (true) { - size_t next_idx = 0; - optional content = ReadContentFromSource([&]() -> optional { - if (next_idx >= processes_[process_id].unhandled_output.size()) - return nullopt; - return processes_[process_id].unhandled_output[next_idx++]; - }); - std::cerr << "!! querydb failed to read input; next_idx=" << next_idx << std::endl; - if (!content) - return; - processes_[process_id].unhandled_output = processes_[process_id].unhandled_output.substr(next_idx); - - std::cerr << "!! querydb got input " << *content << " from process " << process_id << std::endl; - - // Parse content. - rapidjson::Document document; - document.Parse(content->c_str(), content->size()); - assert(!document.HasParseError()); - - // Deserialize content. - IndexProcess_Response message; - Reflect(document, message); - - // Cleanup state from our side if the process exits. - if (message.type == IndexProcess_Response::Type::kShutdown) { - std::cerr << "!!! Got process shutdown message !!!\n"; - // Delete the process on a separate thread, since a thread cannot destroy itself. - //std::async([&, process_id]() { - // std::lock_guard processes_lock(processes_mutex_); - // processes_.erase(processes_.find(process_id)); - //}); - return; - } - - // Push message to queue. - response_queue_->Enqueue(std::move(*message.index_result_args)); - } - } - - void OnStdOut(int process_id, const char* bytes, size_t n) { - std::string content; - for (size_t i = 0; i < n; ++i) - content += bytes[i]; - std::cerr << "!!! ON STDOUT for process " + std::to_string(process_id) + " with content " + content + "\n"; - - { - //std::lock_guard processes_lock(processes_mutex_); - for (size_t i = 0; i < n; ++i) - processes_[process_id].unhandled_output += bytes[i]; - } - - std::cerr << "!&&! Begin ParseOutput\n"; - ParseOutput(process_id); - std::cerr << "!&&! End ParseOutput\n"; - } - - void OnStdErr(const char* bytes, size_t n) { - std::string content = "OOP [indexer]: "; - for (size_t i = 0; i < n; ++i) - content += bytes[i]; - std::cerr << content; - } -}; - -// Main function for the out-of-process indexer. -void IndexProcessMain() { - // TODO - // querydb process is responsible for owning the buffer. - //constexpr const char* kIpcBufferName = "CqueryIpc"; - //constexpr size_t kIpcBufferSize = 1024 * 8; - //MessageQueue queue(Buffer::CreateSharedBuffer(kIpcBufferName, kIpcBufferSize), true /*buffer_has_data*/); - - std::cerr << "Indexer process starting\n"; - - OutOfProcessQueryDbResponder responder; - ThreadedQueue messages; - InProcessIndexer indexer(&responder, &messages); - - std::thread stdin_reader([&]() { - SetCurrentThreadName("IndexStdinReader"); - while (true) - PumpIndexThreadStdioReaderMain(&messages); - }); - - SetCurrentThreadName("IndexMain"); - while (true) { - IndexProcess_Request message = messages.Dequeue(); - std::cerr << "!! Got message.type=" + std::to_string((int)message.type) + "\n"; - - if (message.type == IndexProcess_Request::Type::kInitialize) - indexer.SetConfig(*message.initialize_args); - - if (message.type == IndexProcess_Request::Type::kQuit) { - if (!indexer.TryWaitUntilIdle()) { - // Process other messages. - assert(!messages.IsEmpty()); - messages.Enqueue(std::move(message)); - continue; - } - - responder.Write(IndexProcess_Response::CreateShutdown()); - assert(messages.IsEmpty()); - exit(0); - } - - indexer.SendMessage(message); - } -} - - - @@ -1680,10 +1344,8 @@ bool QueryDbMainLoop( //std::cerr << "[" << i << "/" << (project->entries.size() - 1) // << "] Dispatching index request for file " << entry.filename // << std::endl; - indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args)); + indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); }); - indexer_process->Restart(); - indexer_process->EnableAutoRestart(); // We need to support multiple concurrent index processes. time.ResetAndPrint("[perf] Dispatched initial index requests"); @@ -1745,7 +1407,7 @@ bool QueryDbMainLoop( project->ForAllFilteredFiles(config, [&](int i, const Project::Entry& entry) { LOG_S(INFO) << "[" << i << "/" << (project->entries.size() - 1) << "] Dispatching index request for file " << entry.filename; - indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args)); + indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); }); break; } @@ -2003,7 +1665,7 @@ bool QueryDbMainLoop( // if so, ignore that index response. // TODO: send as priority request Project::Entry entry = project->FindCompilationEntryForFile(path); - indexer_process->SendMessage(IndexProcess_Request::CreateIndex(entry.filename, entry.args)); + indexer_process->SendMessage(IndexRequest(entry.filename, entry.args)); clang_complete->NotifySave(path); @@ -2935,7 +2597,7 @@ void QueryDbMain(const std::string& bin_name, Config* config, MultiQueueWaiter* FileConsumer::SharedState file_consumer_shared; InProcessQueryDbResponder responder(&queue.process_response); - ThreadedQueue queue_process_request; + ThreadedQueue queue_process_request; auto indexer = MakeUnique(&responder, &queue_process_request); //auto indexer = MakeUnique(bin_name, &queue_process_response); @@ -3289,10 +2951,6 @@ int main(int argc, char** argv) { std::cin.get(); return 0; } - else if (HasOption(options, "--indexer")) { - IndexProcessMain(); - return 0; - } else if (HasOption(options, "--language-server")) { //std::cerr << "Running language server" << std::endl; Config config;