This commit is contained in:
Fangrui Song 2018-05-01 22:52:19 -07:00
parent eb057c7acd
commit 13c451a7cd
5 changed files with 7 additions and 22 deletions

View File

@ -438,12 +438,12 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
IndexUpdate update = IndexUpdate::CreateDelta(response->previous.get(), IndexUpdate update = IndexUpdate::CreateDelta(response->previous.get(),
response->current.get()); response->current.get());
response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset(); response->perf.index_make_delta = time.ElapsedMicrosecondsAndReset();
LOG_S(INFO) << "Built index update for " << response->current->path LOG_S(INFO) << "built index for " << response->current->path
<< " (is_delta=" << !!response->previous << ")"; << " (is_delta=" << !!response->previous << ")";
// Write current index to disk if requested. // Write current index to disk if requested.
if (response->write_to_disk) { if (response->write_to_disk) {
LOG_S(INFO) << "Writing index to disk for " << response->current->path; LOG_S(INFO) << "store index for " << response->current->path;
time.Reset(); time.Reset();
response->cache_manager->WriteToCache(*response->current); response->cache_manager->WriteToCache(*response->current);
response->perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset(); response->perf.index_save_to_disk = time.ElapsedMicrosecondsAndReset();

View File

@ -2,8 +2,6 @@
#include "serializer.h" #include "serializer.h"
#include <cstdint>
// Contains timing information for the entire pipeline for importing a file // Contains timing information for the entire pipeline for importing a file
// into the querydb. // into the querydb.
struct PerformanceImportFile { struct PerformanceImportFile {

View File

@ -50,10 +50,9 @@ QueueManager::QueueManager(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* stdout_waiter) MultiQueueWaiter* stdout_waiter)
: for_stdout(stdout_waiter), : for_stdout(stdout_waiter),
for_querydb(querydb_waiter), for_querydb(querydb_waiter),
on_indexed(querydb_waiter),
index_request(indexer_waiter), index_request(indexer_waiter),
on_id_mapped(indexer_waiter), on_id_mapped(indexer_waiter) {}
// TODO on_indexed is shared by "querydb" and "indexer"
on_indexed(querydb_waiter, indexer_waiter) {}
bool QueueManager::HasWork() { bool QueueManager::HasWork() {
return !index_request.IsEmpty() || !on_id_mapped.IsEmpty() || return !index_request.IsEmpty() || !on_id_mapped.IsEmpty() ||

View File

@ -79,15 +79,12 @@ class QueueManager {
// Runs on querydb thread. // Runs on querydb thread.
ThreadedQueue<std::unique_ptr<InMessage>> for_querydb; ThreadedQueue<std::unique_ptr<InMessage>> for_querydb;
ThreadedQueue<Index_OnIndexed> on_indexed;
// Runs on indexer threads. // Runs on indexer threads.
ThreadedQueue<Index_Request> index_request; ThreadedQueue<Index_Request> index_request;
ThreadedQueue<Index_OnIdMapped> on_id_mapped; ThreadedQueue<Index_OnIdMapped> on_id_mapped;
// Shared by querydb and indexer.
// TODO split on_indexed
ThreadedQueue<Index_OnIndexed> on_indexed;
private: private:
explicit QueueManager(MultiQueueWaiter* querydb_waiter, explicit QueueManager(MultiQueueWaiter* querydb_waiter,
MultiQueueWaiter* indexer_waiter, MultiQueueWaiter* indexer_waiter,

View File

@ -72,14 +72,10 @@ struct ThreadedQueue : public BaseThreadQueue {
ThreadedQueue() : total_count_(0) { ThreadedQueue() : total_count_(0) {
owned_waiter_ = std::make_unique<MultiQueueWaiter>(); owned_waiter_ = std::make_unique<MultiQueueWaiter>();
waiter_ = owned_waiter_.get(); waiter_ = owned_waiter_.get();
owned_waiter1_ = std::make_unique<MultiQueueWaiter>();
waiter1_ = owned_waiter1_.get();
} }
// TODO remove waiter1 after split of on_indexed explicit ThreadedQueue(MultiQueueWaiter* waiter)
explicit ThreadedQueue(MultiQueueWaiter* waiter, : total_count_(0), waiter_(waiter) {}
MultiQueueWaiter* waiter1 = nullptr)
: total_count_(0), waiter_(waiter), waiter1_(waiter1) {}
// Returns the number of elements in the queue. This is lock-free. // Returns the number of elements in the queue. This is lock-free.
size_t Size() const { return total_count_; } size_t Size() const { return total_count_; }
@ -94,8 +90,6 @@ struct ThreadedQueue : public BaseThreadQueue {
(queue_.*push)(std::move(t)); (queue_.*push)(std::move(t));
++total_count_; ++total_count_;
waiter_->cv.notify_one(); waiter_->cv.notify_one();
if (waiter1_)
waiter1_->cv.notify_one();
} }
void PushFront(T&& t, bool priority = false) { void PushFront(T&& t, bool priority = false) {
@ -222,7 +216,4 @@ struct ThreadedQueue : public BaseThreadQueue {
std::deque<T> queue_; std::deque<T> queue_;
MultiQueueWaiter* waiter_; MultiQueueWaiter* waiter_;
std::unique_ptr<MultiQueueWaiter> owned_waiter_; std::unique_ptr<MultiQueueWaiter> owned_waiter_;
// TODO remove waiter1 after split of on_indexed
MultiQueueWaiter* waiter1_;
std::unique_ptr<MultiQueueWaiter> owned_waiter1_;
}; };