Rendezvous after receiving "exit" notification (#159)

This commit is contained in:
Fangrui Song 2018-12-13 20:13:35 -08:00
parent 6945a56fb8
commit df7221affc
9 changed files with 100 additions and 19 deletions

View File

@ -242,6 +242,7 @@ void *Indexer(void *arg_) {
std::string name = "indexer" + std::to_string(idx); std::string name = "indexer" + std::to_string(idx);
set_thread_name(name.c_str()); set_thread_name(name.c_str());
pipeline::Indexer_Main(h->manager, h->vfs, h->project, h->wfiles); pipeline::Indexer_Main(h->manager, h->vfs, h->project, h->wfiles);
pipeline::ThreadLeave();
return nullptr; return nullptr;
} }
} // namespace } // namespace
@ -365,7 +366,6 @@ void MessageHandler::shutdown(EmptyParam &, ReplyOnce &reply) {
} }
void MessageHandler::exit(EmptyParam &) { void MessageHandler::exit(EmptyParam &) {
// FIXME cancel threads pipeline::quit.store(true, std::memory_order_relaxed);
::exit(0);
} }
} // namespace ccls } // namespace ccls

View File

@ -77,6 +77,7 @@ void StandaloneInitialize(MessageHandler &, const std::string &root);
namespace pipeline { namespace pipeline {
std::atomic<bool> quit;
std::atomic<int64_t> loaded_ts = ATOMIC_VAR_INIT(0), std::atomic<int64_t> loaded_ts = ATOMIC_VAR_INIT(0),
pending_index_requests = ATOMIC_VAR_INIT(0); pending_index_requests = ATOMIC_VAR_INIT(0);
int64_t tick = 0; int64_t tick = 0;
@ -91,6 +92,10 @@ struct Index_Request {
int64_t ts = tick++; int64_t ts = tick++;
}; };
std::mutex thread_mtx;
std::condition_variable no_active_threads;
int active_threads;
MultiQueueWaiter *main_waiter; MultiQueueWaiter *main_waiter;
MultiQueueWaiter *indexer_waiter; MultiQueueWaiter *indexer_waiter;
MultiQueueWaiter *stdout_waiter; MultiQueueWaiter *stdout_waiter;
@ -361,8 +366,31 @@ bool Indexer_Parse(SemaManager *completion, WorkingFiles *wfiles,
return true; return true;
} }
void Quit(SemaManager &manager) {
quit.store(true, std::memory_order_relaxed);
manager.Quit();
{ std::lock_guard lock(index_request->mutex_); }
indexer_waiter->cv.notify_all();
{ std::lock_guard lock(for_stdout->mutex_); }
stdout_waiter->cv.notify_one();
std::unique_lock lock(thread_mtx);
no_active_threads.wait(lock, [] { return !active_threads; });
}
} // namespace } // namespace
void ThreadEnter() {
std::lock_guard lock(thread_mtx);
active_threads++;
}
void ThreadLeave() {
std::lock_guard lock(thread_mtx);
if (!--active_threads)
no_active_threads.notify_one();
}
void Init() { void Init() {
main_waiter = new MultiQueueWaiter; main_waiter = new MultiQueueWaiter;
on_request = new ThreadedQueue<InMessage>(main_waiter); on_request = new ThreadedQueue<InMessage>(main_waiter);
@ -380,7 +408,8 @@ void Indexer_Main(SemaManager *manager, VFS *vfs, Project *project,
GroupMatch matcher(g_config->index.whitelist, g_config->index.blacklist); GroupMatch matcher(g_config->index.whitelist, g_config->index.blacklist);
while (true) while (true)
if (!Indexer_Parse(manager, wfiles, project, vfs, matcher)) if (!Indexer_Parse(manager, wfiles, project, vfs, matcher))
indexer_waiter->Wait(index_request); if (indexer_waiter->Wait(quit, index_request))
break;
} }
void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) { void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) {
@ -419,6 +448,7 @@ void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) {
} }
void LaunchStdin() { void LaunchStdin() {
ThreadEnter();
std::thread([]() { std::thread([]() {
set_thread_name("stdin"); set_thread_name("stdin");
std::string str; std::string str;
@ -458,23 +488,27 @@ void LaunchStdin() {
JsonReader reader{document.get()}; JsonReader reader{document.get()};
if (!reader.m->HasMember("jsonrpc") || if (!reader.m->HasMember("jsonrpc") ||
std::string((*reader.m)["jsonrpc"].GetString()) != "2.0") std::string((*reader.m)["jsonrpc"].GetString()) != "2.0")
return; break;
RequestId id; RequestId id;
std::string method; std::string method;
ReflectMember(reader, "id", id); ReflectMember(reader, "id", id);
ReflectMember(reader, "method", method); ReflectMember(reader, "method", method);
if (method.empty())
continue;
bool should_exit = method == "exit";
on_request->PushBack( on_request->PushBack(
{id, std::move(method), std::move(message), std::move(document)}); {id, std::move(method), std::move(message), std::move(document)});
if (method == "exit") if (should_exit)
break; break;
} }
}) ThreadLeave();
.detach(); }).detach();
} }
void LaunchStdout() { void LaunchStdout() {
std::thread([=]() { ThreadEnter();
std::thread([]() {
set_thread_name("stdout"); set_thread_name("stdout");
while (true) { while (true) {
@ -483,10 +517,11 @@ void LaunchStdout() {
llvm::outs() << "Content-Length: " << s.size() << "\r\n\r\n" << s; llvm::outs() << "Content-Length: " << s.size() << "\r\n\r\n" << s;
llvm::outs().flush(); llvm::outs().flush();
} }
stdout_waiter->Wait(for_stdout); if (stdout_waiter->Wait(quit, for_stdout))
break;
} }
}) ThreadLeave();
.detach(); }).detach();
} }
void MainLoop() { void MainLoop() {
@ -540,16 +575,20 @@ void MainLoop() {
Main_OnIndexed(&db, &wfiles, &*update); Main_OnIndexed(&db, &wfiles, &*update);
} }
if (did_work) if (did_work) {
has_indexed |= indexed; has_indexed |= indexed;
else { if (quit.load(std::memory_order_relaxed))
break;
} else {
if (has_indexed) { if (has_indexed) {
FreeUnusedMemory(); FreeUnusedMemory();
has_indexed = false; has_indexed = false;
} }
main_waiter->Wait(on_indexed, on_request); main_waiter->Wait(quit, on_indexed, on_request);
} }
} }
Quit(manager);
} }
void Standalone(const std::string &root) { void Standalone(const std::string &root) {
@ -590,6 +629,7 @@ void Standalone(const std::string &root) {
} }
if (tty) if (tty)
puts(""); puts("");
Quit(manager);
} }
void Index(const std::string &path, const std::vector<const char *> &args, void Index(const std::string &path, const std::vector<const char *> &args,

View File

@ -36,8 +36,12 @@ enum class IndexMode {
}; };
namespace pipeline { namespace pipeline {
extern std::atomic<bool> quit;
extern std::atomic<int64_t> loaded_ts, pending_index_requests; extern std::atomic<int64_t> loaded_ts, pending_index_requests;
extern int64_t tick; extern int64_t tick;
void ThreadEnter();
void ThreadLeave();
void Init(); void Init();
void LaunchStdin(); void LaunchStdin();
void LaunchStdout(); void LaunchStdout();

View File

@ -19,6 +19,7 @@ limitations under the License.
#include <string_view> #include <string_view>
#include <vector> #include <vector>
namespace ccls {
std::string NormalizePath(const std::string &path); std::string NormalizePath(const std::string &path);
// Free any unused memory and return it to the system. // Free any unused memory and return it to the system.
@ -31,3 +32,4 @@ std::string GetExternalCommandOutput(const std::vector<std::string> &command,
std::string_view input); std::string_view input);
void SpawnThread(void *(*fn)(void *), void *arg); void SpawnThread(void *(*fn)(void *), void *arg);
} // namespace ccls

View File

@ -42,8 +42,16 @@ limitations under the License.
#include <llvm/ADT/SmallString.h> #include <llvm/ADT/SmallString.h>
#include <llvm/Support/Path.h> #include <llvm/Support/Path.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string> #include <string>
namespace ccls {
namespace pipeline {
void ThreadEnter();
}
std::string NormalizePath(const std::string &path) { std::string NormalizePath(const std::string &path) {
llvm::SmallString<256> P(path); llvm::SmallString<256> P(path);
llvm::sys::path::remove_dots(P, true); llvm::sys::path::remove_dots(P, true);
@ -113,14 +121,15 @@ void SpawnThread(void *(*fn)(void *), void *arg) {
pthread_attr_t attr; pthread_attr_t attr;
struct rlimit rlim; struct rlimit rlim;
size_t stack_size = 4 * 1024 * 1024; size_t stack_size = 4 * 1024 * 1024;
if (getrlimit(RLIMIT_STACK, &rlim) == 0 && if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != RLIM_INFINITY)
rlim.rlim_cur != RLIM_INFINITY)
stack_size = rlim.rlim_cur; stack_size = rlim.rlim_cur;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setstacksize(&attr, stack_size); pthread_attr_setstacksize(&attr, stack_size);
pipeline::ThreadEnter();
pthread_create(&thd, &attr, fn, arg); pthread_create(&thd, &attr, fn, arg);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
} }
} // namespace ccls
#endif #endif

View File

@ -30,6 +30,7 @@ limitations under the License.
#include <string> #include <string>
#include <thread> #include <thread>
namespace ccls {
std::string NormalizePath(const std::string &path) { std::string NormalizePath(const std::string &path) {
DWORD retval = 0; DWORD retval = 0;
TCHAR buffer[MAX_PATH] = TEXT(""); TCHAR buffer[MAX_PATH] = TEXT("");
@ -64,5 +65,6 @@ std::string GetExternalCommandOutput(const std::vector<std::string> &command,
void SpawnThread(void *(*fn)(void *), void *arg) { void SpawnThread(void *(*fn)(void *), void *arg) {
std::thread(fn, arg).detach(); std::thread(fn, arg).detach();
} }
}
#endif #endif

View File

@ -18,6 +18,7 @@ limitations under the License.
#include "clang_tu.hh" #include "clang_tu.hh"
#include "filesystem.hh" #include "filesystem.hh"
#include "log.hh" #include "log.hh"
#include "pipeline.hh"
#include "platform.hh" #include "platform.hh"
#include <clang/Lex/PreprocessorOptions.h> #include <clang/Lex/PreprocessorOptions.h>
@ -403,6 +404,8 @@ void *PreambleMain(void *manager_) {
set_thread_name("preamble"); set_thread_name("preamble");
while (true) { while (true) {
SemaManager::PreambleTask task = manager->preamble_tasks.Dequeue(); SemaManager::PreambleTask task = manager->preamble_tasks.Dequeue();
if (pipeline::quit.load(std::memory_order_relaxed))
break;
bool created = false; bool created = false;
std::shared_ptr<Session> session = std::shared_ptr<Session> session =
@ -424,6 +427,7 @@ void *PreambleMain(void *manager_) {
manager->ScheduleDiag(task.path, debounce); manager->ScheduleDiag(task.path, debounce);
} }
} }
pipeline::ThreadLeave();
return nullptr; return nullptr;
} }
@ -432,6 +436,8 @@ void *CompletionMain(void *manager_) {
set_thread_name("comp"); set_thread_name("comp");
while (true) { while (true) {
std::unique_ptr<SemaManager::CompTask> task = manager->comp_tasks.Dequeue(); std::unique_ptr<SemaManager::CompTask> task = manager->comp_tasks.Dequeue();
if (pipeline::quit.load(std::memory_order_relaxed))
break;
// Drop older requests if we're not buffering. // Drop older requests if we're not buffering.
while (g_config->completion.dropOldRequests && while (g_config->completion.dropOldRequests &&
@ -440,6 +446,8 @@ void *CompletionMain(void *manager_) {
task->Consumer.reset(); task->Consumer.reset();
task->on_complete(nullptr); task->on_complete(nullptr);
task = manager->comp_tasks.Dequeue(); task = manager->comp_tasks.Dequeue();
if (pipeline::quit.load(std::memory_order_relaxed))
break;
} }
std::shared_ptr<Session> session = manager->EnsureSession(task->path); std::shared_ptr<Session> session = manager->EnsureSession(task->path);
@ -480,6 +488,7 @@ void *CompletionMain(void *manager_) {
task->on_complete(&Clang->getCodeCompletionConsumer()); task->on_complete(&Clang->getCodeCompletionConsumer());
} }
pipeline::ThreadLeave();
return nullptr; return nullptr;
} }
@ -516,6 +525,8 @@ void *DiagnosticMain(void *manager_) {
set_thread_name("diag"); set_thread_name("diag");
while (true) { while (true) {
SemaManager::DiagTask task = manager->diag_tasks.Dequeue(); SemaManager::DiagTask task = manager->diag_tasks.Dequeue();
if (pipeline::quit.load(std::memory_order_relaxed))
break;
int64_t wait = task.wait_until - int64_t wait = task.wait_until -
chrono::duration_cast<chrono::milliseconds>( chrono::duration_cast<chrono::milliseconds>(
chrono::high_resolution_clock::now().time_since_epoch()) chrono::high_resolution_clock::now().time_since_epoch())
@ -627,6 +638,7 @@ void *DiagnosticMain(void *manager_) {
} }
manager->on_diagnostic_(task.path, ls_diags); manager->on_diagnostic_(task.path, ls_diags);
} }
pipeline::ThreadLeave();
return nullptr; return nullptr;
} }
@ -704,4 +716,10 @@ void SemaManager::Clear() {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
sessions.Clear(); sessions.Clear();
} }
void SemaManager::Quit() {
comp_tasks.PushBack(nullptr);
diag_tasks.PushBack({});
preamble_tasks.PushBack({});
}
} // namespace ccls } // namespace ccls

View File

@ -148,7 +148,8 @@ struct SemaManager {
void OnClose(const std::string &path); void OnClose(const std::string &path);
std::shared_ptr<ccls::Session> EnsureSession(const std::string &path, std::shared_ptr<ccls::Session> EnsureSession(const std::string &path,
bool *created = nullptr); bool *created = nullptr);
void Clear(void); void Clear();
void Quit();
// Global state. // Global state.
Project *project_; Project *project_;

View File

@ -66,11 +66,16 @@ struct MultiQueueWaiter {
return false; return false;
} }
template <typename... BaseThreadQueue> void Wait(BaseThreadQueue... queues) { template <typename... BaseThreadQueue>
bool Wait(std::atomic<bool> &quit, BaseThreadQueue... queues) {
MultiQueueLock<BaseThreadQueue...> l(queues...); MultiQueueLock<BaseThreadQueue...> l(queues...);
while (!HasState({queues...})) while (!quit.load(std::memory_order_relaxed)) {
if (HasState({queues...}))
return false;
cv.wait(l); cv.wait(l);
} }
return true;
}
}; };
// A threadsafe-queue. http://stackoverflow.com/a/16075550 // A threadsafe-queue. http://stackoverflow.com/a/16075550