2017-03-25 19:18:25 +00:00
|
|
|
#pragma once
|
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
#include <optional.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <initializer_list>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
|
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
// TODO: cleanup includes.
|
|
|
|
|
2017-04-23 22:45:40 +00:00
|
|
|
// Type-erased view of a thread queue. It exposes only what is needed to poll
// several queues of different element types through one pointer type.
struct BaseThreadQueue {
  // Polymorphic base: a virtual destructor makes destroying a derived queue
  // through a BaseThreadQueue* well-defined.
  virtual ~BaseThreadQueue() = default;

  // Returns true when the queue currently holds no elements.
  virtual bool IsEmpty() = 0;
};
|
|
|
|
|
|
|
|
struct MultiQueueWaiter {
|
|
|
|
std::mutex m;
|
|
|
|
std::condition_variable cv;
|
|
|
|
|
|
|
|
bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
|
|
|
|
for (BaseThreadQueue* queue : queues) {
|
|
|
|
if (!queue->IsEmpty())
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
void Wait(std::initializer_list<BaseThreadQueue*> queues) {
|
|
|
|
// We cannot have a single condition variable wait on all of the different
|
|
|
|
// mutexes, so we have a global condition variable that every queue will
|
|
|
|
// notify. After it is notified we check to see if any of the queues have
|
|
|
|
// data; if they do, we return.
|
|
|
|
//
|
|
|
|
// We repoll every 5 seconds because it's not possible to atomically check
|
|
|
|
// the state of every queue and then setup the condition variable. So, if
|
|
|
|
// Wait() is called, HasState() returns false, and then in the time after
|
|
|
|
// HasState() is called data gets posted but before we begin waiting for
|
|
|
|
// the condition variable, we will miss the notification. The timeout of 5
|
|
|
|
// means that if this happens we will delay operation for 5 seconds.
|
|
|
|
|
|
|
|
while (!HasState(queues)) {
|
|
|
|
std::unique_lock<std::mutex> l(m);
|
|
|
|
cv.wait_for(l, std::chrono::seconds(5));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2017-03-25 19:18:25 +00:00
|
|
|
|
|
|
|
// A threadsafe-queue. http://stackoverflow.com/a/16075550
|
|
|
|
template <class T>
|
2017-04-23 22:45:40 +00:00
|
|
|
struct ThreadedQueue : public BaseThreadQueue {
|
2017-03-25 19:18:25 +00:00
|
|
|
public:
|
2017-04-23 22:45:40 +00:00
|
|
|
ThreadedQueue(MultiQueueWaiter* waiter) : waiter_(waiter) {}
|
|
|
|
|
2017-04-20 05:46:10 +00:00
|
|
|
// Add an element to the front of the queue.
|
|
|
|
void PriorityEnqueue(T&& t) {
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
priority_.push(std::move(t));
|
2017-04-23 22:45:40 +00:00
|
|
|
waiter_->cv.notify_all();
|
2017-04-20 05:46:10 +00:00
|
|
|
}
|
|
|
|
|
2017-03-25 19:18:25 +00:00
|
|
|
// Add an element to the queue.
|
2017-04-08 06:45:28 +00:00
|
|
|
void Enqueue(T&& t) {
|
2017-03-25 19:18:25 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
2017-04-08 06:45:28 +00:00
|
|
|
queue_.push(std::move(t));
|
2017-04-23 22:45:40 +00:00
|
|
|
waiter_->cv.notify_all();
|
2017-03-25 19:18:25 +00:00
|
|
|
}
|
|
|
|
|
2017-04-16 21:49:48 +00:00
|
|
|
// Return all elements in the queue.
|
|
|
|
std::vector<T> DequeueAll() {
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
|
|
|
|
std::vector<T> result;
|
2017-04-20 05:46:10 +00:00
|
|
|
result.reserve(priority_.size() + queue_.size());
|
|
|
|
while (!priority_.empty()) {
|
|
|
|
result.emplace_back(std::move(priority_.front()));
|
|
|
|
priority_.pop();
|
|
|
|
}
|
2017-04-16 21:49:48 +00:00
|
|
|
while (!queue_.empty()) {
|
|
|
|
result.emplace_back(std::move(queue_.front()));
|
|
|
|
queue_.pop();
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2017-04-23 22:45:40 +00:00
|
|
|
bool IsEmpty() {
|
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
|
|
return queue_.empty();
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-03-25 19:18:25 +00:00
|
|
|
// Get the "front"-element.
|
2017-03-25 20:32:44 +00:00
|
|
|
// If the queue is empty, wait untill an element is avaiable.
|
2017-03-25 19:18:25 +00:00
|
|
|
T Dequeue() {
|
|
|
|
std::unique_lock<std::mutex> lock(mutex_);
|
2017-04-20 05:46:10 +00:00
|
|
|
while (priority_.empty() && queue_.empty()) {
|
2017-03-25 19:18:25 +00:00
|
|
|
// release lock as long as the wait and reaquire it afterwards.
|
|
|
|
cv_.wait(lock);
|
|
|
|
}
|
2017-04-08 06:45:28 +00:00
|
|
|
|
2017-04-20 05:46:10 +00:00
|
|
|
if (!priority_.empty()) {
|
|
|
|
auto val = std::move(priority_.front());
|
|
|
|
priority_.pop();
|
|
|
|
return val;
|
|
|
|
}
|
|
|
|
|
2017-04-08 06:45:28 +00:00
|
|
|
auto val = std::move(queue_.front());
|
2017-03-25 19:18:25 +00:00
|
|
|
queue_.pop();
|
|
|
|
return val;
|
|
|
|
}
|
2017-04-23 22:45:40 +00:00
|
|
|
*/
|
2017-03-25 19:18:25 +00:00
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
// Get the first element from the queue without blocking. Returns a null
|
|
|
|
// value if the queue is empty.
|
2017-03-25 19:18:25 +00:00
|
|
|
optional<T> TryDequeue() {
|
2017-04-16 21:49:48 +00:00
|
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
2017-04-20 05:46:10 +00:00
|
|
|
if (priority_.empty() && queue_.empty())
|
2017-03-25 19:18:25 +00:00
|
|
|
return nullopt;
|
|
|
|
|
2017-04-20 05:46:10 +00:00
|
|
|
if (!priority_.empty()) {
|
|
|
|
auto val = std::move(priority_.front());
|
|
|
|
priority_.pop();
|
2017-04-21 00:16:54 +00:00
|
|
|
return std::move(val);
|
2017-04-20 05:46:10 +00:00
|
|
|
}
|
|
|
|
|
2017-04-08 06:45:28 +00:00
|
|
|
auto val = std::move(queue_.front());
|
2017-03-25 19:18:25 +00:00
|
|
|
queue_.pop();
|
2017-04-14 22:30:33 +00:00
|
|
|
return std::move(val);
|
2017-03-25 19:18:25 +00:00
|
|
|
}
|
|
|
|
|
2017-03-25 20:32:44 +00:00
|
|
|
private:
|
2017-04-20 05:46:10 +00:00
|
|
|
std::queue<T> priority_;
|
2017-03-25 19:18:25 +00:00
|
|
|
mutable std::mutex mutex_;
|
2017-04-23 22:45:40 +00:00
|
|
|
std::queue<T> queue_;
|
|
|
|
MultiQueueWaiter* waiter_;
|
2017-03-25 19:18:25 +00:00
|
|
|
};
|