From 13c451a7cd2255829f00291a8e8ba8658af1a7a1 Mon Sep 17 00:00:00 2001 From: Fangrui Song Date: Tue, 1 May 2018 22:52:19 -0700 Subject: [PATCH] . --- src/import_pipeline.cc | 4 ++-- src/performance.h | 2 -- src/queue_manager.cc | 5 ++--- src/queue_manager.h | 5 +---- src/threaded_queue.h | 13 ++----------- 5 files changed, 7 insertions(+), 22 deletions(-) diff --git a/src/import_pipeline.cc b/src/import_pipeline.cc index ea3448d9..0d1f2cd9 100644 --- a/src/import_pipeline.cc +++ b/src/import_pipeline.cc @@ -438,12 +438,12 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) { IndexUpdate update = IndexUpdate::CreateDelta(response->previous.get(), response->current.get()); response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); - LOG_S(INFO) << "Built index update for " << response->current->path + LOG_S(INFO) << "built index for " << response->current->path << " (is_delta=" << !!response->previous << ")"; // Write current index to disk if requested. if (response->write_to_disk) { - LOG_S(INFO) << "Writing index to disk for " << response->current->path; + LOG_S(INFO) << "store index for " << response->current->path; time.Reset(); response->cache_manager->WriteToCache(*response->current); response->perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); diff --git a/src/performance.h b/src/performance.h index e79b810a..28579f9c 100644 --- a/src/performance.h +++ b/src/performance.h @@ -2,8 +2,6 @@ #include "serializer.h" -#include - // Contains timing information for the entire pipeline for importing a file // into the querydb. struct PerformanceImportFile { diff --git a/src/queue_manager.cc b/src/queue_manager.cc index 0a06c8e2..b4ec6ad1 100644 --- a/src/queue_manager.cc +++ b/src/queue_manager.cc @@ -50,10 +50,9 @@ QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter, MultiQueueWaiter* stdout_waiter) : for_stdout(stdout_waiter), for_querydb(querydb_waiter), + on_indexed(querydb_waiter), index_request(indexer_waiter), - on_id_mapped(indexer_waiter), - // TODO on_indexed is shared by "querydb" and "indexer" - on_indexed(querydb_waiter, indexer_waiter) {} + on_id_mapped(indexer_waiter) {} bool QueueManager::HasWork() { return !index_request.IsEmpty() || !on_id_mapped.IsEmpty() || diff --git a/src/queue_manager.h b/src/queue_manager.h index 793a7655..10c9d121 100644 --- a/src/queue_manager.h +++ b/src/queue_manager.h @@ -79,15 +79,12 @@ class QueueManager { // Runs on querydb thread. ThreadedQueue> for_querydb; + ThreadedQueue on_indexed; // Runs on indexer threads. ThreadedQueue index_request; ThreadedQueue on_id_mapped; - // Shared by querydb and indexer. - // TODO split on_indexed - ThreadedQueue on_indexed; - private: explicit QueueManager(MultiQueueWaiter* querydb_waiter, MultiQueueWaiter* indexer_waiter, diff --git a/src/threaded_queue.h b/src/threaded_queue.h index c14fed3b..70e50252 100644 --- a/src/threaded_queue.h +++ b/src/threaded_queue.h @@ -72,14 +72,10 @@ struct ThreadedQueue : public BaseThreadQueue { ThreadedQueue() : total_count_(0) { owned_waiter_ = std::make_unique(); waiter_ = owned_waiter_.get(); - owned_waiter1_ = std::make_unique(); - waiter1_ = owned_waiter1_.get(); } - // TODO remove waiter1 after split of on_indexed - explicit ThreadedQueue(MultiQueueWaiter* waiter, - MultiQueueWaiter* waiter1 = nullptr) - : total_count_(0), waiter_(waiter), waiter1_(waiter1) {} + explicit ThreadedQueue(MultiQueueWaiter* waiter) + : total_count_(0), waiter_(waiter) {} // Returns the number of elements in the queue. This is lock-free. size_t Size() const { return total_count_; } @@ -94,8 +90,6 @@ struct ThreadedQueue : public BaseThreadQueue { (queue_.*push)(std::move(t)); ++total_count_; waiter_->cv.notify_one(); - if (waiter1_) - waiter1_->cv.notify_one(); } void PushFront(T&& t, bool priority = false) { @@ -222,7 +216,4 @@ struct ThreadedQueue : public BaseThreadQueue { std::deque queue_; MultiQueueWaiter* waiter_; std::unique_ptr owned_waiter_; - // TODO remove waiter1 after split of on_indexed - MultiQueueWaiter* waiter1_; - std::unique_ptr owned_waiter1_; };