Replace MultiQueueWaiter timeout with condition_variable_any on multiple mutexes

Inspired by https://github.com/jacobdufault/cquery/pull/213
This commit is contained in:
Fangrui Song 2018-01-01 15:09:46 -08:00
parent 6fa92f7968
commit f68e70f5b7
3 changed files with 73 additions and 28 deletions

View File

@ -199,8 +199,7 @@ void RunQueryDbThread(const std::string& bin_name,
if (!did_work) {
auto* queue = QueueManager::instance();
waiter->Wait({&QueueManager::instance()->for_querydb, &queue->do_id_map,
&queue->on_indexed});
waiter->Wait(&queue->on_indexed, &queue->for_querydb, &queue->do_id_map);
}
}
}
@ -319,7 +318,7 @@ void LaunchStdoutThread(std::unordered_map<IpcId, Timer>* request_times,
while (true) {
std::vector<Stdout_Request> messages = queue->for_stdout.DequeueAll();
if (messages.empty()) {
waiter->Wait({&queue->for_stdout});
waiter->Wait(&queue->for_stdout);
continue;
}

View File

@ -454,8 +454,8 @@ void Indexer_Main(Config* config,
// We didn't do any work, so wait for a notification.
if (!did_parse && !did_create_update && !did_merge && !did_load_previous) {
waiter->Wait({&queue->index_request, &queue->on_id_mapped,
&queue->load_previous_index, &queue->on_indexed});
waiter->Wait(&queue->on_indexed, &queue->index_request,
&queue->on_id_mapped, &queue->load_previous_index);
}
}
}
@ -610,4 +610,4 @@ TEST_SUITE("ImportPipeline") {
REQUIRE(queue->do_id_map.Size() == 1);
}
}
#endif
#endif

View File

@ -10,6 +10,7 @@
#include <condition_variable>
#include <mutex>
#include <queue>
#include <tuple>
// TODO: cleanup includes.
@ -18,11 +19,67 @@ struct BaseThreadQueue {
virtual ~BaseThreadQueue() = default;
};
struct MultiQueueWaiter {
std::mutex m;
std::condition_variable cv;
// TODO Remove after migration to C++14
namespace {
bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
template <size_t... Is>
struct index_sequence {};
template <size_t I, size_t... Is>
struct make_index_sequence {
using type = typename make_index_sequence<I-1, I-1, Is...>::type;
};
template <size_t... Is>
struct make_index_sequence<0, Is...> {
using type = index_sequence<Is...>;
};
}
// std::lock accepts two or more arguments. We define an overload for one
// argument.
namespace std {
template <typename Lockable>
void lock(Lockable& l) {
l.lock();
}
}
template <typename... Queue>
struct MultiQueueLock {
MultiQueueLock(Queue... lockable) : tuple_{lockable...} {
lock();
}
~MultiQueueLock() {
unlock();
}
void lock() {
lock_impl(typename make_index_sequence<sizeof...(Queue)>::type{});
}
void unlock() {
unlock_impl(typename make_index_sequence<sizeof...(Queue)>::type{});
}
private:
template <size_t... Is>
void lock_impl(index_sequence<Is...>) {
std::lock(std::get<Is>(tuple_)->mutex_...);
}
template <size_t... Is>
void unlock_impl(index_sequence<Is...>) {
(void)std::initializer_list<int>{
(std::get<Is>(tuple_)->mutex_.unlock(), 0)...};
}
std::tuple<Queue...> tuple_;
};
struct MultiQueueWaiter {
std::condition_variable_any cv;
static bool HasState(std::initializer_list<BaseThreadQueue*> queues) {
for (BaseThreadQueue* queue : queues) {
if (!queue->IsEmpty())
return true;
@ -30,23 +87,11 @@ struct MultiQueueWaiter {
return false;
}
void Wait(std::initializer_list<BaseThreadQueue*> queues) {
// We cannot have a single condition variable wait on all of the different
// mutexes, so we have a global condition variable that every queue will
// notify. After it is notified we check to see if any of the queues have
// data; if they do, we return.
//
// We repoll every 5 seconds because it's not possible to atomically check
// the state of every queue and then setup the condition variable. So, if
// Wait() is called, HasState() returns false, and then in the time after
// HasState() is called data gets posted but before we begin waiting for
// the condition variable, we will miss the notification. The timeout of 5
// means that if this happens we will delay operation for 5 seconds.
while (!HasState(queues)) {
std::unique_lock<std::mutex> l(m);
cv.wait_for(l, std::chrono::seconds(5));
}
template <typename... BaseThreadQueue>
void Wait(BaseThreadQueue... queues) {
MultiQueueLock<BaseThreadQueue...> l(queues...);
while (!HasState({queues...}))
cv.wait(l);
}
};
@ -174,10 +219,11 @@ struct ThreadedQueue : public BaseThreadQueue {
return TryDequeuePlusAction([](const T&) {});
}
mutable std::mutex mutex_;
private:
std::atomic<int> total_count_;
std::queue<T> priority_;
mutable std::mutex mutex_;
std::queue<T> queue_;
MultiQueueWaiter* waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_;