diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index 2b51be38..a2d15848 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -230,6 +230,7 @@ void *Indexer(void *arg_) { std::string name = "indexer" + std::to_string(idx); set_thread_name(name.c_str()); pipeline::Indexer_Main(h->manager, h->vfs, h->project, h->wfiles); + pipeline::ThreadLeave(); return nullptr; } } // namespace @@ -353,7 +354,6 @@ void MessageHandler::shutdown(EmptyParam &, ReplyOnce &reply) { } void MessageHandler::exit(EmptyParam &) { - // FIXME cancel threads - ::exit(0); + pipeline::quit.store(true, std::memory_order_relaxed); } } // namespace ccls diff --git a/src/pipeline.cc b/src/pipeline.cc index 051a880a..5e1b5a5d 100644 --- a/src/pipeline.cc +++ b/src/pipeline.cc @@ -65,6 +65,7 @@ void StandaloneInitialize(MessageHandler &, const std::string &root); namespace pipeline { +std::atomic quit; std::atomic loaded_ts = ATOMIC_VAR_INIT(0), pending_index_requests = ATOMIC_VAR_INIT(0); int64_t tick = 0; @@ -79,6 +80,10 @@ struct Index_Request { int64_t ts = tick++; }; +std::mutex thread_mtx; +std::condition_variable no_active_threads; +int active_threads; + MultiQueueWaiter *main_waiter; MultiQueueWaiter *indexer_waiter; MultiQueueWaiter *stdout_waiter; @@ -349,8 +354,31 @@ bool Indexer_Parse(SemaManager *completion, WorkingFiles *wfiles, return true; } +void Quit(SemaManager &manager) { + quit.store(true, std::memory_order_relaxed); + manager.Quit(); + + { std::lock_guard lock(index_request->mutex_); } + indexer_waiter->cv.notify_all(); + { std::lock_guard lock(for_stdout->mutex_); } + stdout_waiter->cv.notify_one(); + std::unique_lock lock(thread_mtx); + no_active_threads.wait(lock, [] { return !active_threads; }); +} + } // namespace +void ThreadEnter() { + std::lock_guard lock(thread_mtx); + active_threads++; +} + +void ThreadLeave() { + std::lock_guard lock(thread_mtx); + if (!--active_threads) + no_active_threads.notify_one(); +} + void Init() { main_waiter = new MultiQueueWaiter; on_request = new ThreadedQueue(main_waiter); @@ -368,7 +396,8 @@ void Indexer_Main(SemaManager *manager, VFS *vfs, Project *project, GroupMatch matcher(g_config->index.whitelist, g_config->index.blacklist); while (true) if (!Indexer_Parse(manager, wfiles, project, vfs, matcher)) - indexer_waiter->Wait(index_request); + if (indexer_waiter->Wait(quit, index_request)) + break; } void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) { @@ -407,6 +436,7 @@ void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) { } void LaunchStdin() { + ThreadEnter(); std::thread([]() { set_thread_name("stdin"); std::string str; @@ -446,23 +476,27 @@ void LaunchStdin() { JsonReader reader{document.get()}; if (!reader.m->HasMember("jsonrpc") || std::string((*reader.m)["jsonrpc"].GetString()) != "2.0") - return; + break; RequestId id; std::string method; ReflectMember(reader, "id", id); ReflectMember(reader, "method", method); + if (method.empty()) + continue; + bool should_exit = method == "exit"; on_request->PushBack( {id, std::move(method), std::move(message), std::move(document)}); - if (method == "exit") + if (should_exit) break; } - }) - .detach(); + ThreadLeave(); + }).detach(); } void LaunchStdout() { - std::thread([=]() { + ThreadEnter(); + std::thread([]() { set_thread_name("stdout"); while (true) { @@ -471,10 +505,11 @@ void LaunchStdout() { llvm::outs() << "Content-Length: " << s.size() << "\r\n\r\n" << s; llvm::outs().flush(); } - stdout_waiter->Wait(for_stdout); + if (stdout_waiter->Wait(quit, for_stdout)) + break; } - }) - .detach(); + ThreadLeave(); + }).detach(); } void MainLoop() { @@ -528,16 +563,20 @@ void MainLoop() { Main_OnIndexed(&db, &wfiles, &*update); } - if (did_work) + if (did_work) { has_indexed |= indexed; - else { + if (quit.load(std::memory_order_relaxed)) + break; + } else { if (has_indexed) { FreeUnusedMemory(); has_indexed = false; } - main_waiter->Wait(on_indexed, on_request); + main_waiter->Wait(quit, on_indexed, on_request); } } + + Quit(manager); } void Standalone(const std::string &root) { @@ -578,6 +617,7 @@ void Standalone(const std::string &root) { } if (tty) puts(""); + Quit(manager); } void Index(const std::string &path, const std::vector &args, diff --git a/src/pipeline.hh b/src/pipeline.hh index 2083303a..b63c17f0 100644 --- a/src/pipeline.hh +++ b/src/pipeline.hh @@ -39,8 +39,12 @@ enum class IndexMode { }; namespace pipeline { +extern std::atomic quit; extern std::atomic loaded_ts, pending_index_requests; extern int64_t tick; + +void ThreadEnter(); +void ThreadLeave(); void Init(); void LaunchStdin(); void LaunchStdout(); diff --git a/src/platform.hh b/src/platform.hh index 6f1e7658..bc2ed2ce 100644 --- a/src/platform.hh +++ b/src/platform.hh @@ -7,6 +7,7 @@ #include #include +namespace ccls { std::string NormalizePath(const std::string &path); // Free any unused memory and return it to the system. @@ -19,3 +20,4 @@ std::string GetExternalCommandOutput(const std::vector &command, std::string_view input); void SpawnThread(void *(*fn)(void *), void *arg); +} // namespace ccls diff --git a/src/platform_posix.cc b/src/platform_posix.cc index 52e19fc8..c20099fe 100644 --- a/src/platform_posix.cc +++ b/src/platform_posix.cc @@ -30,8 +30,16 @@ #include #include +#include +#include +#include #include +namespace ccls { +namespace pipeline { +void ThreadEnter(); +} + std::string NormalizePath(const std::string &path) { llvm::SmallString<256> P(path); llvm::sys::path::remove_dots(P, true); @@ -101,14 +109,15 @@ void SpawnThread(void *(*fn)(void *), void *arg) { pthread_attr_t attr; struct rlimit rlim; size_t stack_size = 4 * 1024 * 1024; - if (getrlimit(RLIMIT_STACK, &rlim) == 0 && - rlim.rlim_cur != RLIM_INFINITY) + if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != RLIM_INFINITY) stack_size = rlim.rlim_cur; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setstacksize(&attr, stack_size); + pipeline::ThreadEnter(); pthread_create(&thd, &attr, fn, arg); pthread_attr_destroy(&attr); } +} // namespace ccls #endif diff --git a/src/platform_win.cc b/src/platform_win.cc index c892c2bd..d10858d5 100644 --- a/src/platform_win.cc +++ b/src/platform_win.cc @@ -18,6 +18,7 @@ #include #include +namespace ccls { std::string NormalizePath(const std::string &path) { DWORD retval = 0; TCHAR buffer[MAX_PATH] = TEXT(""); @@ -52,5 +53,6 @@ std::string GetExternalCommandOutput(const std::vector &command, void SpawnThread(void *(*fn)(void *), void *arg) { std::thread(fn, arg).detach(); } +} #endif diff --git a/src/sema_manager.cc b/src/sema_manager.cc index f22ed9ae..2b2c2c5e 100644 --- a/src/sema_manager.cc +++ b/src/sema_manager.cc @@ -6,6 +6,7 @@ #include "clang_tu.hh" #include "filesystem.hh" #include "log.hh" +#include "pipeline.hh" #include "platform.hh" #include @@ -392,6 +393,8 @@ void *PreambleMain(void *manager_) { set_thread_name("preamble"); while (true) { SemaManager::PreambleTask task = manager->preamble_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; bool created = false; std::shared_ptr session = @@ -413,6 +416,7 @@ void *PreambleMain(void *manager_) { manager->ScheduleDiag(task.path, debounce); } } + pipeline::ThreadLeave(); return nullptr; } @@ -421,6 +425,8 @@ void *CompletionMain(void *manager_) { set_thread_name("comp"); while (true) { std::unique_ptr task = manager->comp_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; // Drop older requests if we're not buffering. while (g_config->completion.dropOldRequests && @@ -429,6 +435,8 @@ void *CompletionMain(void *manager_) { task->Consumer.reset(); task->on_complete(nullptr); task = manager->comp_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; } std::shared_ptr session = manager->EnsureSession(task->path); @@ -469,6 +477,7 @@ void *CompletionMain(void *manager_) { task->on_complete(&Clang->getCodeCompletionConsumer()); } + pipeline::ThreadLeave(); return nullptr; } @@ -505,6 +514,8 @@ void *DiagnosticMain(void *manager_) { set_thread_name("diag"); while (true) { SemaManager::DiagTask task = manager->diag_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; int64_t wait = task.wait_until - chrono::duration_cast( chrono::high_resolution_clock::now().time_since_epoch()) @@ -616,6 +627,7 @@ void *DiagnosticMain(void *manager_) { } manager->on_diagnostic_(task.path, ls_diags); } + pipeline::ThreadLeave(); return nullptr; } @@ -693,4 +705,10 @@ void SemaManager::Clear() { std::lock_guard lock(mutex); sessions.Clear(); } + +void SemaManager::Quit() { + comp_tasks.PushBack(nullptr); + diag_tasks.PushBack({}); + preamble_tasks.PushBack({}); +} } // namespace ccls diff --git a/src/sema_manager.hh b/src/sema_manager.hh index 8a9f42a8..72dde8ed 100644 --- a/src/sema_manager.hh +++ b/src/sema_manager.hh @@ -136,7 +136,8 @@ struct SemaManager { void OnClose(const std::string &path); std::shared_ptr EnsureSession(const std::string &path, bool *created = nullptr); - void Clear(void); + void Clear(); + void Quit(); // Global state. Project *project_; diff --git a/src/threaded_queue.hh b/src/threaded_queue.hh index 5d07640c..e81638c9 100644 --- a/src/threaded_queue.hh +++ b/src/threaded_queue.hh @@ -54,10 +54,15 @@ struct MultiQueueWaiter { return false; } - template void Wait(BaseThreadQueue... queues) { + template + bool Wait(std::atomic &quit, BaseThreadQueue... queues) { MultiQueueLock l(queues...); - while (!HasState({queues...})) + while (!quit.load(std::memory_order_relaxed)) { + if (HasState({queues...})) + return false; cv.wait(l); + } + return true; } };