threaded_queue: queue -> deque

IndexMergeIndexUpdates: use TryPopBack() and see
This commit is contained in:
Fangrui Song 2018-02-04 22:03:22 -08:00
parent 3839d1e5ab
commit cd96cb9570
11 changed files with 76 additions and 65 deletions

View File

@ -689,7 +689,7 @@ void ClangCompleteManager::NotifyView(const std::string& filename) {
// Only reparse the file if we create a new CompletionSession. // Only reparse the file if we create a new CompletionSession.
if (EnsureCompletionOrCreatePreloadSession(filename)) if (EnsureCompletionOrCreatePreloadSession(filename))
parse_requests_.PriorityEnqueue(ParseRequest(filename)); parse_requests_.PushBack(ParseRequest(filename), true);
} }
void ClangCompleteManager::NotifyEdit(const std::string& filename) { void ClangCompleteManager::NotifyEdit(const std::string& filename) {
@ -708,7 +708,7 @@ void ClangCompleteManager::NotifySave(const std::string& filename) {
// //
EnsureCompletionOrCreatePreloadSession(filename); EnsureCompletionOrCreatePreloadSession(filename);
parse_requests_.PriorityEnqueue(ParseRequest(filename)); parse_requests_.PushBack(ParseRequest(filename), true);
} }
void ClangCompleteManager::NotifyClose(const std::string& filename) { void ClangCompleteManager::NotifyClose(const std::string& filename) {

View File

@ -344,7 +344,7 @@ void LaunchStdinLoop(Config* config,
case IpcId::CqueryDerived: case IpcId::CqueryDerived:
case IpcId::CqueryIndexFile: case IpcId::CqueryIndexFile:
case IpcId::CqueryWait: { case IpcId::CqueryWait: {
queue->for_querydb.Enqueue(std::move(message)); queue->for_querydb.PushBack(std::move(message));
break; break;
} }

View File

@ -393,7 +393,7 @@ bool IndexMain_DoParse(
ImportManager* import_manager, ImportManager* import_manager,
IIndexer* indexer) { IIndexer* indexer) {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
optional<Index_Request> request = queue->index_request.TryDequeue(); optional<Index_Request> request = queue->index_request.TryPopFront();
if (!request) if (!request)
return false; return false;
@ -408,7 +408,7 @@ bool IndexMain_DoParse(
bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) { bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue(); optional<Index_OnIdMapped> response = queue->on_id_mapped.TryPopFront();
if (!response) if (!response)
return false; return false;
@ -466,14 +466,14 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
#endif #endif
Index_OnIndexed reply(update, response->perf); Index_OnIndexed reply(update, response->perf);
queue->on_indexed.Enqueue(std::move(reply), response->is_interactive); queue->on_indexed.PushBack(std::move(reply), response->is_interactive);
return true; return true;
} }
bool IndexMain_LoadPreviousIndex() { bool IndexMain_LoadPreviousIndex() {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
optional<Index_DoIdMap> response = queue->load_previous_index.TryDequeue(); optional<Index_DoIdMap> response = queue->load_previous_index.TryPopFront();
if (!response) if (!response)
return false; return false;
@ -482,21 +482,21 @@ bool IndexMain_LoadPreviousIndex() {
<< "Unable to load previous index for already imported index " << "Unable to load previous index for already imported index "
<< response->current->path; << response->current->path;
queue->do_id_map.Enqueue(std::move(*response)); queue->do_id_map.PushBack(std::move(*response));
return true; return true;
} }
bool IndexMergeIndexUpdates() { bool IndexMergeIndexUpdates() {
auto* queue = QueueManager::instance(); auto* queue = QueueManager::instance();
optional<Index_OnIndexed> root = queue->on_indexed.TryDequeue(); optional<Index_OnIndexed> root = queue->on_indexed.TryPopBack();
if (!root) if (!root)
return false; return false;
bool did_merge = false; bool did_merge = false;
while (true) { while (true) {
optional<Index_OnIndexed> to_join = queue->on_indexed.TryDequeue(); optional<Index_OnIndexed> to_join = queue->on_indexed.TryPopBack();
if (!to_join) { if (!to_join) {
queue->on_indexed.Enqueue(std::move(*root)); queue->on_indexed.PushFront(std::move(*root));
return did_merge; return did_merge;
} }
@ -633,7 +633,7 @@ void QueryDb_DoIdMap(QueueManager* queue,
db->usr_to_file.end()) { db->usr_to_file.end()) {
assert(!request->load_previous); assert(!request->load_previous);
request->load_previous = true; request->load_previous = true;
queue->load_previous_index.Enqueue(std::move(*request)); queue->load_previous_index.PushBack(std::move(*request));
return; return;
} }
@ -665,7 +665,7 @@ void QueryDb_DoIdMap(QueueManager* queue,
response.previous = make_map(std::move(request->previous)); response.previous = make_map(std::move(request->previous));
response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset(); response.perf.querydb_id_map = time.ElapsedMicrosecondsAndReset();
queue->on_id_mapped.Enqueue(std::move(response)); queue->on_id_mapped.PushBack(std::move(response));
} }
void QueryDb_OnIndexed(QueueManager* queue, void QueryDb_OnIndexed(QueueManager* queue,
@ -721,7 +721,7 @@ bool QueryDb_ImportMain(Config* config,
bool did_work = false; bool did_work = false;
while (true) { while (true) {
optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue(); optional<Index_DoIdMap> request = queue->do_id_map.TryPopFront();
if (!request) if (!request)
break; break;
did_work = true; did_work = true;
@ -729,7 +729,7 @@ bool QueryDb_ImportMain(Config* config,
} }
while (true) { while (true) {
optional<Index_OnIndexed> response = queue->on_indexed.TryDequeue(); optional<Index_OnIndexed> response = queue->on_indexed.TryPopFront();
if (!response) if (!response)
break; break;
did_work = true; did_work = true;
@ -762,7 +762,7 @@ TEST_SUITE("ImportPipeline") {
const std::vector<std::string>& args = {}, const std::vector<std::string>& args = {},
bool is_interactive = false, bool is_interactive = false,
const std::string& contents = "void foo();") { const std::string& contents = "void foo();") {
queue->index_request.Enqueue( queue->index_request.PushBack(
Index_Request(path, args, is_interactive, contents, cache_manager)); Index_Request(path, args, is_interactive, contents, cache_manager));
} }

View File

@ -97,8 +97,9 @@ struct CqueryFreshenIndexHandler : BaseMessageHandler<Ipc_CqueryFreshenIndex> {
} }
bool is_interactive = bool is_interactive =
working_files->GetFileByFilename(entry.filename) != nullptr; working_files->GetFileByFilename(entry.filename) != nullptr;
queue->index_request.Enqueue(Index_Request(entry.filename, entry.args, queue->index_request.PushBack(
is_interactive, *content, ICacheManager::Make(config))); Index_Request(entry.filename, entry.args, is_interactive,
*content, ICacheManager::Make(config)));
}); });
} }
}; };

View File

@ -27,7 +27,7 @@ REGISTER_IPC_MESSAGE(Ipc_CqueryIndexFile);
struct CqueryIndexFileHandler : BaseMessageHandler<Ipc_CqueryIndexFile> { struct CqueryIndexFileHandler : BaseMessageHandler<Ipc_CqueryIndexFile> {
void Run(Ipc_CqueryIndexFile* request) override { void Run(Ipc_CqueryIndexFile* request) override {
LOG_S(INFO) << "Indexing file " << request->params.path; LOG_S(INFO) << "Indexing file " << request->params.path;
QueueManager::instance()->index_request.Enqueue(Index_Request( QueueManager::instance()->index_request.PushBack(Index_Request(
NormalizePath(request->params.path), request->params.args, NormalizePath(request->params.path), request->params.args,
request->params.is_interactive, request->params.contents, ICacheManager::Make(config))); request->params.is_interactive, request->params.contents, ICacheManager::Make(config)));
} }

View File

@ -624,7 +624,7 @@ struct InitializeHandler : BaseMessageHandler<Ipc_InitializeRequest> {
} }
bool is_interactive = bool is_interactive =
working_files->GetFileByFilename(entry.filename) != nullptr; working_files->GetFileByFilename(entry.filename) != nullptr;
queue->index_request.Enqueue(Index_Request( queue->index_request.PushBack(Index_Request(
entry.filename, entry.args, is_interactive, *content, ICacheManager::Make(config), request->id)); entry.filename, entry.args, is_interactive, *content, ICacheManager::Make(config), request->id));
}); });

