diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index e80b09d8..022da192 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -242,6 +242,7 @@ void *Indexer(void *arg_) { std::string name = "indexer" + std::to_string(idx); set_thread_name(name.c_str()); pipeline::Indexer_Main(h->manager, h->vfs, h->project, h->wfiles); + pipeline::ThreadLeave(); return nullptr; } } // namespace @@ -365,7 +366,6 @@ void MessageHandler::shutdown(EmptyParam &, ReplyOnce &reply) { } void MessageHandler::exit(EmptyParam &) { - // FIXME cancel threads - ::exit(0); + pipeline::quit.store(true, std::memory_order_relaxed); } } // namespace ccls diff --git a/src/pipeline.cc b/src/pipeline.cc index ab6fd2e1..1d84669e 100644 --- a/src/pipeline.cc +++ b/src/pipeline.cc @@ -77,6 +77,7 @@ void StandaloneInitialize(MessageHandler &, const std::string &root); namespace pipeline { +std::atomic quit; std::atomic loaded_ts = ATOMIC_VAR_INIT(0), pending_index_requests = ATOMIC_VAR_INIT(0); int64_t tick = 0; @@ -91,6 +92,10 @@ struct Index_Request { int64_t ts = tick++; }; +std::mutex thread_mtx; +std::condition_variable no_active_threads; +int active_threads; + MultiQueueWaiter *main_waiter; MultiQueueWaiter *indexer_waiter; MultiQueueWaiter *stdout_waiter; @@ -361,8 +366,31 @@ bool Indexer_Parse(SemaManager *completion, WorkingFiles *wfiles, return true; } +void Quit(SemaManager &manager) { + quit.store(true, std::memory_order_relaxed); + manager.Quit(); + + { std::lock_guard lock(index_request->mutex_); } + indexer_waiter->cv.notify_all(); + { std::lock_guard lock(for_stdout->mutex_); } + stdout_waiter->cv.notify_one(); + std::unique_lock lock(thread_mtx); + no_active_threads.wait(lock, [] { return !active_threads; }); +} + } // namespace +void ThreadEnter() { + std::lock_guard lock(thread_mtx); + active_threads++; +} + +void ThreadLeave() { + std::lock_guard lock(thread_mtx); + if (!--active_threads) + no_active_threads.notify_one(); +} + void Init() { main_waiter = new MultiQueueWaiter; on_request = new ThreadedQueue(main_waiter); @@ -380,7 +408,8 @@ void Indexer_Main(SemaManager *manager, VFS *vfs, Project *project, GroupMatch matcher(g_config->index.whitelist, g_config->index.blacklist); while (true) if (!Indexer_Parse(manager, wfiles, project, vfs, matcher)) - indexer_waiter->Wait(index_request); + if (indexer_waiter->Wait(quit, index_request)) + break; } void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) { @@ -419,6 +448,7 @@ void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) { } void LaunchStdin() { + ThreadEnter(); std::thread([]() { set_thread_name("stdin"); std::string str; @@ -458,23 +488,27 @@ void LaunchStdin() { JsonReader reader{document.get()}; if (!reader.m->HasMember("jsonrpc") || std::string((*reader.m)["jsonrpc"].GetString()) != "2.0") - return; + break; RequestId id; std::string method; ReflectMember(reader, "id", id); ReflectMember(reader, "method", method); + if (method.empty()) + continue; + bool should_exit = method == "exit"; on_request->PushBack( {id, std::move(method), std::move(message), std::move(document)}); - if (method == "exit") + if (should_exit) break; } - }) - .detach(); + ThreadLeave(); + }).detach(); } void LaunchStdout() { - std::thread([=]() { + ThreadEnter(); + std::thread([]() { set_thread_name("stdout"); while (true) { @@ -483,10 +517,11 @@ void LaunchStdout() { llvm::outs() << "Content-Length: " << s.size() << "\r\n\r\n" << s; llvm::outs().flush(); } - stdout_waiter->Wait(for_stdout); + if (stdout_waiter->Wait(quit, for_stdout)) + break; } - }) - .detach(); + ThreadLeave(); + }).detach(); } void MainLoop() { @@ -540,16 +575,20 @@ void MainLoop() { Main_OnIndexed(&db, &wfiles, &*update); } - if (did_work) + if (did_work) { has_indexed |= indexed; - else { + if (quit.load(std::memory_order_relaxed)) + break; + } else { if (has_indexed) { FreeUnusedMemory(); has_indexed = false; } - main_waiter->Wait(on_indexed, on_request); + main_waiter->Wait(quit, on_indexed, on_request); } } + + Quit(manager); } void Standalone(const std::string &root) { @@ -590,6 +629,7 @@ void Standalone(const std::string &root) { } if (tty) puts(""); + Quit(manager); } void Index(const std::string &path, const std::vector &args, diff --git a/src/pipeline.hh b/src/pipeline.hh index e70d9dd8..da6b44bc 100644 --- a/src/pipeline.hh +++ b/src/pipeline.hh @@ -36,8 +36,12 @@ enum class IndexMode { }; namespace pipeline { +extern std::atomic quit; extern std::atomic loaded_ts, pending_index_requests; extern int64_t tick; + +void ThreadEnter(); +void ThreadLeave(); void Init(); void LaunchStdin(); void LaunchStdout(); diff --git a/src/platform.hh b/src/platform.hh index 076860f5..12446263 100644 --- a/src/platform.hh +++ b/src/platform.hh @@ -19,6 +19,7 @@ limitations under the License. #include #include +namespace ccls { std::string NormalizePath(const std::string &path); // Free any unused memory and return it to the system. @@ -31,3 +32,4 @@ std::string GetExternalCommandOutput(const std::vector &command, std::string_view input); void SpawnThread(void *(*fn)(void *), void *arg); +} // namespace ccls diff --git a/src/platform_posix.cc b/src/platform_posix.cc index 9dd79726..35d8d271 100644 --- a/src/platform_posix.cc +++ b/src/platform_posix.cc @@ -42,8 +42,16 @@ limitations under the License. #include #include +#include +#include +#include #include +namespace ccls { +namespace pipeline { +void ThreadEnter(); +} + std::string NormalizePath(const std::string &path) { llvm::SmallString<256> P(path); llvm::sys::path::remove_dots(P, true); @@ -113,14 +121,15 @@ void SpawnThread(void *(*fn)(void *), void *arg) { pthread_attr_t attr; struct rlimit rlim; size_t stack_size = 4 * 1024 * 1024; - if (getrlimit(RLIMIT_STACK, &rlim) == 0 && - rlim.rlim_cur != RLIM_INFINITY) + if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != RLIM_INFINITY) stack_size = rlim.rlim_cur; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setstacksize(&attr, stack_size); + pipeline::ThreadEnter(); pthread_create(&thd, &attr, fn, arg); pthread_attr_destroy(&attr); } +} // namespace ccls #endif diff --git a/src/platform_win.cc b/src/platform_win.cc index 31ba74e9..a2ca5185 100644 --- a/src/platform_win.cc +++ b/src/platform_win.cc @@ -30,6 +30,7 @@ limitations under the License. #include #include +namespace ccls { std::string NormalizePath(const std::string &path) { DWORD retval = 0; TCHAR buffer[MAX_PATH] = TEXT(""); @@ -64,5 +65,6 @@ std::string GetExternalCommandOutput(const std::vector &command, void SpawnThread(void *(*fn)(void *), void *arg) { std::thread(fn, arg).detach(); } +} #endif diff --git a/src/sema_manager.cc b/src/sema_manager.cc index f5dee411..194e703d 100644 --- a/src/sema_manager.cc +++ b/src/sema_manager.cc @@ -18,6 +18,7 @@ limitations under the License. #include "clang_tu.hh" #include "filesystem.hh" #include "log.hh" +#include "pipeline.hh" #include "platform.hh" #include @@ -403,6 +404,8 @@ void *PreambleMain(void *manager_) { set_thread_name("preamble"); while (true) { SemaManager::PreambleTask task = manager->preamble_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; bool created = false; std::shared_ptr session = @@ -424,6 +427,7 @@ void *PreambleMain(void *manager_) { manager->ScheduleDiag(task.path, debounce); } } + pipeline::ThreadLeave(); return nullptr; } @@ -432,6 +436,8 @@ void *CompletionMain(void *manager_) { set_thread_name("comp"); while (true) { std::unique_ptr task = manager->comp_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; // Drop older requests if we're not buffering. while (g_config->completion.dropOldRequests && @@ -440,6 +446,8 @@ void *CompletionMain(void *manager_) { task->Consumer.reset(); task->on_complete(nullptr); task = manager->comp_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; } std::shared_ptr session = manager->EnsureSession(task->path); @@ -480,6 +488,7 @@ void *CompletionMain(void *manager_) { task->on_complete(&Clang->getCodeCompletionConsumer()); } + pipeline::ThreadLeave(); return nullptr; } @@ -516,6 +525,8 @@ void *DiagnosticMain(void *manager_) { set_thread_name("diag"); while (true) { SemaManager::DiagTask task = manager->diag_tasks.Dequeue(); + if (pipeline::quit.load(std::memory_order_relaxed)) + break; int64_t wait = task.wait_until - chrono::duration_cast( chrono::high_resolution_clock::now().time_since_epoch()) @@ -627,6 +638,7 @@ void *DiagnosticMain(void *manager_) { } manager->on_diagnostic_(task.path, ls_diags); } + pipeline::ThreadLeave(); return nullptr; } @@ -704,4 +716,10 @@ void SemaManager::Clear() { std::lock_guard lock(mutex); sessions.Clear(); } + +void SemaManager::Quit() { + comp_tasks.PushBack(nullptr); + diag_tasks.PushBack({}); + preamble_tasks.PushBack({}); +} } // namespace ccls diff --git a/src/sema_manager.hh b/src/sema_manager.hh index 632b2eaa..5d573349 100644 --- a/src/sema_manager.hh +++ b/src/sema_manager.hh @@ -148,7 +148,8 @@ struct SemaManager { void OnClose(const std::string &path); std::shared_ptr EnsureSession(const std::string &path, bool *created = nullptr); - void Clear(void); + void Clear(); + void Quit(); // Global state. Project *project_; diff --git a/src/threaded_queue.hh b/src/threaded_queue.hh index ced88be4..a2b5f48a 100644 --- a/src/threaded_queue.hh +++ b/src/threaded_queue.hh @@ -66,10 +66,15 @@ struct MultiQueueWaiter { return false; } - template void Wait(BaseThreadQueue... queues) { + template + bool Wait(std::atomic &quit, BaseThreadQueue... queues) { MultiQueueLock l(queues...); - while (!HasState({queues...})) + while (!quit.load(std::memory_order_relaxed)) { + if (HasState({queues...})) + return false; cv.wait(l); + } + return true; } };