#pragma once #include "utils.h" #include "work_thread.h" #include #include #include #include #include #include #include // TODO: cleanup includes. struct BaseThreadQueue { virtual bool IsEmpty() = 0; virtual ~BaseThreadQueue() = default; }; // TODO Remove after migration to C++14 namespace { template struct index_sequence {}; template struct make_index_sequence { using type = typename make_index_sequence::type; }; template struct make_index_sequence<0, Is...> { using type = index_sequence; }; } // namespace // std::lock accepts two or more arguments. We define an overload for one // argument. namespace std { template void lock(Lockable& l) { l.lock(); } } // namespace std template struct MultiQueueLock { MultiQueueLock(Queue... lockable) : tuple_{lockable...} { lock(); } ~MultiQueueLock() { unlock(); } void lock() { lock_impl(typename make_index_sequence::type{}); } void unlock() { unlock_impl(typename make_index_sequence::type{}); } private: template void lock_impl(index_sequence) { std::lock(std::get(tuple_)->mutex_...); } template void unlock_impl(index_sequence) { (void)std::initializer_list{ (std::get(tuple_)->mutex_.unlock(), 0)...}; } std::tuple tuple_; }; struct MultiQueueWaiter { std::condition_variable_any cv; static bool HasState(std::initializer_list queues) { for (BaseThreadQueue* queue : queues) { if (!queue->IsEmpty()) return true; } return false; } template void Wait(BaseThreadQueue... queues) { MultiQueueLock l(queues...); while (!HasState({queues...})) cv.wait(l); } }; // A threadsafe-queue. http://stackoverflow.com/a/16075550 template struct ThreadedQueue : public BaseThreadQueue { public: ThreadedQueue() : total_count_(0) { owned_waiter_ = MakeUnique(); waiter_ = owned_waiter_.get(); owned_waiter1_ = MakeUnique(); waiter1_ = owned_waiter1_.get(); } // TODO remove waiter1 after split of on_indexed explicit ThreadedQueue(MultiQueueWaiter* waiter, MultiQueueWaiter* waiter1 = nullptr) : total_count_(0), waiter_(waiter), waiter1_(waiter1) {} // Returns the number of elements in the queue. This is lock-free. size_t Size() const { return total_count_; } // Add an element to the front of the queue. void PriorityEnqueue(T&& t) { std::lock_guard lock(mutex_); priority_.push(std::move(t)); ++total_count_; waiter_->cv.notify_one(); if (waiter1_) waiter1_->cv.notify_one(); } // Add an element to the queue. void Enqueue(T&& t) { std::lock_guard lock(mutex_); queue_.push(std::move(t)); ++total_count_; waiter_->cv.notify_one(); if (waiter1_) waiter1_->cv.notify_one(); } // Add a set of elements to the queue. void EnqueueAll(std::vector&& elements) { if (elements.empty()) return; std::lock_guard lock(mutex_); total_count_ += elements.size(); for (T& element : elements) { queue_.push(std::move(element)); } elements.clear(); waiter_->cv.notify_all(); } // Return all elements in the queue. std::vector DequeueAll() { std::lock_guard lock(mutex_); total_count_ = 0; std::vector result; result.reserve(priority_.size() + queue_.size()); while (!priority_.empty()) { result.emplace_back(std::move(priority_.front())); priority_.pop(); } while (!queue_.empty()) { result.emplace_back(std::move(queue_.front())); queue_.pop(); } return result; } // Returns true if the queue is empty. This is lock-free. bool IsEmpty() { return total_count_ == 0; } // TODO: Unify code between DequeuePlusAction with TryDequeuePlusAction. // Probably have opt Dequeue(bool wait_for_element); // Get the first element from the queue. Blocks until one is available. // Executes |action| with an acquired |mutex_|. template T DequeuePlusAction(TAction action) { std::unique_lock lock(mutex_); waiter_->cv.wait(lock, [&]() { return !priority_.empty() || !queue_.empty(); }); auto execute = [&](std::queue* q) { auto val = std::move(q->front()); q->pop(); --total_count_; action(); return std::move(val); }; if (!priority_.empty()) return execute(&priority_); return execute(&queue_); } // Get the first element from the queue. Blocks until one is available. T Dequeue() { return DequeuePlusAction([]() {}); } // Get the first element from the queue without blocking. Returns a null // value if the queue is empty. template optional TryDequeuePlusAction(TAction action) { std::lock_guard lock(mutex_); if (priority_.empty() && queue_.empty()) return nullopt; auto execute = [&](std::queue* q) { auto val = std::move(q->front()); q->pop(); --total_count_; action(val); return std::move(val); }; if (!priority_.empty()) return execute(&priority_); return execute(&queue_); } optional TryDequeue() { return TryDequeuePlusAction([](const T&) {}); } mutable std::mutex mutex_; private: std::atomic total_count_; std::queue priority_; std::queue queue_; MultiQueueWaiter* waiter_; std::unique_ptr owned_waiter_; // TODO remove waiter1 after split of on_indexed MultiQueueWaiter* waiter1_; std::unique_ptr owned_waiter1_; };