View File

@ -56,9 +56,10 @@ struct TextDocumentDidOpenHandler
// Submit new index request. // Submit new index request.
const Project::Entry& entry = project->FindCompilationEntryForFile(path); const Project::Entry& entry = project->FindCompilationEntryForFile(path);
QueueManager::instance()->index_request.PriorityEnqueue( QueueManager::instance()->index_request.PushBack(
Index_Request(entry.filename, entry.args, true /*is_interactive*/, Index_Request(entry.filename, entry.args, true /*is_interactive*/,
request->params.textDocument.text, cache_manager)); request->params.textDocument.text, cache_manager),
true /* priority */);
} }
}; };
REGISTER_MESSAGE_HANDLER(TextDocumentDidOpenHandler); REGISTER_MESSAGE_HANDLER(TextDocumentDidOpenHandler);

View File

@ -49,8 +49,10 @@ struct TextDocumentDidSaveHandler
LOG_S(ERROR) << "Unable to read file content after saving " << path; LOG_S(ERROR) << "Unable to read file content after saving " << path;
} else { } else {
Project::Entry entry = project->FindCompilationEntryForFile(path); Project::Entry entry = project->FindCompilationEntryForFile(path);
QueueManager::instance()->index_request.Enqueue(Index_Request( QueueManager::instance()->index_request.PushBack(
entry.filename, entry.args, true /*is_interactive*/, *content, ICacheManager::Make(config))); Index_Request(entry.filename, entry.args, true /*is_interactive*/,
*content, ICacheManager::Make(config)),
true);
} }
clang_complete->NotifySave(path); clang_complete->NotifySave(path);

