2017-03-25 19:18:25 +00:00
|
|
|
#pragma once
|
|
|
|
|
2017-12-28 16:55:46 +00:00
|
|
|
#include "utils.h"
|
2017-09-22 01:14:57 +00:00
|
|
|
|
2018-03-31 03:16:33 +00:00
|
|
|
#include <optional>
|
2017-12-28 16:55:46 +00:00
|
|
|
|
2017-03-25 19:18:25 +00:00
|
|
|
#include <algorithm>
|
2017-04-23 22:45:40 +00:00
|
|
|
#include <atomic>
|
2017-03-25 19:18:25 +00:00
|
|
|
#include <condition_variable>
|
2018-02-05 06:03:22 +00:00
|
|
|
#include <deque>
|
2017-09-22 01:14:57 +00:00
|
|
|
#include <mutex>
|
2018-01-01 23:09:46 +00:00
|
|
|
#include <tuple>
|
2018-03-10 23:40:27 +00:00
|
|
|
#include <utility>
|
2017-09-22 01:14:57 +00:00
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
// TODO: cleanup includes.
|
|
|
|
|
2017-04-23 22:45:40 +00:00
|
|
|
// Minimal polymorphic interface over a queue: the only thing a holder of a
// BaseThreadQueue* can do is ask whether the queue is empty.
struct BaseThreadQueue {
  // Virtual so that concrete queues can be destroyed through a base pointer.
  virtual ~BaseThreadQueue() = default;

  // Returns true when the queue holds no elements.
  virtual bool IsEmpty() = 0;
};
|
|
|
|
|
2018-01-01 23:09:46 +00:00
|
|
|
// The standard std::lock is only declared for two or more lockables. This
// single-argument overload lets variadic code expand a parameter pack of any
// size into a std::lock call; with exactly one lockable it simply locks it.
//
// NOTE(review): the C++ standard does not allow programs to add function
// overloads to namespace std, so this relies on compiler leniency - consider
// replacing it with a project-local helper.
namespace std {
template <typename Lockable>
void lock(Lockable& lockable) {
  lockable.lock();
}
}  // namespace std
|
2018-01-01 23:09:46 +00:00
|
|
|
|
|
|
|
// Scoped guard that locks the |mutex_| of every queue handed to the
// constructor and unlocks all of them again on destruction. It also exposes
// lock()/unlock(), so it can be passed to std::condition_variable_any::wait.
//
// Each |Queue| must be a pointer-like type whose pointee has an accessible
// |mutex_| member providing lock(), try_lock() and unlock().
template <typename... Queue>
struct MultiQueueLock {
  // Stores the queue handles and immediately acquires all of their mutexes.
  MultiQueueLock(Queue... lockable) : tuple_{lockable...} { lock(); }
  // Releases every mutex acquired by this guard.
  ~MultiQueueLock() { unlock(); }

  // Not copyable: a copy would unlock the same mutexes a second time when it
  // is destroyed.
  MultiQueueLock(const MultiQueueLock&) = delete;
  MultiQueueLock& operator=(const MultiQueueLock&) = delete;

  // Acquires the mutexes of all queues.
  void lock() { lock_impl(std::index_sequence_for<Queue...>{}); }
  // Releases the mutexes of all queues, in argument order.
  void unlock() { unlock_impl(std::index_sequence_for<Queue...>{}); }

 private:
  template <size_t... Is>
  void lock_impl(std::index_sequence<Is...>) {
    if constexpr (sizeof...(Is) < 2) {
      // std::lock is only specified for two or more lockables; with zero or
      // one there is no ordering problem, so lock directly.
      (std::get<Is>(tuple_)->mutex_.lock(), ...);
    } else {
      // std::lock acquires several mutexes with deadlock avoidance.
      std::lock(std::get<Is>(tuple_)->mutex_...);
    }
  }

  template <size_t... Is>
  void unlock_impl(std::index_sequence<Is...>) {
    (std::get<Is>(tuple_)->mutex_.unlock(), ...);
  }

