Simplify pipeline and fix race

This commit is contained in:
Fangrui Song 2018-09-17 18:03:59 -07:00
parent cffc8c8409
commit 1249eb1eb0
12 changed files with 106 additions and 169 deletions

View File

@ -4,7 +4,6 @@
#include "config.h"
Config *g_config;
thread_local int g_thread_id;
namespace ccls {
void DoPathMapping(std::string &arg) {

View File

@ -261,7 +261,6 @@ MAKE_REFLECT_STRUCT(Config, compilationDatabaseCommand,
index, largeFileSize, workspaceSymbol, xref);
extern Config *g_config;
thread_local extern int g_thread_id;
namespace ccls {
void DoPathMapping(std::string &arg);

View File

@ -57,43 +57,21 @@ VFS::State VFS::Get(const std::string &file) {
auto it = state.find(file);
if (it != state.end())
return it->second;
return {0, 0, 0};
return {0, 0};
}
bool VFS::Mark(const std::string &file, int owner, int stage) {
std::lock_guard<std::mutex> lock(mutex);
State &st = state[file];
if (st.stage < stage) {
st.owner = owner;
st.stage = stage;
return true;
} else
return false;
}
bool VFS::Stamp(const std::string &file, int64_t ts) {
bool VFS::Stamp(const std::string &file, int64_t ts, int64_t offset) {
std::lock_guard<std::mutex> lock(mutex);
State &st = state[file];
if (st.timestamp < ts) {
st.timestamp = ts;
st.timestamp = ts + offset;
return true;
} else
return false;
}
void VFS::ResetLocked(const std::string &file) {
State &st = state[file];
if (st.owner == 0 || st.owner == g_thread_id)
st.stage = 0;
}
void VFS::Reset(const std::string &file) {
std::lock_guard<std::mutex> lock(mutex);
ResetLocked(file);
}
FileConsumer::FileConsumer(VFS *vfs, const std::string &parse_file)
: vfs_(vfs), parse_file_(parse_file), thread_id_(g_thread_id) {}
: vfs_(vfs), parse_file_(parse_file) {}
IndexFile *FileConsumer::TryConsumeFile(
const clang::FileEntry &File,
@ -104,9 +82,9 @@ IndexFile *FileConsumer::TryConsumeFile(
return it->second.get();
std::string file_name = FileName(File);
// We did not take the file from global. Cache that we failed so we don't try
// again and return nullptr.
if (!vfs_->Mark(file_name, thread_id_, 2)) {
int64_t tim = File.getModificationTime();
assert(tim);
if (!vfs_->Stamp(file_name, tim, 0)) {
local_[UniqueID] = nullptr;
return nullptr;
}

View File

@ -31,18 +31,13 @@ struct FileContents {
struct VFS {
struct State {
int64_t timestamp;
int owner;
int stage;
bool loaded = false;
};
mutable std::unordered_map<std::string, State> state;
mutable std::mutex mutex;
State Get(const std::string &file);
bool Mark(const std::string &file, int owner, int stage);
bool Stamp(const std::string &file, int64_t ts);
void ResetLocked(const std::string &file);
void Reset(const std::string &file);
bool Stamp(const std::string &file, int64_t ts, int64_t offset);
};
namespace std {
@ -83,5 +78,4 @@ private:
local_;
VFS *vfs_;
std::string parse_file_;
int thread_id_;
};

View File

@ -1238,9 +1238,10 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs,
CI->getLangOpts()->RetainCommentsFromSystemHeaders = true;
std::string buf = wfiles->GetContent(file);
std::vector<std::unique_ptr<llvm::MemoryBuffer>> Bufs;
if (g_config->index.onChange && buf.size()) {
if (buf.size()) {
// If there is a completion session, reuse its preamble if exists.
bool done_remap = false;
#if 0
std::shared_ptr<CompletionSession> session =
completion->TryGetSession(file, false, false);
if (session)
@ -1253,6 +1254,7 @@ Index(CompletionManager *completion, WorkingFiles *wfiles, VFS *vfs,
done_remap = true;
}
}
#endif
for (auto &[filename, content] : remapped) {
if (filename == file && done_remap)
continue;

View File

@ -7,18 +7,7 @@
#include "pipeline.hh"
using namespace ccls;
// static
std::optional<Matcher> Matcher::Create(const std::string &search) {
/*
std::string real_search;
real_search.reserve(search.size() * 3 + 2);
for (auto c : search) {
real_search += ".*";
real_search += c;
}
real_search += ".*";
*/
try {
Matcher m;
m.regex_string = search;
@ -61,18 +50,16 @@ GroupMatch::GroupMatch(const std::vector<std::string> &whitelist,
bool GroupMatch::IsMatch(const std::string &value,
std::string *match_failure_reason) const {
for (const Matcher &m : whitelist) {
for (const Matcher &m : whitelist)
if (m.IsMatch(value))
return true;
}
for (const Matcher &m : blacklist) {
for (const Matcher &m : blacklist)
if (m.IsMatch(value)) {
if (match_failure_reason)
*match_failure_reason = "blacklist \"" + m.regex_string + "\"";
return false;
}
}
return true;
}

View File

@ -70,17 +70,6 @@ struct Handler_CclsReload : BaseMessageHandler<In_CclsReload> {
q.pop();
need_index.insert(file->def->path);
std::optional<int64_t> write_time =
pipeline::LastWriteTime(file->def->path);
if (!write_time)
continue;
{
std::lock_guard<std::mutex> lock(vfs->mutex);
VFS::State &st = vfs->state[file->def->path];
if (st.timestamp < write_time)
st.stage = 0;
}
if (request->params.dependencies)
for (const std::string &path : graph[file->def->path]) {
auto it = path_to_file.find(path);

View File

@ -490,7 +490,6 @@ struct Handler_Initialize : BaseMessageHandler<In_InitializeRequest> {
LOG_S(INFO) << "start " << g_config->index.threads << " indexers";
for (int i = 0; i < g_config->index.threads; i++) {
std::thread([=]() {
g_thread_id = i + 1;
std::string name = "indexer" + std::to_string(i);
set_thread_name(name.c_str());
pipeline::Indexer_Main(clang_complete, vfs, project, working_files);

View File

@ -14,7 +14,6 @@
#include "project.h"
#include "query_utils.h"
#include <llvm/ADT/Twine.h>
#include <llvm/Support/Threading.h>
#include <llvm/Support/Timer.h>
using namespace llvm;
@ -63,6 +62,9 @@ void DiagnosticsPublisher::Publish(WorkingFiles *working_files,
}
namespace ccls::pipeline {
int64_t loaded_ts = 0, tick = 0;
namespace {
struct Index_Request {
@ -70,6 +72,7 @@ struct Index_Request {
std::vector<std::string> args;
IndexMode mode;
lsRequestId id;
int64_t ts = tick++;
};
struct Stdout_Request {
@ -160,6 +163,8 @@ std::unique_ptr<IndexFile> RawCacheLoad(const std::string &path) {
bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
Project *project, VFS *vfs, const GroupMatch &matcher) {
const int N_MUTEXES = 256;
static std::mutex mutexes[N_MUTEXES];
std::optional<Index_Request> opt_request = index_request->TryPopFront();
if (!opt_request)
return false;
@ -174,8 +179,8 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
return false;
}
if (std::string reason; !matcher.IsMatch(request.path, &reason)) {
LOG_IF_S(INFO, loud) << "skip " << request.path << " for " << reason;
if (!matcher.IsMatch(request.path)) {
LOG_IF_S(INFO, loud) << "skip " << request.path;
return false;
}
@ -188,45 +193,37 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
std::optional<int64_t> write_time = LastWriteTime(path_to_index);
if (!write_time)
return true;
int reparse = vfs->Stamp(path_to_index, *write_time);
int reparse = vfs->Stamp(path_to_index, *write_time, -1);
if (request.path != path_to_index) {
std::optional<int64_t> mtime1 = LastWriteTime(request.path);
if (!mtime1)
return true;
if (vfs->Stamp(request.path, *mtime1))
reparse = 2;
if (vfs->Stamp(request.path, *mtime1, -1))
reparse = 1;
}
if (g_config->index.onChange)
reparse = 2;
if (!vfs->Mark(path_to_index, g_thread_id, 1) && !reparse)
if (!reparse)
return true;
if (reparse < 2) do {
std::unique_lock lock(
mutexes[std::hash<std::string>()(path_to_index) % N_MUTEXES]);
prev = RawCacheLoad(path_to_index);
if (!prev)
reparse = 2;
else {
if (CacheInvalid(vfs, prev.get(), path_to_index, entry.args, std::nullopt))
reparse = 2;
int reparseForDep = g_config->index.reparseForDependency;
if (reparseForDep > 1 || (reparseForDep == 1 && !Project::loaded))
for (const auto &dep : prev->dependencies) {
if (auto write_time1 = LastWriteTime(dep.first().str())) {
if (dep.second < *write_time1) {
reparse = 2;
std::lock_guard<std::mutex> lock(vfs->mutex);
vfs->state[dep.first().str()].stage = 0;
}
} else
reparse = 2;
}
}
// Grab the ownership
if (reparse) {
std::lock_guard<std::mutex> lock(vfs->mutex);
vfs->state[path_to_index].owner = g_thread_id;
vfs->state[path_to_index].stage = 0;
if (!prev || CacheInvalid(vfs, prev.get(), path_to_index, entry.args,
std::nullopt))
break;
bool update = false;
for (const auto &dep : prev->dependencies)
if (auto mtime1 = LastWriteTime(dep.first.val().str())) {
if (dep.second < *mtime1)
update = true;
} else {
update = true;
}
int forDep = g_config->index.reparseForDependency;
if (update && (forDep > 1 || (forDep == 1 && request.ts < loaded_ts)))
break;
if (reparse < 2) {
LOG_S(INFO) << "load cache for " << path_to_index;
@ -235,27 +232,36 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
std::lock_guard lock(vfs->mutex);
std::lock_guard lock1(vfs->mutex);
vfs->state[path_to_index].loaded = true;
}
lock.unlock();
for (const auto &dep : dependencies) {
std::string path = dep.first().str();
if (vfs->Mark(path, 0, 2) && (prev = RawCacheLoad(path))) {
std::string path = dep.first.val().str();
std::lock_guard lock1(
mutexes[std::hash<std::string>()(path) % N_MUTEXES]);
prev = RawCacheLoad(path);
if (!prev)
continue;
{
std::lock_guard lock2(vfs->mutex);
VFS::State &st = vfs->state[path];
if (st.loaded)
continue;
st.loaded = true;
st.timestamp = prev->mtime;
}
IndexUpdate update = IndexUpdate::CreateDelta(nullptr, prev.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
{
std::lock_guard lock(vfs->mutex);
vfs->state[path].loaded = true;
}
if (entry.id >= 0) {
std::lock_guard lock(project->mutex_);
std::lock_guard lock2(project->mutex_);
project->path_to_entry_index[path] = entry.id;
}
}
}
return true;
}
} while (0);
LOG_IF_S(INFO, loud) << "parse " << path_to_index;
@ -276,40 +282,29 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
out.error.message = "Failed to index " + path_to_index;
pipeline::WriteStdout(kMethodType_Unknown, out);
}
vfs->Reset(path_to_index);
return true;
}
for (std::unique_ptr<IndexFile> &curr : indexes) {
std::string path = curr->path;
bool do_update = path == path_to_index || path == request.path, loaded;
{
std::lock_guard<std::mutex> lock(vfs->mutex);
VFS::State &st = vfs->state[path];
if (st.timestamp < curr->mtime) {
st.timestamp = curr->mtime;
do_update = true;
}
loaded = st.loaded;
st.loaded = true;
}
if (std::string reason; !matcher.IsMatch(path, &reason)) {
LOG_IF_S(INFO, loud) << "skip emitting and storing index of " << path << " for "
<< reason;
do_update = false;
}
if (!do_update) {
vfs->Reset(path);
if (!matcher.IsMatch(path)) {
LOG_IF_S(INFO, loud) << "skip index for " << path;
continue;
}
prev.reset();
if (loaded)
prev = RawCacheLoad(path);
// Store current index.
LOG_IF_S(INFO, loud) << "store index for " << path << " (delta: " << !!prev
<< ")";
{
std::lock_guard lock(mutexes[std::hash<std::string>()(path) % N_MUTEXES]);
bool loaded;
{
std::lock_guard lock1(vfs->mutex);
loaded = vfs->state[path].loaded;
}
if (loaded)
prev = RawCacheLoad(path);
else
prev.reset();
if (g_config->cacheDirectory.empty()) {
std::lock_guard lock(g_index_mutex);
auto it = g_index.insert_or_assign(
@ -321,19 +316,18 @@ bool Indexer_Parse(CompletionManager *completion, WorkingFiles *wfiles,
WriteToFile(AppendSerializationFormat(cache_path),
Serialize(g_config->cacheFormat, *curr));
}
vfs->Reset(path);
on_indexed->PushBack(IndexUpdate::CreateDelta(prev.get(), curr.get()),
request.mode != IndexMode::NonInteractive);
{
std::lock_guard lock1(vfs->mutex);
vfs->state[path].loaded = true;
}
if (entry.id >= 0) {
std::lock_guard<std::mutex> lock(project->mutex_);
for (auto &dep : curr->dependencies)
project->path_to_entry_index[dep.first()] = entry.id;
}
// Build delta update.
IndexUpdate update = IndexUpdate::CreateDelta(prev.get(), curr.get());
on_indexed->PushBack(std::move(update),
request.mode != IndexMode::NonInteractive);
}
}
return true;
@ -364,7 +358,6 @@ void Indexer_Main(CompletionManager *completion, VFS *vfs, Project *project,
void Main_OnIndexed(DB *db, SemanticHighlightSymbolCache *semantic_cache,
WorkingFiles *working_files, IndexUpdate *update) {
if (update->refresh) {
Project::loaded = true;
LOG_S(INFO)
<< "loaded project. Refresh semantic highlight for all working file.";
std::lock_guard<std::mutex> lock(working_files->files_mutex);

View File

@ -38,6 +38,7 @@ enum class IndexMode {
};
namespace pipeline {
extern int64_t loaded_ts, tick;
void Init();
void LaunchStdin();
void LaunchStdout();

View File

@ -339,10 +339,7 @@ int ComputeGuessScore(std::string_view a, std::string_view b) {
} // namespace
bool Project::loaded = false;
void Project::Load(const std::string &root_directory) {
Project::loaded = false;
ProjectConfig project;
project.extra_flags = g_config->clang.extraArgs;
project.project_dir = root_directory;
@ -458,6 +455,7 @@ void Project::Index(WorkingFiles *wfiles, lsRequestId id) {
interactive ? IndexMode::Normal : IndexMode::NonInteractive,
id);
});
pipeline::loaded_ts = pipeline::tick;
// Dummy request to indicate that project is loaded and
// trigger refreshing semantic highlight for all working files.
pipeline::Index("", {}, IndexMode::NonInteractive);

View File

@ -60,6 +60,4 @@ struct Project {
ForAllFilteredFiles(std::function<void(int i, const Entry &entry)> action);
void Index(WorkingFiles *wfiles, lsRequestId id);
static bool loaded;
};