View File

@ -52,7 +52,7 @@ struct WorkspaceDidChangeWatchedFilesHandler
if (!content) if (!content)
LOG_S(ERROR) << "Unable to read file content after saving " << path; LOG_S(ERROR) << "Unable to read file content after saving " << path;
else { else {
QueueManager::instance()->index_request.Enqueue( QueueManager::instance()->index_request.PushBack(
Index_Request(path, entry.args, is_interactive, *content, ICacheManager::Make(config))); Index_Request(path, entry.args, is_interactive, *content, ICacheManager::Make(config)));
if (is_interactive) if (is_interactive)
clang_complete->NotifySave(path); clang_complete->NotifySave(path);
@ -60,7 +60,7 @@ struct WorkspaceDidChangeWatchedFilesHandler
break; break;
} }
case lsFileChangeType::Deleted: case lsFileChangeType::Deleted:
QueueManager::instance()->index_request.Enqueue( QueueManager::instance()->index_request.PushBack(
Index_Request(path, entry.args, is_interactive, std::string(), ICacheManager::Make(config))); Index_Request(path, entry.args, is_interactive, std::string(), ICacheManager::Make(config)));
break; break;
} }

View File

@ -73,7 +73,7 @@ void QueueManager::WriteStdout(IpcId id, lsBaseOutMessage& response) {
Stdout_Request out; Stdout_Request out;
out.content = sstream.str(); out.content = sstream.str();
out.id = id; out.id = id;
instance()->for_stdout.Enqueue(std::move(out)); instance()->for_stdout.PushBack(std::move(out));
} }
QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter, QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter,

View File

