ccls/src/threaded_queue.h

241 lines
5.9 KiB
C
Raw Normal View History

2017-03-25 19:18:25 +00:00
#pragma once
2017-12-28 16:55:46 +00:00
#include "utils.h"
2017-09-22 01:14:57 +00:00
#include "work_thread.h"
2017-12-28 16:55:46 +00:00
#include <optional.h>
2017-03-25 19:18:25 +00:00
#include <algorithm>
#include <atomic>
2017-03-25 19:18:25 +00:00
#include <condition_variable>
2017-09-22 01:14:57 +00:00
#include <mutex>
#include <queue>
#include <tuple>
2017-09-22 01:14:57 +00:00
2017-03-25 20:32:44 +00:00
// TODO: cleanup includes.
struct BaseThreadQueue {
virtual bool IsEmpty() = 0;
virtual ~BaseThreadQueue() = default;
};
// TODO Remove after migration to C++14
namespace {
template <size_t... Is>
struct index_sequence {};
template <size_t I, size_t... Is>
struct make_index_sequence {
2018-01-11 02:43:01 +00:00
using type = typename make_index_sequence<I - 1, I - 1, Is...>::type;
};
template <size_t... Is>
struct make_index_sequence<0, Is...> {
using type = index_sequence<Is...>;
};
2018-01-11 02:43:01 +00:00
} // namespace
// std::lock accepts two or more arguments. We define an overload for one
// argument.
namespace std {
template <typename Lockable>
void lock(Lockable& l) {
l.lock();
}
2018-01-11 02:43:01 +00:00
} // namespace std
template <typename... Queue>
struct MultiQueueLock {
2018-01-11 02:43:01 +00:00
MultiQueueLock(Queue... lockable) : tuple_{lockable...} { lock(); }
~MultiQueueLock() { unlock(); }
void lock() {
lock_impl(typename make_index_sequence<sizeof...(Queue)>::type{});
}
void unlock() {
unlock_impl(typename make_index_sequence<sizeof...(Queue)>::type{});
}
private:
template <size_t... Is>
void lock_impl(index_sequence<Is...>) {
std::lock(std::get<Is>(tuple_)->mutex_...);
}
template <size_t... Is>
void unlock_impl(index_sequence<Is...>) {
(void)std::initializer_list<int>{
(std::get<Is>(tuple_)->mutex_.unlock(), 0)...};
}
std::tuple<Queue...> tuple_;
};
struct MultiQueueWaiter {
std::condition_variable_any cv;
static bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
for (BaseThreadQueue* queue : queues) {
if (!queue->IsEmpty())
return true;
}
return false;
}
template <typename... BaseThreadQueue>
void Wait(BaseThreadQueue... queues) {
MultiQueueLock<BaseThreadQueue...> l(queues...);
while (!HasState({queues...}))
cv.wait(l);
}
};
2017-03-25 19:18:25 +00:00
// A threadsafe-queue. http://stackoverflow.com/a/16075550
template <class T>
struct ThreadedQueue : public BaseThreadQueue {
2017-09-22 01:14:57 +00:00
public:
2017-10-31 19:37:52 +00:00
ThreadedQueue() : total_count_(0) {
owned_waiter_ = MakeUnique<MultiQueueWaiter>();
waiter_ = owned_waiter_.get();
owned_waiter1_ = MakeUnique<MultiQueueWaiter>();
waiter1_ = owned_waiter1_.get();
}
// TODO remove waiter1 after split of on_indexed
explicit ThreadedQueue(MultiQueueWaiter* waiter,
MultiQueueWaiter* waiter1 = nullptr)
: total_count_(0), waiter_(waiter), waiter1_(waiter1) {}
2017-10-31 19:37:52 +00:00
// Returns the number of elements in the queue. This is lock-free.
size_t Size() const { return total_count_; }
2017-10-25 07:12:11 +00:00
// Add an element to the front of the queue.
void PriorityEnqueue(T&& t) {
std::lock_guard<std::mutex> lock(mutex_);
priority_.push(std::move(t));
++total_count_;
waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
}
2017-03-25 19:18:25 +00:00
// Add an element to the queue.
void Enqueue(T&& t) {
2017-03-25 19:18:25 +00:00
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(t));
++total_count_;
waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
2017-03-25 19:18:25 +00:00
}
// Add a set of elements to the queue.
void EnqueueAll(std::vector<T>&& elements) {
2018-01-18 07:59:48 +00:00
if (elements.empty())
return;
std::lock_guard<std::mutex> lock(mutex_);
2017-10-31 19:37:52 +00:00
total_count_ += elements.size();
for (T& element : elements) {
queue_.push(std::move(element));
}
elements.clear();
2017-10-31 19:37:52 +00:00
waiter_->cv.notify_all();
}
// Return all elements in the queue.
std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex_);
2017-10-31 19:37:52 +00:00
total_count_ = 0;
std::vector<T> result;
result.reserve(priority_.size() + queue_.size());
while (!priority_.empty()) {
result.emplace_back(std::move(priority_.front()));
priority_.pop();
}
while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front()));
queue_.pop();
}
2017-10-31 19:37:52 +00:00
return result;
}
2017-10-31 19:37:52 +00:00
// Returns true if the queue is empty. This is lock-free.
bool IsEmpty() { return total_count_ == 0; }
// TODO: Unify code between DequeuePlusAction with TryDequeuePlusAction.
// Probably have opt<T> Dequeue(bool wait_for_element);
// Get the first element from the queue. Blocks until one is available.
2017-07-29 00:07:27 +00:00
// Executes |action| with an acquired |mutex_|.
2017-09-22 01:14:57 +00:00
template <typename TAction>
2017-07-30 04:46:21 +00:00
T DequeuePlusAction(TAction action) {
2017-03-25 19:18:25 +00:00
std::unique_lock<std::mutex> lock(mutex_);
2017-09-22 01:14:57 +00:00
waiter_->cv.wait(lock,
[&]() { return !priority_.empty() || !queue_.empty(); });
2017-07-29 00:07:27 +00:00
2017-10-31 19:37:52 +00:00
auto execute = [&](std::queue<T>* q) {
auto val = std::move(q->front());
q->pop();
--total_count_;
2017-07-29 00:07:27 +00:00
2017-10-31 19:37:52 +00:00
action();
2017-07-29 00:07:27 +00:00
2017-10-31 19:37:52 +00:00
return std::move(val);
};
if (!priority_.empty())
return execute(&priority_);
return execute(&queue_);
2017-03-25 19:18:25 +00:00
}
2017-07-29 00:07:27 +00:00
// Get the first element from the queue. Blocks until one is available.
T Dequeue() {
return DequeuePlusAction([]() {});
}
2017-03-25 20:32:44 +00:00
// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
2017-09-22 01:14:57 +00:00
template <typename TAction>
2017-09-13 03:35:27 +00:00
optional<T> TryDequeuePlusAction(TAction action) {
std::lock_guard<std::mutex> lock(mutex_);
if (priority_.empty() && queue_.empty())
2017-03-25 19:18:25 +00:00
return nullopt;
2017-10-31 19:37:52 +00:00
auto execute = [&](std::queue<T>* q) {
auto val = std::move(q->front());
q->pop();
--total_count_;
2017-10-31 19:37:52 +00:00
action(val);
2017-09-13 03:35:27 +00:00
2017-10-31 19:37:52 +00:00
return std::move(val);
};
if (!priority_.empty())
return execute(&priority_);
return execute(&queue_);
2017-03-25 19:18:25 +00:00
}
2017-09-13 03:35:27 +00:00
optional<T> TryDequeue() {
return TryDequeuePlusAction([](const T&) {});
}
mutable std::mutex mutex_;
2017-03-25 20:32:44 +00:00
private:
2017-10-31 19:37:52 +00:00
std::atomic<int> total_count_;
std::queue<T> priority_;
std::queue<T> queue_;
MultiQueueWaiter* waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_;
// TODO remove waiter1 after split of on_indexed
MultiQueueWaiter* waiter1_;
std::unique_ptr<MultiQueueWaiter> owned_waiter1_;
2017-03-25 19:18:25 +00:00
};