Simplify pipeline and fix race

This commit is contained in:
Fangrui Song 2018-09-17 18:03:59 -07:00
parent 14b73f0d6f
commit 763106c3d4
12 changed files with 106 additions and 169 deletions

View File

@ -16,7 +16,6 @@ limitations under the License.
#include "config.h" #include "config.h"
Config *g_config; Config *g_config;
thread_local int g_thread_id;
namespace ccls { namespace ccls {
void DoPathMapping(std::string &arg) { void DoPathMapping(std::string &arg) {

View File

@ -273,7 +273,6 @@ MAKE_REFLECT_STRUCT(Config, compilationDatabaseCommand,
index, largeFileSize, workspaceSymbol, xref); index, largeFileSize, workspaceSymbol, xref);
extern Config *g_config; extern Config *g_config;
thread_local extern int g_thread_id;
namespace ccls { namespace ccls {
void DoPathMapping(std::string &arg); void DoPathMapping(std::string &arg);

View File

@ -69,43 +69,21 @@ VFS::State VFS::Get(const std::string &file) {
auto it = state.find(file); auto it = state.find(file);
if (it != state.end()) if (it != state.end())
return it->second; return it->second;
return {0, 0, 0}; return {0, 0};
} }
bool VFS::Mark(const std::string &file, int owner, int stage) { bool VFS::Stamp(const std::string &file, int64_t ts, int64_t offset) {
std::lock_guard<std::mutex> lock(mutex);
State &st = state[file];
if (st.stage < stage) {
st.owner = owner;
st.stage = stage;
return true;
} else
return false;
}
bool VFS::Stamp(const std::string &file, int64_t ts) {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
State &st = state[file]; State &st = state[file];
if (st.timestamp < ts) { if (st.timestamp < ts) {
st.timestamp = ts; st.timestamp = ts + offset;
return true; return true;
} else } else
return false; return false;
} }
void VFS::ResetLocked(const std::string &file) {
State &st = state[file];
if (st.owner == 0 || st.owner == g_thread_id)
st.stage = 0;
}
void VFS::Reset(const std::string &file) {
std::lock_guard<std::mutex> lock(mutex);
ResetLocked(file);
}
FileConsumer::FileConsumer(VFS *vfs, const std::string &parse_file) FileConsumer::FileConsumer(VFS *vfs, const std::string &parse_file)
: vfs_(vfs), parse_file_(parse_file), thread_id_(g_thread_id) {} : vfs_(vfs), parse_file_(parse_file) {}
IndexFile *FileConsumer::TryConsumeFile( IndexFile *FileConsumer::TryConsumeFile(
const clang::FileEntry &File, const clang::FileEntry &File,
@ -116,9 +94,9 @@ IndexFile *FileConsumer::TryConsumeFile(
return it->second.get(); return it->second.get();
std::string file_name = FileName(File); std::string file_name = FileName(File);
// We did not take the file from global. Cache that we failed so we don't try int64_t tim = File.getModificationTime();
// again and return nullptr. assert(tim);
if (!vfs_->Mark(file_name, thread_id_, 2)) { if (!vfs_->Stamp(file_name, tim, 0)) {
local_[UniqueID] = nullptr; local_[UniqueID] = nullptr;
return nullptr; return nullptr;
} }

View File

@ -43,18 +43,13 @@ struct FileContents {
struct VFS { struct VFS {
struct State { struct State {
int64_t timestamp; int64_t timestamp;
int owner;
int stage;
bool loaded = false; bool loaded = false;
}; };
mutable std::unordered_map<std::string, State> state; mutable std::unordered_map<std::string, State> state;
mutable std::mutex mutex; mutable std::mutex mutex;
State Get(const std::string &file); State Get(const std::string &file);
bool Mark(const std::string &file, int owner, int stage); bool Stamp(const std::string &file, int64_t ts, int64_t offset);
bool Stamp(const std::string &file, int64_t ts);
void ResetLocked(const std::string &file);
void Reset(const std::string &file);
}; };
namespace std { namespace std {
@ -95,5 +90,4 @@ private:
local_; local_;
VFS *vfs_; VFS *vfs_;
std::string parse_file_; std::string parse_file_;
int thread_id_;
}; };

View File

@ -1263,9 +1263,10 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs,
} }
std::string buf = wfiles->GetContent(file); std::string buf = wfiles->GetContent(file);
std::vector<std::unique_ptr<llvm::MemoryBuffer>> Bufs; std::vector<std::unique_ptr<llvm::MemoryBuffer>> Bufs;
if (g_config->index.onChange && buf.size()) { if (buf.size()) {
// If there is a completion session, reuse its preamble if exists. // If there is a completion session, reuse its preamble if exists.
bool done_remap = false; bool done_remap = false;
#if 0
std::shared_ptr<CompletionSession> session = std::shared_ptr<CompletionSession> session =
completion->TryGetSession(file, false, false); completion->TryGetSession(file, false, false);
if (session) if (session)
@ -1278,6 +1279,7 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs,
done_remap = true; done_remap = true;
} }
} }
#endif
for (auto &[filename, content] : remapped) { for (auto &[filename, content] : remapped) {
if (filename == file && done_remap) if (filename == file && done_remap)
continue; continue;

View File

@ -19,18 +19,7 @@ limitations under the License.
#include "pipeline.hh" #include "pipeline.hh"
using namespace ccls; using namespace ccls;
// static
std::optional<Matcher> Matcher::Create(const std::string &search) { std::optional<Matcher> Matcher::Create(const std::string &search) {
/*
std::string real_search;
real_search.reserve(search.size() * 3 + 2);
for (auto c : search) {
real_search += ".*";
real_search += c;
}
real_search += ".*";
*/
try { try {
Matcher m; Matcher m;
m.regex_string = search; m.regex_string = search;
@ -73,18 +62,16 @@ GroupMatch::GroupMatch(const std::vector<std::string> &whitelist,
bool GroupMatch::IsMatch(const std::string &value, bool GroupMatch::IsMatch(const std::string &value,
std::string *match_failure_reason) const { std::string *match_failure_reason) const {
for (const Matcher &m : whitelist) { for (const Matcher &m : whitelist)
if (m.IsMatch(value)) if (m.IsMatch(value))
return true; return true;
}
for (const Matcher &m : blacklist) { for (const Matcher &m : blacklist)
if (m.IsMatch(value)) { if (m.IsMatch(value)) {
if (match_failure_reason) if (match_failure_reason)
*match_failure_reason = "blacklist \"" + m.regex_string + "\""; *match_failure_reason = "blacklist \"" + m.regex_string + "\"";
return false; return false;
} }
}
return true; return true;
} }

View File

@ -82,17 +82,6 @@ struct Handler_CclsReload : BaseMessageHandler<In_CclsReload> {
q.pop(); q.pop();
need_index.insert(file->def->path); need_index.insert(file->def->path);
std::optional<int64_t> write_time =
pipeline::LastWriteTime(file->def->path);
if (!write_time)
continue;
{
std::lock_guard<std::mutex> lock(vfs->mutex);
VFS::State &st = vfs->state[file->def->path];
if (st.timestamp < write_time)
st.stage = 0;
}
if (request->params.dependencies) if (request->params.dependencies)
for (const std::string &path : graph[file->def->path]) { for (const std::string &path : graph[file->def->path]) {
auto it = path_to_file.find(path); auto it = path_to_file.find(path);

View File

@ -502,7 +502,6 @@ struct Handler_Initialize : BaseMessageHandler<In_InitializeRequest> {
LOG_S(INFO) << "start " << g_config->index.threads << " indexers"; LOG_S(INFO) << "start " << g_config->index.threads << " indexers";
for (int i = 0; i < g_config->index.threads; i++) { for (int i = 0; i < g_config->index.threads; i++) {
std::thread([=]() { std::thread([=]() {
g_thread_id = i + 1;
std::string name = "indexer" + std::to_string(i); std::string name = "indexer" + std::to_string(i);
set_thread_name(name.c_str()); set_thread_name(name.c_str());
pipeline::Indexer_Main(clang_complete, vfs, project, working_files); pipeline::Indexer_Main(clang_complete, vfs, project, working_files);

View File

@ -26,7 +26,6 @@ limitations under the License.
#include "project.h" #include "project.h"
#include "query_utils.h" #include "query_utils.h"
#include <llvm/ADT/Twine.h>
#include <llvm/Support/Threading.h> #include <llvm/Support/Threading.h>
#include <llvm/Support/Timer.h> #include <llvm/Support/Timer.h>
using namespace llvm; using namespace llvm;
@ -75,6 +74,9 @@ void DiagnosticsPublisher::Publish(WorkingFiles *working_files,
} }
namespace ccls::pipeline { namespace ccls::pipeline {
int64_t loaded_ts = 0, tick = 0;
namespace { namespace {
struct Index_Request { struct Index_Request {
@ -82,6 +84,7 @@ struct Index_Request {
std::vector<std::string> args; std::vector<std::string> args;
IndexMode mode; IndexMode mode;
lsRequestId id; lsRequestId id;
int64_t ts = tick++;
}; };
struct Stdout_Request { struct Stdout_Request {
@ -172,6 +175,8 @@ std::unique_ptr<IndexFile> RawCacheLoad(const std::string &path) {
bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles, bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
Project *project, VFS *vfs, const GroupMatch &matcher) { Project *project, VFS *vfs, const GroupMatch &matcher) {
const int N_MUTEXES = 256;
static std::mutex mutexes[N_MUTEXES];
std::optional<Index_Request> opt_request = index_request->TryPopFront(); std::optional<Index_Request> opt_request = index_request->TryPopFront();
if (!opt_request) if (!opt_request)
return false; return false;
@ -186,8 +191,8 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
return false; return false;
} }
if (std::string reason; !matcher.IsMatch(request.path, &reason)) { if (!matcher.IsMatch(request.path)) {
LOG_IF_S(INFO, loud) << "skip " << request.path << " for " << reason; LOG_IF_S(INFO, loud) << "skip " << request.path;
return false; return false;
} }
@ -200,74 +205,75 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
std::optional<int64_t> write_time = LastWriteTime(path_to_index); std::optional<int64_t> write_time = LastWriteTime(path_to_index);
if (!write_time) if (!write_time)
return true; return true;
int reparse = vfs->Stamp(path_to_index, *write_time); int reparse = vfs->Stamp(path_to_index, *write_time, -1);
if (request.path != path_to_index) { if (request.path != path_to_index) {
std::optional<int64_t> mtime1 = LastWriteTime(request.path); std::optional<int64_t> mtime1 = LastWriteTime(request.path);
if (!mtime1) if (!mtime1)
return true; return true;
if (vfs->Stamp(request.path, *mtime1)) if (vfs->Stamp(request.path, *mtime1, -1))
reparse = 2; reparse = 1;
} }
if (g_config->index.onChange) if (g_config->index.onChange)
reparse = 2; reparse = 2;
if (!vfs->Mark(path_to_index, g_thread_id, 1) && !reparse) if (!reparse)
return true; return true;
prev = RawCacheLoad(path_to_index); if (reparse < 2) do {
if (!prev) std::unique_lock lock(
reparse = 2; mutexes[std::hash<std::string>()(path_to_index) % N_MUTEXES]);
else { prev = RawCacheLoad(path_to_index);
if (CacheInvalid(vfs, prev.get(), path_to_index, entry.args, std::nullopt)) if (!prev || CacheInvalid(vfs, prev.get(), path_to_index, entry.args,
reparse = 2; std::nullopt))
int reparseForDep = g_config->index.reparseForDependency; break;
if (reparseForDep > 1 || (reparseForDep == 1 && !Project::loaded)) bool update = false;
for (const auto &dep : prev->dependencies) { for (const auto &dep : prev->dependencies)
if (auto write_time1 = LastWriteTime(dep.first().str())) { if (auto mtime1 = LastWriteTime(dep.first.val().str())) {
if (dep.second < *write_time1) { if (dep.second < *mtime1)
reparse = 2; update = true;
std::lock_guard<std::mutex> lock(vfs->mutex); } else {
vfs->state[dep.first().str()].stage = 0; update = true;
}
} else
reparse = 2;
} }
} int forDep = g_config->index.reparseForDependency;
if (update && (forDep > 1 || (forDep == 1 && request.ts < loaded_ts)))
break;
// Grab the ownership if (reparse < 2) {
if (reparse) { LOG_S(INFO) << "load cache for " << path_to_index;
std::lock_guard<std::mutex> lock(vfs->mutex); auto dependencies = prev->dependencies;
vfs->state[path_to_index].owner = g_thread_id; if (reparse) {
vfs->state[path_to_index].stage = 0;
}
if (reparse < 2) {
LOG_S(INFO) << "load cache for " << path_to_index;
auto dependencies = prev->dependencies;
if (reparse) {
IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
std::lock_guard lock(vfs->mutex);
vfs->state[path_to_index].loaded = true;
}
for (const auto &dep : dependencies) {
std::string path = dep.first().str();
if (vfs->Mark(path, 0, 2) && (prev = RawCacheLoad(path))) {
IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get()); IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get());
on_indexed->PushBack(std::move(update), on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive); request.mode != IndexMode::NonInteractive);
std::lock_guard lock1(vfs->mutex);
vfs->state[path_to_index].loaded = true;
}
lock.unlock();
for (const auto &dep : dependencies) {
std::string path = dep.first.val().str();
std::lock_guard lock1(
mutexes[std::hash<std::string>()(path) % N_MUTEXES]);
prev = RawCacheLoad(path);
if (!prev)
continue;
{ {
std::lock_guard lock(vfs->mutex); std::lock_guard lock2(vfs->mutex);
vfs->state[path].loaded = true; VFS::State &st = vfs->state[path];
if (st.loaded)
continue;
st.loaded = true;
st.timestamp = prev->mtime;
} }
IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
if (entry.id >= 0) { if (entry.id >= 0) {
std::lock_guard lock(project->mutex_); std::lock_guard lock2(project->mutex_);
project->path_to_entry_index[path] = entry.id; project->path_to_entry_index[path] = entry.id;
} }
} }
return true;
} }
return true; } while (0);
}
LOG_IF_S(INFO, loud) << "parse " << path_to_index; LOG_IF_S(INFO, loud) << "parse " << path_to_index;
@ -288,64 +294,52 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
out.error.message = "Failed to index " + path_to_index; out.error.message = "Failed to index " + path_to_index;
pipeline::WriteStdout(kMethodType_Unknown, out); pipeline::WriteStdout(kMethodType_Unknown, out);
} }
vfs->Reset(path_to_index);
return true; return true;
} }
for (std::unique_ptr<IndexFile> &curr : indexes) { for (std::unique_ptr<IndexFile> &curr : indexes) {
std::string path = curr->path; std::string path = curr->path;
bool do_update = path == path_to_index || path == request.path, loaded; if (!matcher.IsMatch(path)) {
{ LOG_IF_S(INFO, loud) << "skip index for " << path;
std::lock_guard<std::mutex> lock(vfs->mutex);
VFS::State &st = vfs->state[path];
if (st.timestamp < curr->mtime) {
st.timestamp = curr->mtime;
do_update = true;
}
loaded = st.loaded;
st.loaded = true;
}
if (std::string reason; !matcher.IsMatch(path, &reason)) {
LOG_IF_S(INFO, loud) << "skip emitting and storing index of " << path << " for "
<< reason;
do_update = false;
}
if (!do_update) {
vfs->Reset(path);
continue; continue;
} }
prev.reset();
if (loaded)
prev = RawCacheLoad(path);
// Store current index.
LOG_IF_S(INFO, loud) << "store index for " << path << " (delta: " << !!prev LOG_IF_S(INFO, loud) << "store index for " << path << " (delta: " << !!prev
<< ")"; << ")";
if (g_config->cacheDirectory.empty()) { {
std::lock_guard lock(g_index_mutex); std::lock_guard lock(mutexes[std::hash<std::string>()(path) % N_MUTEXES]);
auto it = g_index.insert_or_assign( bool loaded;
path, InMemoryIndexFile{curr->file_contents, *curr}); {
std::string().swap(it.first->second.index.file_contents); std::lock_guard lock1(vfs->mutex);
} else { loaded = vfs->state[path].loaded;
std::string cache_path = GetCachePath(path); }
WriteToFile(cache_path, curr->file_contents); if (loaded)
WriteToFile(AppendSerializationFormat(cache_path), prev = RawCacheLoad(path);
Serialize(g_config->cacheFormat, *curr)); else
prev.reset();
if (g_config->cacheDirectory.empty()) {
std::lock_guard lock(g_index_mutex);
auto it = g_index.insert_or_assign(
path, InMemoryIndexFile{curr->file_contents, *curr});
std::string().swap(it.first->second.index.file_contents);
} else {
std::string cache_path = GetCachePath(path);
WriteToFile(cache_path, curr->file_contents);
WriteToFile(AppendSerializationFormat(cache_path),
Serialize(g_config->cacheFormat, *curr));
}
on_indexed->PushBack(IndexUpdate::CreateDelta(prev.get(), curr.get()),
request.mode != IndexMode::NonInteractive);
{
std::lock_guard lock1(vfs->mutex);
vfs->state[path].loaded = true;
}
if (entry.id >= 0) {
std::lock_guard<std::mutex> lock(project->mutex_);
for (auto &dep : curr->dependencies)
project->path_to_entry_index[dep.first()] = entry.id;
}
} }
vfs->Reset(path);
if (entry.id >= 0) {
std::lock_guard<std::mutex> lock(project->mutex_);
for (auto &dep : curr->dependencies)
project->path_to_entry_index[dep.first()] = entry.id;
}
// Build delta update.
IndexUpdate update = IndexUpdate::CreateDelta(prev.get(), curr.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
} }
return true; return true;
@ -376,7 +370,6 @@ void Indexer_Main(CompletionManager *completion, VFS *vfs, Project *project,
void Main_OnIndexed(DB *db, SemanticHighlightSymbolCache *semantic_cache, void Main_OnIndexed(DB *db, SemanticHighlightSymbolCache *semantic_cache,
WorkingFiles *working_files, IndexUpdate *update) { WorkingFiles *working_files, IndexUpdate *update) {
if (update->refresh) { if (update->refresh) {
Project::loaded = true;
LOG_S(INFO) LOG_S(INFO)
<< "loaded project. Refresh semantic highlight for all working file."; << "loaded project. Refresh semantic highlight for all working file.";
std::lock_guard<std::mutex> lock(working_files->files_mutex); std::lock_guard<std::mutex> lock(working_files->files_mutex);

View File

@ -35,6 +35,7 @@ enum class IndexMode {
}; };
namespace pipeline { namespace pipeline {
extern int64_t loaded_ts, tick;
void Init(); void Init();
void LaunchStdin(); void LaunchStdin();
void LaunchStdout(); void LaunchStdout();

View File

@ -351,10 +351,7 @@ int ComputeGuessScore(std::string_view a, std::string_view b) {
} // namespace } // namespace
bool Project::loaded = false;
void Project::Load(const std::string &root_directory) { void Project::Load(const std::string &root_directory) {
Project::loaded = false;
ProjectConfig project; ProjectConfig project;
project.extra_flags = g_config->clang.extraArgs; project.extra_flags = g_config->clang.extraArgs;
project.project_dir = root_directory; project.project_dir = root_directory;
@ -470,6 +467,7 @@ void Project::Index(WorkingFiles *wfiles, lsRequestId id) {
interactive ? IndexMode::Normal : IndexMode::NonInteractive, interactive ? IndexMode::Normal : IndexMode::NonInteractive,
id); id);
}); });
pipeline::loaded_ts = pipeline::tick;
// Dummy request to indicate that project is loaded and // Dummy request to indicate that project is loaded and
// trigger refreshing semantic highlight for all working files. // trigger refreshing semantic highlight for all working files.
pipeline::Index("", {}, IndexMode::NonInteractive); pipeline::Index("", {}, IndexMode::NonInteractive);

View File

@ -72,6 +72,4 @@ struct Project {
ForAllFilteredFiles(std::function<void(int i, const Entry &entry)> action); ForAllFilteredFiles(std::function<void(int i, const Entry &entry)> action);
void Index(WorkingFiles *wfiles, lsRequestId id); void Index(WorkingFiles *wfiles, lsRequestId id);
static bool loaded;
}; };