@ -8,8 +8,8 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
#include <deque>
#include <mutex> #include <mutex>
#include <queue>
#include <tuple> #include <tuple>
// TODO: cleanup includes. // TODO: cleanup includes.
@ -111,21 +111,25 @@ struct ThreadedQueue : public BaseThreadQueue {
size_t Size() const { return total_count_; } size_t Size() const { return total_count_; }
// Add an element to the queue. // Add an element to the queue.
void Enqueue(T&& t, bool priority = false) { template <void (std::deque<T>::*push)(T&&)>
void Push(T&& t, bool priority) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority) if (priority)
priority_.push(std::move(t)); (priority_.*push)(std::move(t));
else else
queue_.push(std::move(t)); (queue_.*push)(std::move(t));
++total_count_; ++total_count_;
waiter_->cv.notify_one(); waiter_->cv.notify_one();
if (waiter1_) if (waiter1_)
waiter1_->cv.notify_one(); waiter1_->cv.notify_one();
} }
// Add an element to the front of the queue. void PushFront(T&& t, bool priority = false) {
void PriorityEnqueue(T&& t) { Push<&std::deque<T>::push_front>(std::move(t), priority);
Enqueue(std::move(t), true); }
void PushBack(T&& t, bool priority = false) {
Push<&std::deque<T>::push_back>(std::move(t), priority);
} }
// Add a set of elements to the queue. // Add a set of elements to the queue.
@ -139,9 +143,9 @@ struct ThreadedQueue : public BaseThreadQueue {
for (T& element : elements) { for (T& element : elements) {
if (priority) if (priority)
priority_.push(std::move(element)); priority_.push_back(std::move(element));
else else
queue_.push(std::move(element)); queue_.push_back(std::move(element));
} }
elements.clear(); elements.clear();
@ -158,11 +162,11 @@ struct ThreadedQueue : public BaseThreadQueue {
result.reserve(priority_.size() + queue_.size()); result.reserve(priority_.size() + queue_.size());
while (!priority_.empty()) { while (!priority_.empty()) {
result.emplace_back(std::move(priority_.front())); result.emplace_back(std::move(priority_.front()));
priority_.pop(); priority_.pop_front();
} }
while (!queue_.empty()) { while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front())); result.emplace_back(std::move(queue_.front()));
queue_.pop(); queue_.pop_front();
} }
return result; return result;
@ -171,24 +175,16 @@ struct ThreadedQueue : public BaseThreadQueue {
// Returns true if the queue is empty. This is lock-free. // Returns true if the queue is empty. This is lock-free.
bool IsEmpty() { return total_count_ == 0; } bool IsEmpty() { return total_count_ == 0; }
// TODO: Unify code between DequeuePlusAction with TryDequeuePlusAction.
// Probably have opt<T> Dequeue(bool wait_for_element);
// Get the first element from the queue. Blocks until one is available. // Get the first element from the queue. Blocks until one is available.
// Executes |action| with an acquired |mutex_|. T Dequeue() {
template <typename TAction>
T DequeuePlusAction(TAction action) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
waiter_->cv.wait(lock, waiter_->cv.wait(lock,
[&]() { return !priority_.empty() || !queue_.empty(); }); [&]() { return !priority_.empty() || !queue_.empty(); });
auto execute = [&](std::queue<T>* q) { auto execute = [&](std::deque<T>* q) {
auto val = std::move(q->front()); auto val = std::move(q->front());
q->pop(); q->pop_front();
--total_count_; --total_count_;
action();
return std::move(val); return std::move(val);
}; };
if (!priority_.empty()) if (!priority_.empty())
@ -196,18 +192,13 @@ struct ThreadedQueue : public BaseThreadQueue {
return execute(&queue_); return execute(&queue_);
} }
// Get the first element from the queue. Blocks until one is available.
T Dequeue() {
return DequeuePlusAction([]() {});
}
// Get the first element from the queue without blocking. Returns a null // Get the first element from the queue without blocking. Returns a null
// value if the queue is empty. // value if the queue is empty.
optional<T> TryDequeueHelper(int which) { optional<T> TryPopFrontHelper(int which) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto execute = [&](std::queue<T>* q) { auto execute = [&](std::deque<T>* q) {
auto val = std::move(q->front()); auto val = std::move(q->front());
q->pop(); q->pop_front();
--total_count_; --total_count_;
return std::move(val); return std::move(val);
}; };
@ -218,24 +209,40 @@ struct ThreadedQueue : public BaseThreadQueue {
return nullopt; return nullopt;
} }
optional<T> TryDequeue() { optional<T> TryPopFront() {
return TryDequeueHelper(3); return TryPopFrontHelper(3);
} }
optional<T> TryDequeueLow() { optional<T> TryPopBack() {
return TryDequeueHelper(1); std::lock_guard<std::mutex> lock(mutex_);
auto execute = [&](std::deque<T>* q) {
auto val = std::move(q->back());
q->pop_back();
--total_count_;
return std::move(val);
};
// Reversed
if (queue_.size())
return execute(&queue_);
if (priority_.size())
return execute(&priority_);
return nullopt;
} }
optional<T> TryDequeueHigh() { optional<T> TryPopFrontLow() {
return TryDequeueHelper(2); return TryPopFrontHelper(1);
}
optional<T> TryPopFrontHigh() {
return TryPopFrontHelper(2);
} }
mutable std::mutex mutex_; mutable std::mutex mutex_;
private: private:
std::atomic<int> total_count_; std::atomic<int> total_count_;
std::queue<T> priority_; std::deque<T> priority_;
std::queue<T> queue_; std::deque<T> queue_;
MultiQueueWaiter* waiter_; MultiQueueWaiter* waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_; std::unique_ptr<MultiQueueWaiter> owned_waiter_;
// TODO remove waiter1 after split of on_indexed // TODO remove waiter1 after split of on_indexed