Add ThreadedQueue::EnqueueAll to reduce lock contention.

This commit is contained in:
Jacob Dufault 2017-08-16 19:14:54 -07:00
parent 28ddc9f346
commit 58fbd04142
2 changed files with 14 additions and 5 deletions

View File

@ -998,18 +998,18 @@ bool IndexMain_DoParse(
entry.args = request->args; entry.args = request->args;
std::vector<Index_DoIdMap> responses = ParseFile(config, index, file_consumer_shared, timestamp_manager, entry); std::vector<Index_DoIdMap> responses = ParseFile(config, index, file_consumer_shared, timestamp_manager, entry);
// TODO/FIXME: bulk enqueue so we don't lock so many times if (responses.empty())
for (Index_DoIdMap& response : responses) return false;
queue->do_id_map.Enqueue(std::move(response));
return !responses.empty(); // EnqueueAll will clear |responses|.
queue->do_id_map.EnqueueAll(std::move(responses));
return true;
} }
bool IndexMain_DoCreateIndexUpdate( bool IndexMain_DoCreateIndexUpdate(
Config* config, Config* config,
QueueManager* queue, QueueManager* queue,
TimestampManager* timestamp_manager) { TimestampManager* timestamp_manager) {
// TODO: Index_OnIdMapped dtor is failing because it seems that its contents have already been destroyed.
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue(); optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue();
if (!response) if (!response)
return false; return false;

View File

@ -71,6 +71,15 @@ public:
waiter_->cv.notify_all(); waiter_->cv.notify_all();
} }
// Add a set of elements to the queue.
void EnqueueAll(std::vector<T>&& elements) {
std::lock_guard<std::mutex> lock(mutex_);
for (T& element : elements)
queue_.push(std::move(element));
elements.clear();
waiter_->cv.notify_all();
}
// Return all elements in the queue. // Return all elements in the queue.
std::vector<T> DequeueAll() { std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);