Fix threaded_queue

This commit is contained in:
Jacob Dufault 2017-10-31 12:37:52 -07:00
parent 4156be09c1
commit 5f04e390a2

View File

@ -12,11 +12,6 @@
using std::experimental::nullopt; using std::experimental::nullopt;
using std::experimental::optional; using std::experimental::optional;
// If defined, some correctness checks will be enabled. These have a runtime
// performance cost and are disabled by default.
//
// #define ENABLE_QUEUE_CHECKS
// TODO: cleanup includes. // TODO: cleanup includes.
struct BaseThreadQueue { struct BaseThreadQueue {
@ -62,22 +57,16 @@ struct MultiQueueWaiter {
template <class T> template <class T>
struct ThreadedQueue : public BaseThreadQueue { struct ThreadedQueue : public BaseThreadQueue {
public: public:
ThreadedQueue() { ThreadedQueue() : total_count_(0) {
owned_waiter_ = MakeUnique<MultiQueueWaiter>(); owned_waiter_ = MakeUnique<MultiQueueWaiter>();
waiter_ = owned_waiter_.get(); waiter_ = owned_waiter_.get();
} }
explicit ThreadedQueue(MultiQueueWaiter* waiter) : waiter_(waiter) {} explicit ThreadedQueue(MultiQueueWaiter* waiter)
: total_count_(0), waiter_(waiter) {}
// Returns the number of elements in the queue. This acquires a lock. // Returns the number of elements in the queue. This is lock-free.
size_t Size() const { size_t Size() const { return total_count_; }
#ifdef ENABLE_QUEUE_CHECKS
std::lock_guard<std::mutex> lock(mutex_);
int count = priority_.size() + queue_.size();
assert(total_count_ == count);
#endif
return total_count_;
}
// Add an element to the front of the queue. // Add an element to the front of the queue.
void PriorityEnqueue(T&& t) { void PriorityEnqueue(T&& t) {
@ -98,11 +87,14 @@ struct ThreadedQueue : public BaseThreadQueue {
// Add a set of elements to the queue. // Add a set of elements to the queue.
void EnqueueAll(std::vector<T>&& elements) { void EnqueueAll(std::vector<T>&& elements) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
total_count_ += elements.size();
for (T& element : elements) { for (T& element : elements) {
queue_.push(std::move(element)); queue_.push(std::move(element));
++total_count_;
} }
elements.clear(); elements.clear();
waiter_->cv.notify_all(); waiter_->cv.notify_all();
} }
@ -110,29 +102,27 @@ struct ThreadedQueue : public BaseThreadQueue {
std::vector<T> DequeueAll() { std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
total_count_ = 0;
std::vector<T> result; std::vector<T> result;
result.reserve(priority_.size() + queue_.size()); result.reserve(priority_.size() + queue_.size());
while (!priority_.empty()) { while (!priority_.empty()) {
result.emplace_back(std::move(priority_.front())); result.emplace_back(std::move(priority_.front()));
priority_.pop(); priority_.pop();
--total_count_;
} }
while (!queue_.empty()) { while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front())); result.emplace_back(std::move(queue_.front()));
queue_.pop(); queue_.pop();
--total_count_;
} }
return result; return result;
} }
bool IsEmpty() { // Returns true if the queue is empty. This is lock-free.
#ifdef ENABLE_QUEUE_CHECKS bool IsEmpty() { return total_count_ == 0; }
std::lock_guard<std::mutex> lock(mutex_);
bool empty = priority_.empty() && queue_.empty(); // TODO: Unify code between DequeuePlusAction with TryDequeuePlusAction.
assert(empty == (total_count_ == 0)); // Probably have opt<T> Dequeue(bool wait_for_element);
#endif
return total_count_ == 0;
}
// Get the first element from the queue. Blocks until one is available. // Get the first element from the queue. Blocks until one is available.
// Executes |action| with an acquired |mutex_|. // Executes |action| with an acquired |mutex_|.
@ -142,19 +132,18 @@ struct ThreadedQueue : public BaseThreadQueue {
waiter_->cv.wait(lock, waiter_->cv.wait(lock,
[&]() { return !priority_.empty() || !queue_.empty(); }); [&]() { return !priority_.empty() || !queue_.empty(); });
if (!priority_.empty()) { auto execute = [&](std::queue<T>* q) {
auto val = std::move(priority_.front()); auto val = std::move(q->front());
priority_.pop(); q->pop();
--total_count_; --total_count_;
return std::move(val);
}
auto val = std::move(queue_.front());
queue_.pop();
action(); action();
return std::move(val); return std::move(val);
};
if (!priority_.empty())
return execute(&priority_);
return execute(&queue_);
} }
// Get the first element from the queue. Blocks until one is available. // Get the first element from the queue. Blocks until one is available.
@ -170,20 +159,18 @@ struct ThreadedQueue : public BaseThreadQueue {
if (priority_.empty() && queue_.empty()) if (priority_.empty() && queue_.empty())
return nullopt; return nullopt;
if (!priority_.empty()) { auto execute = [&](std::queue<T>* q) {
auto val = std::move(priority_.front()); auto val = std::move(q->front());
priority_.pop(); q->pop();
--total_count_;
return std::move(val);
}
auto val = std::move(queue_.front());
queue_.pop();
--total_count_; --total_count_;
action(val); action(val);
return std::move(val); return std::move(val);
};
if (!priority_.empty())
return execute(&priority_);
return execute(&queue_);
} }
optional<T> TryDequeue() { optional<T> TryDequeue() {
@ -191,7 +178,7 @@ struct ThreadedQueue : public BaseThreadQueue {
} }
private: private:
std::atomic<int> total_count_ = 0; std::atomic<int> total_count_;
std::queue<T> priority_; std::queue<T> priority_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::queue<T> queue_; std::queue<T> queue_;