ccls/src/threaded_queue.hh

166 lines
4.4 KiB
C++
Raw Normal View History

2018-08-21 05:27:52 +00:00
// Copyright 2017-2018 ccls Authors
// SPDX-License-Identifier: Apache-2.0
2017-03-25 19:18:25 +00:00
#pragma once
2018-10-29 04:21:21 +00:00
#include "utils.hh"
2017-09-22 01:14:57 +00:00
#include <atomic>
2017-03-25 19:18:25 +00:00
#include <condition_variable>
#include <deque>
2017-09-22 01:14:57 +00:00
#include <mutex>
2018-04-05 07:15:21 +00:00
#include <optional>
#include <tuple>
#include <utility>
2017-09-22 01:14:57 +00:00
// The standard library only declares std::lock for two or more lockables.
// Code that expands a parameter pack into std::lock can end up passing exactly
// one, so supply the missing single-argument form: it just locks that object.
namespace std {
template <typename L1>
void lock(L1 &l1) {
  l1.lock();
}
} // namespace std
namespace ccls {
// Element-type-independent interface of a queue. It exposes only the emptiness
// check, so queues can be inspected through a single pointer type.
struct BaseThreadQueue {
  virtual ~BaseThreadQueue() = default;

  // Returns true when the queue currently holds no elements.
  virtual bool IsEmpty() = 0;
};
2018-08-09 17:08:14 +00:00
// Scoped lock over the `mutex_` member of every queue passed to the
// constructor: all mutexes are acquired on construction and released on
// destruction. lock()/unlock() are public so the object can itself be handed to
// std::condition_variable_any::wait.
//
// Each Queue must be pointer-like; its mutex is reached via `->mutex_`.
template <typename... Queue> struct MultiQueueLock {
  MultiQueueLock(Queue... lockable) : tuple_{lockable...} { lock(); }
  ~MultiQueueLock() { unlock(); }

  // The destructor unlocks unconditionally, so a copy would release the same
  // mutexes a second time. Forbid copying.
  MultiQueueLock(const MultiQueueLock &) = delete;
  MultiQueueLock &operator=(const MultiQueueLock &) = delete;

  // Acquires every queue's mutex.
  void lock() { lock_impl(std::index_sequence_for<Queue...>{}); }
  // Releases every queue's mutex.
  void unlock() { unlock_impl(std::index_sequence_for<Queue...>{}); }

private:
  template <size_t... Is> void lock_impl(std::index_sequence<Is...>) {
    if constexpr (sizeof...(Is) == 1) {
      // std::lock needs at least two lockables; lock a lone mutex directly so
      // this does not depend on a non-standard single-argument overload.
      (std::get<Is>(tuple_)->mutex_.lock(), ...);
    } else {
      // std::lock acquires all mutexes without deadlocking against other
      // threads that lock the same set in a different order.
      std::lock(std::get<Is>(tuple_)->mutex_...);
    }
  }

  template <size_t... Is> void unlock_impl(std::index_sequence<Is...>) {
    (std::get<Is>(tuple_)->mutex_.unlock(), ...);
  }

  std::tuple<Queue...> tuple_;
};
struct MultiQueueWaiter {
std::condition_variable_any cv;
2018-08-09 17:08:14 +00:00
static bool HasState(std::initializer_list<BaseThreadQueue *> queues) {
for (BaseThreadQueue *queue : queues) {
if (!queue->IsEmpty())
return true;
}
return false;
}
2018-08-09 17:08:14 +00:00
template <typename... BaseThreadQueue> void Wait(BaseThreadQueue... queues) {
MultiQueueLock<BaseThreadQueue...> l(queues...);
while (!HasState({queues...}))
cv.wait(l);
}
};
2017-03-25 19:18:25 +00:00
// A threadsafe-queue. http://stackoverflow.com/a/16075550
2018-08-09 17:08:14 +00:00
template <class T> struct ThreadedQueue : public BaseThreadQueue {
public:
ThreadedQueue() {
owned_waiter_ = std::make_unique<MultiQueueWaiter>();
waiter_ = owned_waiter_.get();
}
2018-08-09 17:08:14 +00:00
explicit ThreadedQueue(MultiQueueWaiter *waiter) : waiter_(waiter) {}
2017-10-31 19:37:52 +00:00
// Returns the number of elements in the queue. This is lock-free.
size_t Size() const { return total_count_; }
2017-10-25 07:12:11 +00:00
2018-02-05 03:38:57 +00:00
// Add an element to the queue.
2018-08-09 17:08:14 +00:00
template <void (std::deque<T>::*push)(T &&)> void Push(T &&t, bool priority) {
std::lock_guard<std::mutex> lock(mutex_);
2018-02-05 03:38:57 +00:00
if (priority)
(priority_.*push)(std::move(t));
2018-02-05 03:38:57 +00:00
else
(queue_.*push)(std::move(t));
++total_count_;
waiter_->cv.notify_one();
}
2018-08-09 17:08:14 +00:00
void PushBack(T &&t, bool priority = false) {
Push<&std::deque<T>::push_back>(std::move(t), priority);
2017-03-25 19:18:25 +00:00
}
// Return all elements in the queue.
std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex_);
2017-10-31 19:37:52 +00:00
total_count_ = 0;
std::vector<T> result;
result.reserve(priority_.size() + queue_.size());
while (!priority_.empty()) {
result.emplace_back(std::move(priority_.front()));
priority_.pop_front();
}
while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front()));
queue_.pop_front();
}
2017-10-31 19:37:52 +00:00
return result;
}
2017-10-31 19:37:52 +00:00
// Returns true if the queue is empty. This is lock-free.
bool IsEmpty() { return total_count_ == 0; }
// Get the first element from the queue. Blocks until one is available.
T Dequeue() {
2017-03-25 19:18:25 +00:00
std::unique_lock<std::mutex> lock(mutex_);
2017-09-22 01:14:57 +00:00
waiter_->cv.wait(lock,
[&]() { return !priority_.empty() || !queue_.empty(); });
2017-07-29 00:07:27 +00:00
2018-08-09 17:08:14 +00:00
auto execute = [&](std::deque<T> *q) {
2017-10-31 19:37:52 +00:00
auto val = std::move(q->front());
q->pop_front();
--total_count_;
2017-10-31 19:37:52 +00:00
return std::move(val);
};
if (!priority_.empty())
return execute(&priority_);
return execute(&queue_);
2017-03-25 19:18:25 +00:00
}
2017-03-25 20:32:44 +00:00
// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
std::optional<T> TryPopFront() {
std::lock_guard<std::mutex> lock(mutex_);
2018-08-09 17:08:14 +00:00
auto execute = [&](std::deque<T> *q) {
2017-10-31 19:37:52 +00:00
auto val = std::move(q->front());
q->pop_front();
--total_count_;
2017-10-31 19:37:52 +00:00
return std::move(val);
};
if (priority_.size())
2017-10-31 19:37:52 +00:00
return execute(&priority_);
if (queue_.size())
return execute(&queue_);
2018-03-31 03:16:33 +00:00
return std::nullopt;
2018-02-05 03:38:57 +00:00
}
2018-08-09 17:08:14 +00:00
template <typename Fn> void Iterate(Fn fn) {
2018-04-14 16:52:17 +00:00
std::lock_guard<std::mutex> lock(mutex_);
2018-08-09 17:08:14 +00:00
for (auto &entry : priority_)
2018-04-14 16:52:17 +00:00
fn(entry);
2018-08-09 17:08:14 +00:00
for (auto &entry : queue_)
2018-04-14 16:52:17 +00:00
fn(entry);
}
mutable std::mutex mutex_;
2018-08-09 17:08:14 +00:00
private:
std::atomic<int> total_count_{0};
std::deque<T> priority_;
std::deque<T> queue_;
2018-08-09 17:08:14 +00:00
MultiQueueWaiter *waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_;
2017-03-25 19:18:25 +00:00
};
} // namespace ccls