  // The queue handles whose mutexes this guard manages.
  std::tuple<Queue...> tuple_;
};
|
|
|
|
|
2017-04-23 22:45:40 +00:00
|
|
|
struct MultiQueueWaiter {
|
2018-01-01 23:09:46 +00:00
|
|
|
std::condition_variable_any cv;
|
2017-04-23 22:45:40 +00:00
|
|
|
|
2018-01-01 23:09:46 +00:00
|
|
|
static bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
|
2017-04-23 22:45:40 +00:00
|
|
|
for (BaseThreadQueue* queue : queues) {
|
|
|
|
if (!queue->IsEmpty())
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2018-01-01 23:09:46 +00:00
|
|
|
template <typename... BaseThreadQueue>
|
|
|
|
void Wait(BaseThreadQueue... queues) {
|
|
|
|
MultiQueueLock<BaseThreadQueue...> l(queues...);
|
|
|
|
while (!HasState({queues...}))
|
|
|
|
cv.wait(l);
|
2017-04-23 22:45:40 +00:00
|
|
|
}
|
|
|
|
};
|
2017-03-25 19:18:25 +00:00
|
|
|
|
|
|
|
// A threadsafe-queue. http://stackoverflow.com/a/16075550
|
|
|
|
template <class T>
|
2017-04-23 22:45:40 +00:00
|
|
|
struct ThreadedQueue : public BaseThreadQueue {
|
2017-09-22 01:14:57 +00:00
|
|
|
public:
|
2017-10-31 19:37:52 +00:00
|
|
|
ThreadedQueue() : total_count_(0) {
|
2018-03-10 23:40:27 +00:00
|
|
|
owned_waiter_ = std::make_unique<MultiQueueWaiter>();
|
2017-05-26 06:40:38 +00:00
|
|
|
waiter_ = owned_waiter_.get();
|
2018-03-10 23:40:27 +00:00
|
|
|
owned_waiter1_ = std::make_unique<MultiQueueWaiter>();
|
2018-01-02 07:40:36 +00:00
|
|
|
waiter1_ = owned_waiter1_.get();
|
2017-05-26 06:40:38 +00:00
|
|
|
}
|
|
|
|
|
2018-01-02 07:40:36 +00:00
|
|
|
// TODO remove waiter1 after split of on_indexed
|
|
|
|
explicit ThreadedQueue(MultiQueueWaiter* waiter,
|
|
|
|
MultiQueueWaiter* waiter1 = nullptr)
|
|
|
|
: total_count_(0), waiter_(waiter), waiter1_(waiter1) {}
|
2017-04-23 22:45:40 +00:00
|
|
|
|
2017-10-31 19:37:52 +00:00
|
|
|
// Returns the number of elements in the queue. This is lock-free.
|
|
|
|
size_t Size() const { return total_count_; }
|
2017-10-25 07:12:11 +00:00
|
|
|
|
2018-02-05 03:38:57 +00:00
|
|
|
// Add an element to the queue.
|
2018-02-05 06:03:22 +00:00
|
|
|
template <void (std::deque<T>::*push)(T&&)>
|
|
|
|
void Push(T&& t, bool priority) {
|
2017-04-20 05:46:10 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
2018-02-05 03:38:57 +00:00
|
|
|
if (priority)
|
2018-02-05 06:03:22 +00:00
|
|
|
(priority_.*push)(std::move(t));
|
2018-02-05 03:38:57 +00:00
|
|
|
else
|
2018-02-05 06:03:22 +00:00
|
|
|
(queue_.*push)(std::move(t));
|
2017-10-28 21:50:57 +00:00
|
|
|
++total_count_;
|
2018-01-02 07:40:36 +00:00
|
|
|
waiter_->cv.notify_one();
|
|
|
|
if (waiter1_)
|
|
|
|
waiter1_->cv.notify_one();
|
2017-04-20 05:46:10 +00:00
|
|
|
}
|
|
|
|
|
2018-02-05 06:03:22 +00:00
|
|
|
void PushFront(T&& t, bool priority = false) {
|
|
|
|
Push<&std::deque<T>::push_front>(std::move(t), priority);
|
|
|
|
}
|
|
|
|
|
|
|
|
void PushBack(T&& t, bool priority = false) {
|
|
|
|
Push<&std::deque<T>::push_back>(std::move(t), priority);
|
2017-03-25 19:18:25 +00:00
|
|
|
}
|
|
|
|
|
2017-08-17 02:14:54 +00:00
|
|
|
// Add a set of elements to the queue.
|
2018-02-05 03:38:57 +00:00
|
|
|
void EnqueueAll(std::vector<T>&& elements, bool priority = false) {
|
2018-01-18 07:59:48 +00:00
|
|
|
if (elements.empty())
|
|
|
|
return;
|
|
|
|
|
2017-08-17 02:14:54 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
2017-10-31 19:37:52 +00:00
|
|
|
|
|
|
|
total_count_ += elements.size();
|
|
|
|
|
2017-10-28 21:50:57 +00:00
|
|
|
for (T& element : elements) {
|
2018-02-05 03:38:57 +00:00
|
|
|
if (priority)
|
2018-02-05 06:03:22 +00:00
|
|
|
priority_.push_back(std::move(element));
|
2018-02-05 03:38:57 +00:00
|
|
|
else
|
2018-02-05 06:03:22 +00:00
|
|
|
queue_.push_back(std::move(element));
|
2017-10-28 21:50:57 +00:00
|
|
|
}
|
2017-08-17 02:14:54 +00:00
|
|
|
elements.clear();
|
2017-10-31 19:37:52 +00:00
|
|
|
|
2017-08-17 02:14:54 +00:00
|
|
|
waiter_->cv.notify_all();
|
|
|
|
}
|
|
|
|
|
2017-04-16 21:49:48 +00:00
|
|
|
// Return all elements in the queue.
|
|
|
|
std::vector<T> DequeueAll() {
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
|
2017-10-31 19:37:52 +00:00
|
|
|
total_count_ = 0;
|
|
|
|
|
2017-04-16 21:49:48 +00:00
|
|
|
std::vector<T> result;
|
2017-04-20 05:46:10 +00:00
|
|
|
result.reserve(priority_.size() + queue_.size());
|
|
|
|
while (!priority_.empty()) {
|
|
|
|
result.emplace_back(std::move(priority_.front()));
|
2018-02-05 06:03:22 +00:00
|
|
|
priority_.pop_front();
|
2017-04-20 05:46:10 +00:00
|
|
|
}
|
2017-04-16 21:49:48 +00:00
|
|
|
while (!queue_.empty()) {
|
|
|
|
result.emplace_back(std::move(queue_.front()));
|
2018-02-05 06:03:22 +00:00
|
|
|
queue_.pop_front();
|
2017-04-16 21:49:48 +00:00
|
|
|
}
|
2017-10-31 19:37:52 +00:00
|
|
|
|
2017-04-16 21:49:48 +00:00
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2017-10-31 19:37:52 +00:00
|
|
|
// Returns true if the queue is empty. This is lock-free.
|
|
|
|
bool IsEmpty() { return total_count_ == 0; }
|
|
|
|
|
2017-05-26 06:40:38 +00:00
|
|
|
// Get the first element from the queue. Blocks until one is available.
|
2018-02-05 06:03:22 +00:00
|
|
|
T Dequeue() {
|
2017-03-25 19:18:25 +00:00
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
2017-09-22 01:14:57 +00:00
|
|
|
waiter_->cv.wait(lock,
|
|
|
|
[&]() { return !priority_.empty() || !queue_.empty(); });
|
2017-07-29 00:07:27 +00:00
|
|
|
|
2018-02-05 06:03:22 +00:00
|
|
|
auto execute = [&](std::deque<T>* q) {
|
2017-10-31 19:37:52 +00:00
|
|
|
auto val = std::move(q->front());
|
2018-02-05 06:03:22 +00:00
|
|
|
q->pop_front();
|
2017-10-28 21:50:57 +00:00
|
|
|
--total_count_;
|
2017-10-31 19:37:52 +00:00
|
|
|
return std::move(val);
|
|
|
|
};
|
|
|
|
if (!priority_.empty())
|
|
|
|
return execute(&priority_);
|
|
|
|
return execute(&queue_);
|
2017-03-25 19:18:25 +00:00
|
|
|
}
|
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
// Get the first element from the queue without blocking. Returns a null
|
|
|
|
// value if the queue is empty.
|
2018-03-31 03:16:33 +00:00
|
|
|
std::optional<T> TryPopFrontHelper(int which) {
|
2017-04-16 21:49:48 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
2018-02-05 06:03:22 +00:00
|
|
|
auto execute = [&](std::deque<T>* q) {
|
2017-10-31 19:37:52 +00:00
|
|
|
auto val = std::move(q->front());
|
2018-02-05 06:03:22 +00:00
|
|
|
q->pop_front();
|
2017-10-28 21:50:57 +00:00
|
|
|
--total_count_;
|
2017-10-31 19:37:52 +00:00
|
|
|
return std::move(val);
|
|
|
|
};
|
2018-02-05 03:38:57 +00:00
|
|
|
if (which & 2 && priority_.size())
|
2017-10-31 19:37:52 +00:00
|
|
|
return execute(&priority_);
|
2018-02-05 03:38:57 +00:00
|
|
|
if (which & 1 && queue_.size())
|
|
|
|
return execute(&queue_);
|
2018-03-31 03:16:33 +00:00
|
|
|
return std::nullopt;
|
2017-03-25 19:18:25 +00:00
|
|
|
}
|
|
|
|
|
2018-03-31 03:16:33 +00:00
|
|
|
std::optional<T> TryPopFront() { return TryPopFrontHelper(3); }
|
2018-02-05 06:03:22 +00:00
|
|
|
|
2018-03-31 03:16:33 +00:00
|
|
|
std::optional<T> TryPopBack() {
|
2018-02-05 06:03:22 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
auto execute = [&](std::deque<T>* q) {
|
|
|
|
auto val = std::move(q->back());
|
|
|
|
q->pop_back();
|
|
|
|
--total_count_;
|
|
|
|
return std::move(val);
|
|
|
|
};
|
|
|
|
// Reversed
|
|
|
|
if (queue_.size())
|
|
|
|
return execute(&queue_);
|
|
|
|
if (priority_.size())
|
|
|
|
return execute(&priority_);
|
2018-03-31 03:16:33 +00:00
|
|
|
return std::nullopt;
|
2018-02-05 03:38:57 +00:00
|
|
|
}
|
|
|
|
|
2018-03-31 03:16:33 +00:00
|
|
|
std::optional<T> TryPopFrontLow() { return TryPopFrontHelper(1); }
|
2018-02-05 03:38:57 +00:00
|
|
|
|
2018-03-31 03:16:33 +00:00
|
|
|
std::optional<T> TryPopFrontHigh() { return TryPopFrontHelper(2); }
|
2017-09-13 03:35:27 +00:00
|
|
|
|
2018-01-01 23:09:46 +00:00
|
|
|
mutable std::mutex mutex_;
|
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
private:
|
2017-10-31 19:37:52 +00:00
|
|
|
std::atomic<int> total_count_;
|
2018-02-05 06:03:22 +00:00
|
|
|
std::deque<T> priority_;
|
|
|
|
std::deque<T> queue_;
|
2017-04-23 22:45:40 +00:00
|
|
|
MultiQueueWaiter* waiter_;
|
2017-05-26 06:40:38 +00:00
|
|
|
std::unique_ptr<MultiQueueWaiter> owned_waiter_;
|
2018-01-02 07:40:36 +00:00
|
|
|
// TODO remove waiter1 after split of on_indexed
|
|
|
|
MultiQueueWaiter* waiter1_;
|
|
|
|
std::unique_ptr<MultiQueueWaiter> owned_waiter1_;
|
2017-03-25 19:18:25 +00:00
|
|
|
};
|