From 58fbd0414291c8e8b7cc30b914fc6eb4e582cbc5 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Wed, 16 Aug 2017 19:14:54 -0700 Subject: [PATCH] Add ThreadedQueue::EnqueueAll to reduce lock contention. --- src/command_line.cc | 10 +++++----- src/threaded_queue.h | 9 +++++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/command_line.cc b/src/command_line.cc index 88773838..aa843512 100644 --- a/src/command_line.cc +++ b/src/command_line.cc @@ -998,18 +998,18 @@ bool IndexMain_DoParse( entry.args = request->args; std::vector responses = ParseFile(config, index, file_consumer_shared, timestamp_manager, entry); - // TODO/FIXME: bulk enqueue so we don't lock so many times - for (Index_DoIdMap& response : responses) - queue->do_id_map.Enqueue(std::move(response)); + if (responses.empty()) + return false; - return !responses.empty(); + // EnqueueAll will clear |responses|. + queue->do_id_map.EnqueueAll(std::move(responses)); + return true; } bool IndexMain_DoCreateIndexUpdate( Config* config, QueueManager* queue, TimestampManager* timestamp_manager) { - // TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed. optional response = queue->on_id_mapped.TryDequeue(); if (!response) return false; diff --git a/src/threaded_queue.h b/src/threaded_queue.h index 3240c801..93f7a1e3 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -71,6 +71,15 @@ public: waiter_->cv.notify_all(); } + // Add a set of elements to the queue. + void EnqueueAll(std::vector&& elements) { + std::lock_guard lock(mutex_); + for (T& element : elements) + queue_.push(std::move(element)); + elements.clear(); + waiter_->cv.notify_all(); + } + // Return all elements in the queue. std::vector DequeueAll() { std::lock_guard lock(mutex_);