From b08e59e8e125c3a0e0f3ab25a49eedc96281e4b7 Mon Sep 17 00:00:00 2001 From: Fangrui Song Date: Mon, 17 Sep 2018 18:03:59 -0700 Subject: [PATCH] Simplify pipeline and fix race --- src/config.cc | 1 - src/config.h | 1 - src/file_consumer.cc | 36 ++----- src/file_consumer.h | 8 +- src/indexer.cc | 4 +- src/match.cc | 17 +--- src/messages/ccls_reload.cc | 11 --- src/messages/initialize.cc | 1 - src/pipeline.cc | 189 +++++++++++++++++------------------- src/pipeline.hh | 1 + src/project.cc | 4 +- src/project.h | 2 - 12 files changed, 106 insertions(+), 169 deletions(-) diff --git a/src/config.cc b/src/config.cc index d4992bf5..9ea383f3 100644 --- a/src/config.cc +++ b/src/config.cc @@ -4,7 +4,6 @@ #include "config.h" Config *g_config; -thread_local int g_thread_id; namespace ccls { void DoPathMapping(std::string &arg) { diff --git a/src/config.h b/src/config.h index 1d96b488..b3bcde8c 100644 --- a/src/config.h +++ b/src/config.h @@ -261,7 +261,6 @@ MAKE_REFLECT_STRUCT(Config, compilationDatabaseCommand, index, largeFileSize, workspaceSymbol, xref); extern Config *g_config; -thread_local extern int g_thread_id; namespace ccls { void DoPathMapping(std::string &arg); diff --git a/src/file_consumer.cc b/src/file_consumer.cc index 265114cb..a7115ed2 100644 --- a/src/file_consumer.cc +++ b/src/file_consumer.cc @@ -57,43 +57,21 @@ VFS::State VFS::Get(const std::string &file) { auto it = state.find(file); if (it != state.end()) return it->second; - return {0, 0, 0}; + return {0, 0}; } -bool VFS::Mark(const std::string &file, int owner, int stage) { - std::lock_guard lock(mutex); - State &st = state[file]; - if (st.stage < stage) { - st.owner = owner; - st.stage = stage; - return true; - } else - return false; -} - -bool VFS::Stamp(const std::string &file, int64_t ts) { +bool VFS::Stamp(const std::string &file, int64_t ts, int64_t offset) { std::lock_guard lock(mutex); State &st = state[file]; if (st.timestamp < ts) { - st.timestamp = ts; + st.timestamp = ts + offset; return true; } else return false; } -void VFS::ResetLocked(const std::string &file) { - State &st = state[file]; - if (st.owner == 0 || st.owner == g_thread_id) - st.stage = 0; -} - -void VFS::Reset(const std::string &file) { - std::lock_guard lock(mutex); - ResetLocked(file); -} - FileConsumer::FileConsumer(VFS *vfs, const std::string &parse_file) - : vfs_(vfs), parse_file_(parse_file), thread_id_(g_thread_id) {} + : vfs_(vfs), parse_file_(parse_file) {} IndexFile *FileConsumer::TryConsumeFile( const clang::FileEntry &File, @@ -104,9 +82,9 @@ IndexFile *FileConsumer::TryConsumeFile( return it->second.get(); std::string file_name = FileName(File); - // We did not take the file from global. Cache that we failed so we don't try - // again and return nullptr. - if (!vfs_->Mark(file_name, thread_id_, 2)) { + int64_t tim = File.getModificationTime(); + assert(tim); + if (!vfs_->Stamp(file_name, tim, 0)) { local_[UniqueID] = nullptr; return nullptr; } diff --git a/src/file_consumer.h b/src/file_consumer.h index 4e8f85bd..fe92f3a9 100644 --- a/src/file_consumer.h +++ b/src/file_consumer.h @@ -31,18 +31,13 @@ struct FileContents { struct VFS { struct State { int64_t timestamp; - int owner; - int stage; bool loaded = false; }; mutable std::unordered_map state; mutable std::mutex mutex; State Get(const std::string &file); - bool Mark(const std::string &file, int owner, int stage); - bool Stamp(const std::string &file, int64_t ts); - void ResetLocked(const std::string &file); - void Reset(const std::string &file); + bool Stamp(const std::string &file, int64_t ts, int64_t offset); }; namespace std { @@ -83,5 +78,4 @@ private: local_; VFS *vfs_; std::string parse_file_; - int thread_id_; }; diff --git a/src/indexer.cc b/src/indexer.cc index ae026038..d77be6f7 100644 --- a/src/indexer.cc +++ b/src/indexer.cc @@ -1238,9 +1238,10 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs, CI->getLangOpts()->RetainCommentsFromSystemHeaders = true; std::string buf = wfiles->GetContent(file); std::vector> Bufs; - if (g_config->index.onChange && buf.size()) { + if (buf.size()) { // If there is a completion session, reuse its preamble if exists. bool done_remap = false; +#if 0 std::shared_ptr session = completion->TryGetSession(file, false, false); if (session) @@ -1253,6 +1254,7 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs, done_remap = true; } } +#endif for (auto &[filename, content] : remapped) { if (filename == file && done_remap) continue; diff --git a/src/match.cc b/src/match.cc index 1e4cd4df..83003e50 100644 --- a/src/match.cc +++ b/src/match.cc @@ -7,18 +7,7 @@ #include "pipeline.hh" using namespace ccls; -// static std::optional Matcher::Create(const std::string &search) { - /* - std::string real_search; - real_search.reserve(search.size() * 3 + 2); - for (auto c : search) { - real_search += ".*"; - real_search += c; - } - real_search += ".*"; - */ - try { Matcher m; m.regex_string = search; @@ -61,18 +50,16 @@ GroupMatch::GroupMatch(const std::vector &whitelist, bool GroupMatch::IsMatch(const std::string &value, std::string *match_failure_reason) const { - for (const Matcher &m : whitelist) { + for (const Matcher &m : whitelist) if (m.IsMatch(value)) return true; - } - for (const Matcher &m : blacklist) { + for (const Matcher &m : blacklist) if (m.IsMatch(value)) { if (match_failure_reason) *match_failure_reason = "blacklist \"" + m.regex_string + "\""; return false; } - } return true; } diff --git a/src/messages/ccls_reload.cc b/src/messages/ccls_reload.cc index 4d441d5e..cea27bee 100644 --- a/src/messages/ccls_reload.cc +++ b/src/messages/ccls_reload.cc @@ -70,17 +70,6 @@ struct Handler_CclsReload : BaseMessageHandler { q.pop(); need_index.insert(file->def->path); - std::optional write_time = - pipeline::LastWriteTime(file->def->path); - if (!write_time) - continue; - { - std::lock_guard lock(vfs->mutex); - VFS::State &st = vfs->state[file->def->path]; - if (st.timestamp < write_time) - st.stage = 0; - } - if (request->params.dependencies) for (const std::string &path : graph[file->def->path]) { auto it = path_to_file.find(path); diff --git a/src/messages/initialize.cc b/src/messages/initialize.cc index d80960c0..284d47f3 100644 --- a/src/messages/initialize.cc +++ b/src/messages/initialize.cc @@ -490,7 +490,6 @@ struct Handler_Initialize : BaseMessageHandler { LOG_S(INFO) << "start " << g_config->index.threads << " indexers"; for (int i = 0; i < g_config->index.threads; i++) { std::thread([=]() { - g_thread_id = i + 1; std::string name = "indexer" + std::to_string(i); set_thread_name(name.c_str()); pipeline::Indexer_Main(clang_complete, vfs, project, working_files); diff --git a/src/pipeline.cc b/src/pipeline.cc index 54179b31..8d65886d 100644 --- a/src/pipeline.cc +++ b/src/pipeline.cc @@ -14,7 +14,6 @@ #include "project.h" #include "query_utils.h" -#include #include #include using namespace llvm; @@ -63,6 +62,9 @@ void DiagnosticsPublisher::Publish(WorkingFiles *working_files, } namespace ccls::pipeline { + +int64_t loaded_ts = 0, tick = 0; + namespace { struct Index_Request { @@ -70,6 +72,7 @@ struct Index_Request { std::vector args; IndexMode mode; lsRequestId id; + int64_t ts = tick++; }; struct Stdout_Request { @@ -160,6 +163,8 @@ std::unique_ptr RawCacheLoad(const std::string &path) { bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles, Project *project, VFS *vfs, const GroupMatch &matcher) { + const int N_MUTEXES = 256; + static std::mutex mutexes[N_MUTEXES]; std::optional opt_request = index_request->TryPopFront(); if (!opt_request) return false; @@ -174,8 +179,8 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles, return false; } - if (std::string reason; !matcher.IsMatch(request.path, &reason)) { - LOG_IF_S(INFO, loud) << "skip " << request.path << " for " << reason; + if (!matcher.IsMatch(request.path)) { + LOG_IF_S(INFO, loud) << "skip " << request.path; return false; } @@ -188,74 +193,75 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles, std::optional write_time = LastWriteTime(path_to_index); if (!write_time) return true; - int reparse = vfs->Stamp(path_to_index, *write_time); + int reparse = vfs->Stamp(path_to_index, *write_time, -1); if (request.path != path_to_index) { std::optional mtime1 = LastWriteTime(request.path); if (!mtime1) return true; - if (vfs->Stamp(request.path, *mtime1)) - reparse = 2; + if (vfs->Stamp(request.path, *mtime1, -1)) + reparse = 1; } if (g_config->index.onChange) reparse = 2; - if (!vfs->Mark(path_to_index, g_thread_id, 1) && !reparse) + if (!reparse) return true; - prev = RawCacheLoad(path_to_index); - if (!prev) - reparse = 2; - else { - if (CacheInvalid(vfs, prev.get(), path_to_index, entry.args, std::nullopt)) - reparse = 2; - int reparseForDep = g_config->index.reparseForDependency; - if (reparseForDep > 1 || (reparseForDep == 1 && !Project::loaded)) - for (const auto &dep : prev->dependencies) { - if (auto write_time1 = LastWriteTime(dep.first().str())) { - if (dep.second < *write_time1) { - reparse = 2; - std::lock_guard lock(vfs->mutex); - vfs->state[dep.first().str()].stage = 0; - } - } else - reparse = 2; + if (reparse < 2) do { + std::unique_lock lock( + mutexes[std::hash()(path_to_index) % N_MUTEXES]); + prev = RawCacheLoad(path_to_index); + if (!prev || CacheInvalid(vfs, prev.get(), path_to_index, entry.args, + std::nullopt)) + break; + bool update = false; + for (const auto &dep : prev->dependencies) + if (auto mtime1 = LastWriteTime(dep.first.val().str())) { + if (dep.second < *mtime1) + update = true; + } else { + update = true; } - } + int forDep = g_config->index.reparseForDependency; + if (update && (forDep > 1 || (forDep == 1 && request.ts < loaded_ts))) + break; - // Grab the ownership - if (reparse) { - std::lock_guard lock(vfs->mutex); - vfs->state[path_to_index].owner = g_thread_id; - vfs->state[path_to_index].stage = 0; - } - - if (reparse < 2) { - LOG_S(INFO) << "load cache for " << path_to_index; - auto dependencies = prev->dependencies; - if (reparse) { - IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get()); - on_indexed->PushBack(std::move(update), - request.mode != IndexMode::NonInteractive); - std::lock_guard lock(vfs->mutex); - vfs->state[path_to_index].loaded = true; - } - for (const auto &dep : dependencies) { - std::string path = dep.first().str(); - if (vfs->Mark(path, 0, 2) && (prev = RawCacheLoad(path))) { + if (reparse < 2) { + LOG_S(INFO) << "load cache for " << path_to_index; + auto dependencies = prev->dependencies; + if (reparse) { IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get()); on_indexed->PushBack(std::move(update), request.mode != IndexMode::NonInteractive); + std::lock_guard lock1(vfs->mutex); + vfs->state[path_to_index].loaded = true; + } + lock.unlock(); + for (const auto &dep : dependencies) { + std::string path = dep.first.val().str(); + std::lock_guard lock1( + mutexes[std::hash()(path) % N_MUTEXES]); + prev = RawCacheLoad(path); + if (!prev) + continue; { - std::lock_guard lock(vfs->mutex); - vfs->state[path].loaded = true; + std::lock_guard lock2(vfs->mutex); + VFS::State &st = vfs->state[path]; + if (st.loaded) + continue; + st.loaded = true; + st.timestamp = prev->mtime; } + IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get()); + on_indexed->PushBack(std::move(update), + request.mode != IndexMode::NonInteractive); if (entry.id >= 0) { - std::lock_guard lock(project->mutex_); + std::lock_guard lock2(project->mutex_); project->path_to_entry_index[path] = entry.id; } } + return true; } - return true; - } + } while (0); LOG_IF_S(INFO, loud) << "parse " << path_to_index; @@ -276,64 +282,52 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles, out.error.message = "Failed to index " + path_to_index; pipeline::WriteStdout(kMethodType_Unknown, out); } - vfs->Reset(path_to_index); return true; } for (std::unique_ptr &curr : indexes) { std::string path = curr->path; - bool do_update = path == path_to_index || path == request.path, loaded; - { - std::lock_guard lock(vfs->mutex); - VFS::State &st = vfs->state[path]; - if (st.timestamp < curr->mtime) { - st.timestamp = curr->mtime; - do_update = true; - } - loaded = st.loaded; - st.loaded = true; - } - if (std::string reason; !matcher.IsMatch(path, &reason)) { - LOG_IF_S(INFO, loud) << "skip emitting and storing index of " << path << " for " - << reason; - do_update = false; - } - if (!do_update) { - vfs->Reset(path); + if (!matcher.IsMatch(path)) { + LOG_IF_S(INFO, loud) << "skip index for " << path; continue; } - prev.reset(); - if (loaded) - prev = RawCacheLoad(path); - - // Store current index. LOG_IF_S(INFO, loud) << "store index for " << path << " (delta: " << !!prev << ")"; - if (g_config->cacheDirectory.empty()) { - std::lock_guard lock(g_index_mutex); - auto it = g_index.insert_or_assign( - path, InMemoryIndexFile{curr->file_contents, *curr}); - std::string().swap(it.first->second.index.file_contents); - } else { - std::string cache_path = GetCachePath(path); - WriteToFile(cache_path, curr->file_contents); - WriteToFile(AppendSerializationFormat(cache_path), - Serialize(g_config->cacheFormat, *curr)); + { + std::lock_guard lock(mutexes[std::hash()(path) % N_MUTEXES]); + bool loaded; + { + std::lock_guard lock1(vfs->mutex); + loaded = vfs->state[path].loaded; + } + if (loaded) + prev = RawCacheLoad(path); + else + prev.reset(); + if (g_config->cacheDirectory.empty()) { + std::lock_guard lock(g_index_mutex); + auto it = g_index.insert_or_assign( + path, InMemoryIndexFile{curr->file_contents, *curr}); + std::string().swap(it.first->second.index.file_contents); + } else { + std::string cache_path = GetCachePath(path); + WriteToFile(cache_path, curr->file_contents); + WriteToFile(AppendSerializationFormat(cache_path), + Serialize(g_config->cacheFormat, *curr)); + } + on_indexed->PushBack(IndexUpdate::CreateDelta(prev.get(), curr.get()), + request.mode != IndexMode::NonInteractive); + { + std::lock_guard lock1(vfs->mutex); + vfs->state[path].loaded = true; + } + if (entry.id >= 0) { + std::lock_guard lock(project->mutex_); + for (auto &dep : curr->dependencies) + project->path_to_entry_index[dep.first()] = entry.id; + } } - - vfs->Reset(path); - if (entry.id >= 0) { - std::lock_guard lock(project->mutex_); - for (auto &dep : curr->dependencies) - project->path_to_entry_index[dep.first()] = entry.id; - } - - // Build delta update. - IndexUpdate update = IndexUpdate::CreateDelta(prev.get(), curr.get()); - - on_indexed->PushBack(std::move(update), - request.mode != IndexMode::NonInteractive); } return true; @@ -364,7 +358,6 @@ void Indexer_Main(CompletionManager *completion, VFS *vfs, Project *project, void Main_OnIndexed(DB *db, SemanticHighlightSymbolCache *semantic_cache, WorkingFiles *working_files, IndexUpdate *update) { if (update->refresh) { - Project::loaded = true; LOG_S(INFO) << "loaded project. Refresh semantic highlight for all working file."; std::lock_guard lock(working_files->files_mutex); diff --git a/src/pipeline.hh b/src/pipeline.hh index a22c059a..7ff74533 100644 --- a/src/pipeline.hh +++ b/src/pipeline.hh @@ -38,6 +38,7 @@ enum class IndexMode { }; namespace pipeline { +extern int64_t loaded_ts, tick; void Init(); void LaunchStdin(); void LaunchStdout(); diff --git a/src/project.cc b/src/project.cc index 895ab92e..dac67002 100644 --- a/src/project.cc +++ b/src/project.cc @@ -339,10 +339,7 @@ int ComputeGuessScore(std::string_view a, std::string_view b) { } // namespace -bool Project::loaded = false; - void Project::Load(const std::string &root_directory) { - Project::loaded = false; ProjectConfig project; project.extra_flags = g_config->clang.extraArgs; project.project_dir = root_directory; @@ -458,6 +455,7 @@ void Project::Index(WorkingFiles *wfiles, lsRequestId id) { interactive ? IndexMode::Normal : IndexMode::NonInteractive, id); }); + pipeline::loaded_ts = pipeline::tick; // Dummy request to indicate that project is loaded and // trigger refreshing semantic highlight for all working files. pipeline::Index("", {}, IndexMode::NonInteractive); diff --git a/src/project.h b/src/project.h index 6320a171..33098164 100644 --- a/src/project.h +++ b/src/project.h @@ -60,6 +60,4 @@ struct Project { ForAllFilteredFiles(std::function action); void Index(WorkingFiles *wfiles, lsRequestId id); - - static bool loaded; };