diff --git a/src/command_line.cc b/src/command_line.cc index 57446403..e3fbd9b4 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -199,8 +199,7 @@ void RunQueryDbThread(const std::string& bin_name, if (!did_work) { auto* queue = QueueManager::instance(); - waiter->Wait({&QueueManager::instance()->for_querydb, &queue->do_id_map, - &queue->on_indexed}); + waiter->Wait(&queue->on_indexed, &queue->for_querydb, &queue->do_id_map); } } } @@ -319,7 +318,7 @@ void LaunchStdoutThread(std::unordered_map* request_times, while (true) { std::vector messages = queue->for_stdout.DequeueAll(); if (messages.empty()) { - waiter->Wait({&queue->for_stdout}); + waiter->Wait(&queue->for_stdout); continue; } diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc index a8e34a50..27ca3215 100644 --- a/src/import_pipeline.cc +++ b/src/import_pipeline.cc @@ -454,8 +454,8 @@ void Indexer_Main(Config* config, // We didn't do any work, so wait for a notification. if (!did_parse && !did_create_update && !did_merge && !did_load_previous) { - waiter->Wait({&queue->index_request, &queue->on_id_mapped, - &queue->load_previous_index, &queue->on_indexed}); + waiter->Wait(&queue->on_indexed, &queue->index_request, + &queue->on_id_mapped, &queue->load_previous_index); } } } @@ -610,4 +610,4 @@ TEST_SUITE("ImportPipeline") { REQUIRE(queue->do_id_map.Size() == 1); } } -#endif \ No newline at end of file +#endif diff --git a/src/threaded_queue.h b/src/threaded_queue.h index 7b6c1f1f..414ec933 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -10,6 +10,7 @@ #include #include #include +#include // TODO: cleanup includes. @@ -18,11 +19,67 @@ struct BaseThreadQueue { virtual ~BaseThreadQueue() = default; }; -struct MultiQueueWaiter { - std::mutex m; - std::condition_variable cv; +// TODO Remove after migration to C++14 +namespace { - bool HasState(std::initializer_list queues) { +template +struct index_sequence {}; + +template +struct make_index_sequence { + using type = typename make_index_sequence::type; +}; + +template +struct make_index_sequence<0, Is...> { + using type = index_sequence; +}; + +} + +// std::lock accepts two or more arguments. We define an overload for one +// argument. +namespace std { +template +void lock(Lockable& l) { + l.lock(); +} +} + +template +struct MultiQueueLock { + MultiQueueLock(Queue... lockable) : tuple_{lockable...} { + lock(); + } + ~MultiQueueLock() { + unlock(); + } + void lock() { + lock_impl(typename make_index_sequence::type{}); + } + void unlock() { + unlock_impl(typename make_index_sequence::type{}); + } + + private: + template + void lock_impl(index_sequence) { + std::lock(std::get(tuple_)->mutex_...); + } + + template + void unlock_impl(index_sequence) { + (void)std::initializer_list{ + (std::get(tuple_)->mutex_.unlock(), 0)...}; + } + + std::tuple tuple_; +}; + +struct MultiQueueWaiter { + std::condition_variable_any cv; + + static bool HasState(std::initializer_list queues) { for (BaseThreadQueue* queue : queues) { if (!queue->IsEmpty()) return true; @@ -30,23 +87,11 @@ struct MultiQueueWaiter { return false; } - void Wait(std::initializer_list queues) { - // We cannot have a single condition variable wait on all of the different - // mutexes, so we have a global condition variable that every queue will - // notify. After it is notified we check to see if any of the queues have - // data; if they do, we return. - // - // We repoll every 5 seconds because it's not possible to atomically check - // the state of every queue and then setup the condition variable. So, if - // Wait() is called, HasState() returns false, and then in the time after - // HasState() is called data gets posted but before we begin waiting for - // the condition variable, we will miss the notification. The timeout of 5 - // means that if this happens we will delay operation for 5 seconds. - - while (!HasState(queues)) { - std::unique_lock l(m); - cv.wait_for(l, std::chrono::seconds(5)); - } + template + void Wait(BaseThreadQueue... queues) { + MultiQueueLock l(queues...); + while (!HasState({queues...})) + cv.wait(l); } }; @@ -174,10 +219,11 @@ struct ThreadedQueue : public BaseThreadQueue { return TryDequeuePlusAction([](const T&) {}); } + mutable std::mutex mutex_; + private: std::atomic total_count_; std::queue priority_; - mutable std::mutex mutex_; std::queue queue_; MultiQueueWaiter* waiter_; std::unique_ptr owned_waiter_;