From 9a529bd69176a87ece1b535b956dc037052b6ff4 Mon Sep 17 00:00:00 2001 From: Fangrui Song Date: Mon, 24 Dec 2018 22:20:00 -0800 Subject: [PATCH] Delay requests if the document has not not indexed (#176) This fixes a plethora of "not indexed" errors when the document has not been indexed. * Message handler throws NotIndexed if not overdue * The message is put into backlog and tagged with backlog_path * path2backlog[path] tracks backlog associated with document `path` * The backlog is cleared when the index is merged * backlog[0] is forced to run if it becomes overdue --- src/config.hh | 10 ++++- src/lsp.hh | 4 +- src/message_handler.cc | 38 ++++++++++++----- src/message_handler.hh | 12 +++++- src/messages/ccls_call.cc | 3 +- src/messages/ccls_inheritance.cc | 6 +-- src/messages/ccls_member.cc | 7 +--- src/messages/ccls_navigate.cc | 4 +- src/messages/ccls_vars.cc | 4 +- src/messages/initialize.cc | 2 +- src/messages/textDocument_code.cc | 17 +++----- src/messages/textDocument_completion.cc | 2 +- src/messages/textDocument_definition.cc | 21 +++------- src/messages/textDocument_document.cc | 15 ++----- src/messages/textDocument_foldingRange.cc | 7 +--- src/messages/textDocument_formatting.cc | 15 ++----- src/messages/textDocument_hover.cc | 7 +--- src/messages/textDocument_references.cc | 7 +--- src/messages/textDocument_rename.cc | 4 +- src/messages/textDocument_signatureHelp.cc | 5 +-- src/pipeline.cc | 48 ++++++++++++++++++++-- src/threaded_queue.hh | 9 ++++ 22 files changed, 142 insertions(+), 105 deletions(-) diff --git a/src/config.hh b/src/config.hh index 1fec1140..4c7eabbb 100644 --- a/src/config.hh +++ b/src/config.hh @@ -242,6 +242,12 @@ struct Config { std::vector whitelist; } index; + struct Request { + // If the document of a request has not been indexed, wait up to this many + // milleseconds before reporting error. + int64_t timeout = 5000; + } request; + struct Session { int maxNum = 10; } session; @@ -278,12 +284,14 @@ REFLECT_STRUCT(Config::Index, blacklist, comments, initialBlacklist, initialWhitelist, multiVersion, multiVersionBlacklist, multiVersionWhitelist, onChange, threads, trackDependency, whitelist); +REFLECT_STRUCT(Config::Request, timeout); REFLECT_STRUCT(Config::Session, maxNum); REFLECT_STRUCT(Config::WorkspaceSymbol, caseSensitivity, maxNum, sort); REFLECT_STRUCT(Config::Xref, maxNum); REFLECT_STRUCT(Config, compilationDatabaseCommand, compilationDatabaseDirectory, cacheDirectory, cacheFormat, clang, client, codeLens, completion, - diagnostics, highlight, index, session, workspaceSymbol, xref); + diagnostics, highlight, index, request, session, workspaceSymbol, + xref); extern Config *g_config; diff --git a/src/lsp.hh b/src/lsp.hh index c44dbdf5..8f8efda9 100644 --- a/src/lsp.hh +++ b/src/lsp.hh @@ -21,8 +21,8 @@ limitations under the License. #include +#include #include -#include namespace ccls { struct RequestId { @@ -43,6 +43,8 @@ struct InMessage { std::string method; std::unique_ptr message; std::unique_ptr document; + std::chrono::steady_clock::time_point deadline; + std::string backlog_path; }; enum class ErrorCode { diff --git a/src/message_handler.cc b/src/message_handler.cc index b88f777a..a9878f81 100644 --- a/src/message_handler.cc +++ b/src/message_handler.cc @@ -109,11 +109,8 @@ struct ScanLineEvent { }; } // namespace -void ReplyOnce::NotReady(bool file) { - if (file) - Error(ErrorCode::InvalidRequest, "not opened"); - else - Error(ErrorCode::InternalError, "not indexed"); +void ReplyOnce::NotOpened(std::string_view path) { + Error(ErrorCode::InvalidRequest, std::string(path) + " is not opened"); } void ReplyOnce::ReplyLocationLink(std::vector &result) { @@ -215,13 +212,11 @@ MessageHandler::MessageHandler() { void MessageHandler::Run(InMessage &msg) { rapidjson::Document &doc = *msg.document; - rapidjson::Value param; + rapidjson::Value null; auto it = doc.FindMember("params"); - if (it != doc.MemberEnd()) - param = it->value; - JsonReader reader(¶m); + JsonReader reader(it != doc.MemberEnd() ? &it->value : &null); if (msg.id.Valid()) { - ReplyOnce reply{msg.id}; + ReplyOnce reply{*this, msg.id}; auto it = method2request.find(msg.method); if (it != method2request.end()) { try { @@ -230,6 +225,8 @@ void MessageHandler::Run(InMessage &msg) { reply.Error(ErrorCode::InvalidParams, "invalid params of " + msg.method + ": expected " + ex.what() + " for " + reader.GetPath()); + } catch (NotIndexed &) { + throw; } catch (...) { reply.Error(ErrorCode::InternalError, "failed to process " + msg.method); } @@ -249,7 +246,8 @@ void MessageHandler::Run(InMessage &msg) { } } -QueryFile *MessageHandler::FindFile(const std::string &path, int *out_file_id) { +QueryFile *MessageHandler::FindFile(const std::string &path, + int *out_file_id) { QueryFile *ret = nullptr; auto it = db->name2file_id.find(LowerPathIfInsensitive(path)); if (it != db->name2file_id.end()) { @@ -266,6 +264,24 @@ QueryFile *MessageHandler::FindFile(const std::string &path, int *out_file_id) { return ret; } +std::pair +MessageHandler::FindOrFail(const std::string &path, ReplyOnce &reply, + int *out_file_id) { + WorkingFile *wf = wfiles->GetFile(path); + if (!wf) { + reply.NotOpened(path); + return {nullptr, nullptr}; + } + QueryFile *file = FindFile(path, out_file_id); + if (!file) { + if (!overdue) + throw NotIndexed{path}; + reply.Error(ErrorCode::InvalidRequest, "not indexed"); + return {nullptr, nullptr}; + } + return {file, wf}; +} + void EmitSkippedRanges(WorkingFile *wfile, QueryFile &file) { CclsSetSkippedRanges params; params.uri = DocumentUri::FromPath(wfile->filename); diff --git a/src/message_handler.hh b/src/message_handler.hh index b0a3747c..949fc242 100644 --- a/src/message_handler.hh +++ b/src/message_handler.hh @@ -199,7 +199,13 @@ REFLECT_STRUCT(Diagnostic, range, severity, code, source, message); REFLECT_STRUCT(ShowMessageParam, type, message); REFLECT_UNDERLYING_B(LanguageId); +struct NotIndexed { + std::string path; +}; +struct MessageHandler; + struct ReplyOnce { + MessageHandler &handler; RequestId id; template void operator()(Res &&result) const { if (id.Valid()) @@ -210,7 +216,7 @@ struct ReplyOnce { if (id.Valid()) pipeline::ReplyError(id, [&](JsonWriter &w) { Reflect(w, err); }); } - void NotReady(bool file); + void NotOpened(std::string_view path); void ReplyLocationLink(std::vector &result); }; @@ -225,10 +231,14 @@ struct MessageHandler { llvm::StringMap> method2notification; llvm::StringMap> method2request; + bool overdue = false; MessageHandler(); void Run(InMessage &msg); QueryFile *FindFile(const std::string &path, int *out_file_id = nullptr); + std::pair FindOrFail(const std::string &path, + ReplyOnce &reply, + int *out_file_id = nullptr); private: void Bind(const char *method, void (MessageHandler::*handler)(JsonReader &)); diff --git a/src/messages/ccls_call.cc b/src/messages/ccls_call.cc index 5a0e6d8e..a2a221d4 100644 --- a/src/messages/ccls_call.cc +++ b/src/messages/ccls_call.cc @@ -200,8 +200,7 @@ void MessageHandler::ccls_call(JsonReader &reader, ReplyOnce &reply) { Expand(this, &*result, param.callee, param.callType, param.qualified, param.levels); } else { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) return; for (SymbolRef sym : FindSymbolsAtLocation(wf, file, param.position)) { diff --git a/src/messages/ccls_inheritance.cc b/src/messages/ccls_inheritance.cc index b7520ddf..a9c18b91 100644 --- a/src/messages/ccls_inheritance.cc +++ b/src/messages/ccls_inheritance.cc @@ -146,10 +146,10 @@ void Inheritance(MessageHandler *m, Param ¶m, ReplyOnce &reply) { Expand(m, &*result, param.derived, param.qualified, param.levels))) result.reset(); } else { - QueryFile *file = m->FindFile(param.textDocument.uri.GetPath()); - if (!file) + auto [file, wf] = m->FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) { return; - WorkingFile *wf = m->wfiles->GetFile(file->def->path); + } for (SymbolRef sym : FindSymbolsAtLocation(wf, file, param.position)) if (sym.kind == Kind::Func || sym.kind == Kind::Type) { result = BuildInitial(m, sym, param.derived, param.qualified, diff --git a/src/messages/ccls_member.cc b/src/messages/ccls_member.cc index d94a3d88..52e46823 100644 --- a/src/messages/ccls_member.cc +++ b/src/messages/ccls_member.cc @@ -281,12 +281,9 @@ void MessageHandler::ccls_member(JsonReader &reader, ReplyOnce &reply) { param.levels, param.kind))) result.reset(); } else { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } for (SymbolRef sym : FindSymbolsAtLocation(wf, file, param.position)) { switch (sym.kind) { case Kind::Func: diff --git a/src/messages/ccls_navigate.cc b/src/messages/ccls_navigate.cc index d09d57a8..2afef719 100644 --- a/src/messages/ccls_navigate.cc +++ b/src/messages/ccls_navigate.cc @@ -41,10 +41,8 @@ Maybe FindParent(QueryFile *file, Pos pos) { void MessageHandler::ccls_navigate(JsonReader &reader, ReplyOnce &reply) { Param param; Reflect(reader, param); - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } Position ls_pos = param.position; diff --git a/src/messages/ccls_vars.cc b/src/messages/ccls_vars.cc index 2673ecf0..4ad5bc21 100644 --- a/src/messages/ccls_vars.cc +++ b/src/messages/ccls_vars.cc @@ -31,10 +31,8 @@ REFLECT_STRUCT(Param, textDocument, position, kind); void MessageHandler::ccls_vars(JsonReader &reader, ReplyOnce &reply) { Param param; Reflect(reader, param); - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index bbafa9d2..9f2948d6 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -388,7 +388,7 @@ void MessageHandler::initialize(JsonReader &reader, ReplyOnce &reply) { void StandaloneInitialize(MessageHandler &handler, const std::string &root) { InitializeParam param; param.rootUri = DocumentUri::FromPath(root); - ReplyOnce reply; + ReplyOnce reply{handler}; Initialize(&handler, param, reply); } diff --git a/src/messages/textDocument_code.cc b/src/messages/textDocument_code.cc index 4d288f03..b3aa83df 100644 --- a/src/messages/textDocument_code.cc +++ b/src/messages/textDocument_code.cc @@ -35,11 +35,9 @@ REFLECT_STRUCT(CodeAction, title, kind, edit); } void MessageHandler::textDocument_codeAction(CodeActionParam ¶m, ReplyOnce &reply) { - WorkingFile *wf = wfiles->GetFile(param.textDocument.uri.GetPath()); - if (!wf) { - reply.NotReady(true); + WorkingFile *wf = FindOrFail(param.textDocument.uri.GetPath(), reply).second; + if (!wf) return; - } std::vector result; std::vector diagnostics; wfiles->WithLock([&]() { diagnostics = wf->diagnostics; }); @@ -96,16 +94,13 @@ struct CommonCodeLensParams { void MessageHandler::textDocument_codeLens(TextDocumentParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } std::vector result; - auto Add = [&](const char *singular, Cmd_xref show, Range range, int num, - bool force_display = false) { + auto Add = [&, wf = wf](const char *singular, Cmd_xref show, Range range, + int num, bool force_display = false) { if (!num && !force_display) return; std::optional ls_range = GetLsRange(wf, range); diff --git a/src/messages/textDocument_completion.cc b/src/messages/textDocument_completion.cc index a29d895a..f401931e 100644 --- a/src/messages/textDocument_completion.cc +++ b/src/messages/textDocument_completion.cc @@ -459,7 +459,7 @@ void MessageHandler::textDocument_completion(CompletionParam ¶m, std::string path = param.textDocument.uri.GetPath(); WorkingFile *wf = wfiles->GetFile(path); if (!wf) { - reply.NotReady(true); + reply.NotOpened(path); return; } diff --git a/src/messages/textDocument_definition.cc b/src/messages/textDocument_definition.cc index 46f04cf9..f857e496 100644 --- a/src/messages/textDocument_definition.cc +++ b/src/messages/textDocument_definition.cc @@ -48,12 +48,9 @@ std::vector GetNonDefDeclarationTargets(DB *db, SymbolRef sym) { void MessageHandler::textDocument_declaration(TextDocumentPositionParam ¶m, ReplyOnce &reply) { int file_id; - QueryFile *file = FindFile(param.textDocument.uri.GetPath(), &file_id); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply, &file_id); + if (!wf) return; - } std::vector result; Position &ls_pos = param.position; @@ -69,12 +66,9 @@ void MessageHandler::textDocument_declaration(TextDocumentPositionParam ¶m, void MessageHandler::textDocument_definition(TextDocumentPositionParam ¶m, ReplyOnce &reply) { int file_id; - QueryFile *file = FindFile(param.textDocument.uri.GetPath(), &file_id); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply, &file_id); + if (!wf) return; - } std::vector result; Maybe on_def; @@ -190,12 +184,9 @@ void MessageHandler::textDocument_definition(TextDocumentPositionParam ¶m, void MessageHandler::textDocument_typeDefinition( TextDocumentPositionParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!file) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!file) return; - } std::vector result; auto Add = [&](const QueryType &type) { diff --git a/src/messages/textDocument_document.cc b/src/messages/textDocument_document.cc index 32271c2f..ef8f57d0 100644 --- a/src/messages/textDocument_document.cc +++ b/src/messages/textDocument_document.cc @@ -44,12 +44,9 @@ REFLECT_STRUCT(DocumentHighlight, range, kind, role); void MessageHandler::textDocument_documentHighlight( TextDocumentPositionParam ¶m, ReplyOnce &reply) { int file_id; - QueryFile *file = FindFile(param.textDocument.uri.GetPath(), &file_id); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply, &file_id); + if (!wf) return; - } std::vector result; std::vector syms = @@ -90,10 +87,8 @@ REFLECT_STRUCT(DocumentLink, range, target); void MessageHandler::textDocument_documentLink(TextDocumentParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } @@ -165,10 +160,8 @@ void MessageHandler::textDocument_documentSymbol(JsonReader &reader, Reflect(reader, param); int file_id; - QueryFile *file = FindFile(param.textDocument.uri.GetPath(), &file_id); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply, &file_id); if (!wf) { - reply.NotReady(file); return; } diff --git a/src/messages/textDocument_foldingRange.cc b/src/messages/textDocument_foldingRange.cc index 67fb0c0a..a3f1981c 100644 --- a/src/messages/textDocument_foldingRange.cc +++ b/src/messages/textDocument_foldingRange.cc @@ -31,12 +31,9 @@ REFLECT_STRUCT(FoldingRange, startLine, startCharacter, endLine, endCharacter, void MessageHandler::textDocument_foldingRange(TextDocumentParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } std::vector result; std::optional ls_range; diff --git a/src/messages/textDocument_formatting.cc b/src/messages/textDocument_formatting.cc index 37bd7c46..ba46313d 100644 --- a/src/messages/textDocument_formatting.cc +++ b/src/messages/textDocument_formatting.cc @@ -80,21 +80,16 @@ void Format(ReplyOnce &reply, WorkingFile *wfile, tooling::Range range) { void MessageHandler::textDocument_formatting(DocumentFormattingParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } Format(reply, wf, {0, (unsigned)wf->buffer_content.size()}); } void MessageHandler::textDocument_onTypeFormatting( DocumentOnTypeFormattingParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } std::string_view code = wf->buffer_content; @@ -107,10 +102,8 @@ void MessageHandler::textDocument_onTypeFormatting( void MessageHandler::textDocument_rangeFormatting( DocumentRangeFormattingParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } std::string_view code = wf->buffer_content; diff --git a/src/messages/textDocument_hover.cc b/src/messages/textDocument_hover.cc index 7a3e0341..848dc3a5 100644 --- a/src/messages/textDocument_hover.cc +++ b/src/messages/textDocument_hover.cc @@ -93,12 +93,9 @@ GetHover(DB *db, LanguageId lang, SymbolRef sym, int file_id) { void MessageHandler::textDocument_hover(TextDocumentPositionParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } Hover result; for (SymbolRef sym : FindSymbolsAtLocation(wf, file, param.position)) { diff --git a/src/messages/textDocument_references.cc b/src/messages/textDocument_references.cc index 8c784553..e9858dfc 100644 --- a/src/messages/textDocument_references.cc +++ b/src/messages/textDocument_references.cc @@ -45,12 +45,9 @@ void MessageHandler::textDocument_references(JsonReader &reader, ReplyOnce &reply) { ReferenceParam param; Reflect(reader, param); - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; - if (!wf) { - reply.NotReady(file); + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); + if (!wf) return; - } for (auto &folder : param.folders) EnsureEndsInSlash(folder); diff --git a/src/messages/textDocument_rename.cc b/src/messages/textDocument_rename.cc index 05028263..7c478b12 100644 --- a/src/messages/textDocument_rename.cc +++ b/src/messages/textDocument_rename.cc @@ -56,10 +56,8 @@ WorkspaceEdit BuildWorkspaceEdit(DB *db, WorkingFiles *wfiles, SymbolRef sym, } // namespace void MessageHandler::textDocument_rename(RenameParam ¶m, ReplyOnce &reply) { - QueryFile *file = FindFile(param.textDocument.uri.GetPath()); - WorkingFile *wf = file ? wfiles->GetFile(file->def->path) : nullptr; + auto [file, wf] = FindOrFail(param.textDocument.uri.GetPath(), reply); if (!wf) { - reply.NotReady(file); return; } diff --git a/src/messages/textDocument_signatureHelp.cc b/src/messages/textDocument_signatureHelp.cc index 3806ffe8..420ec164 100644 --- a/src/messages/textDocument_signatureHelp.cc +++ b/src/messages/textDocument_signatureHelp.cc @@ -152,12 +152,11 @@ public: void MessageHandler::textDocument_signatureHelp( TextDocumentPositionParam ¶m, ReplyOnce &reply) { static CompleteConsumerCache cache; - - std::string path = param.textDocument.uri.GetPath(); Position begin_pos = param.position; + std::string path = param.textDocument.uri.GetPath(); WorkingFile *wf = wfiles->GetFile(path); if (!wf) { - reply.NotReady(true); + reply.NotOpened(path); return; } { diff --git a/src/pipeline.cc b/src/pipeline.cc index cb8ccc4e..650c44d2 100644 --- a/src/pipeline.cc +++ b/src/pipeline.cc @@ -32,7 +32,6 @@ limitations under the License. #include #include #include -using namespace llvm; #include #include @@ -41,6 +40,8 @@ using namespace llvm; #ifndef _WIN32 #include #endif +using namespace llvm; +namespace chrono = std::chrono; namespace ccls { namespace { @@ -527,8 +528,11 @@ void LaunchStdin() { if (method.empty()) continue; bool should_exit = method == "exit"; + // g_config is not available before "initialize". Use 0 in that case. on_request->PushBack( - {id, std::move(method), std::move(message), std::move(document)}); + {id, std::move(method), std::move(message), std::move(document), + chrono::steady_clock::now() + + chrono::milliseconds(g_config ? g_config->request.timeout : 0)}); if (should_exit) break; @@ -590,11 +594,34 @@ void MainLoop() { handler.include_complete = &include_complete; bool has_indexed = false; + std::deque backlog; + StringMap> path2backlog; while (true) { + if (backlog.size()) { + auto now = chrono::steady_clock::now(); + handler.overdue = true; + while (backlog.size()) { + if (backlog[0].backlog_path.size()) { + if (now < backlog[0].deadline) + break; + handler.Run(backlog[0]); + path2backlog[backlog[0].backlog_path].pop_front(); + } + backlog.pop_front(); + } + handler.overdue = false; + } + std::vector messages = on_request->DequeueAll(); bool did_work = messages.size(); for (InMessage &message : messages) - handler.Run(message); + try { + handler.Run(message); + } catch (NotIndexed &ex) { + backlog.push_back(std::move(message)); + backlog.back().backlog_path = ex.path; + path2backlog[ex.path].push_back(&backlog.back()); + } bool indexed = false; for (int i = 20; i--;) { @@ -604,6 +631,16 @@ void MainLoop() { did_work = true; indexed = true; Main_OnIndexed(&db, &wfiles, &*update); + if (update->files_def_update) { + auto it = path2backlog.find(update->files_def_update->first.path); + if (it != path2backlog.end()) { + for (auto &message : it->second) { + handler.Run(*message); + message->backlog_path.clear(); + } + path2backlog.erase(it); + } + } } if (did_work) { @@ -615,7 +652,10 @@ void MainLoop() { FreeUnusedMemory(); has_indexed = false; } - main_waiter->Wait(quit, on_indexed, on_request); + if (backlog.empty()) + main_waiter->Wait(quit, on_indexed, on_request); + else + main_waiter->WaitUntil(backlog[0].deadline, on_indexed, on_request); } } diff --git a/src/threaded_queue.hh b/src/threaded_queue.hh index a2b5f48a..103027aa 100644 --- a/src/threaded_queue.hh +++ b/src/threaded_queue.hh @@ -18,6 +18,7 @@ limitations under the License. #include "utils.hh" #include +#include #include #include #include @@ -76,6 +77,14 @@ struct MultiQueueWaiter { } return true; } + + template + void WaitUntil(std::chrono::steady_clock::time_point t, + BaseThreadQueue... queues) { + MultiQueueLock l(queues...); + if (!HasState({queues...})) + cv.wait_until(l, t); + } }; // A threadsafe-queue. http://stackoverflow.com/a/16075550