From c694b56bc157b837cfa661cf18a2a75561930277 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Sat, 28 Oct 2017 14:50:57 -0700 Subject: [PATCH] Reduce some locking in ThreadedQueue for status updates --- src/threaded_queue.h | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/threaded_queue.h b/src/threaded_queue.h index 74061c4c..65e7a859 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -12,6 +12,11 @@ using std::experimental::nullopt; using std::experimental::optional; +// If defined, some correctness checks will be enabled. These have a runtime +// performance cost and are disabled by default. +// +// #define ENABLE_QUEUE_CHECKS + // TODO: cleanup includes. struct BaseThreadQueue { @@ -66,14 +71,19 @@ struct ThreadedQueue : public BaseThreadQueue { // Returns the number of elements in the queue. This acquires a lock. size_t Size() const { +#ifdef ENABLE_QUEUE_CHECKS std::lock_guard lock(mutex_); - return priority_.size() + queue_.size(); + int count = priority_.size() + queue_.size(); + assert(total_count_ == count); +#endif + return total_count_; } // Add an element to the front of the queue. void PriorityEnqueue(T&& t) { std::lock_guard lock(mutex_); priority_.push(std::move(t)); + ++total_count_; waiter_->cv.notify_all(); } @@ -81,14 +91,17 @@ struct ThreadedQueue : public BaseThreadQueue { void Enqueue(T&& t) { std::lock_guard lock(mutex_); queue_.push(std::move(t)); + ++total_count_; waiter_->cv.notify_all(); } // Add a set of elements to the queue. void EnqueueAll(std::vector&& elements) { std::lock_guard lock(mutex_); - for (T& element : elements) + for (T& element : elements) { queue_.push(std::move(element)); + ++total_count_; + } elements.clear(); waiter_->cv.notify_all(); } @@ -102,17 +115,23 @@ struct ThreadedQueue : public BaseThreadQueue { while (!priority_.empty()) { result.emplace_back(std::move(priority_.front())); priority_.pop(); + --total_count_; } while (!queue_.empty()) { result.emplace_back(std::move(queue_.front())); queue_.pop(); + --total_count_; } return result; } bool IsEmpty() { +#ifdef ENABLE_QUEUE_CHECKS std::lock_guard lock(mutex_); - return priority_.empty() && queue_.empty(); + bool empty = priority_.empty() && queue_.empty(); + assert(empty == (total_count_ == 0)); +#endif + return total_count_ == 0; } // Get the first element from the queue. Blocks until one is available. @@ -126,6 +145,7 @@ struct ThreadedQueue : public BaseThreadQueue { if (!priority_.empty()) { auto val = std::move(priority_.front()); priority_.pop(); + --total_count_; return std::move(val); } @@ -153,11 +173,13 @@ struct ThreadedQueue : public BaseThreadQueue { if (!priority_.empty()) { auto val = std::move(priority_.front()); priority_.pop(); + --total_count_; return std::move(val); } auto val = std::move(queue_.front()); queue_.pop(); + --total_count_; action(val); @@ -169,6 +191,7 @@ struct ThreadedQueue : public BaseThreadQueue { } private: + std::atomic total_count_ = 0; std::queue priority_; mutable std::mutex mutex_; std::queue queue_;