From c89f651cd899b63bcb52a05c1d34ae914603e323 Mon Sep 17 00:00:00 2001 From: Jacob Dufault Date: Tue, 1 Aug 2017 20:23:06 -0700 Subject: [PATCH] More iteration on task system --- src/task.cc | 154 ++++++++++++++++++++++++---------------------------- src/task.h | 42 ++++++++++++++ 2 files changed, 114 insertions(+), 82 deletions(-) create mode 100644 src/task.h diff --git a/src/task.cc b/src/task.cc index 83143235..cbf5b81c 100644 --- a/src/task.cc +++ b/src/task.cc @@ -1,105 +1,62 @@ +#include "task.h" + #include "utils.h" #include -#include - -#include -#include -#include -#include -#include - -using std::experimental::optional; -using std::experimental::nullopt; - -enum class TaskTargetThread { - Indexer, - QueryDb, -}; - -// TODO: IdleTask returns a bool indicating if it did work. -// TODO: Hookup IdleTask -// TODO: Move target_thread out of task and into PostTask. - -struct Task { - // The thread the task will execute on. - TaskTargetThread target_thread; - - // The action the task will perform. - using TAction = std::function; - TAction action; - - Task(TaskTargetThread target, const TAction& action); -}; - -Task::Task(TaskTargetThread target, const TAction& action) - : target_thread(target), action(action) {} - - -struct TaskQueue { - optional idle_task; - std::vector tasks; - std::mutex tasks_mutex; -}; - -struct TaskManager { - TaskManager(); - - // Run |task| at some point in the future. This will run the task as soon as possible. - void PostTask(const Task& task); - - // Run |task| whenever there is nothing else to run. - void SetIdleTask(const Task& task); - - // Run pending tasks for |thread|. Stop running tasks after |max_time| has elapsed. - void RunTasks(TaskTargetThread thread, optional> max_time); - - std::unordered_map> pending_tasks_; -}; TaskManager::TaskManager() { - pending_tasks_[TaskTargetThread::Indexer] = MakeUnique(); - pending_tasks_[TaskTargetThread::QueryDb] = MakeUnique(); + pending_tasks_[TaskThread::Indexer] = MakeUnique(); + pending_tasks_[TaskThread::QueryDb] = MakeUnique(); } -void TaskManager::PostTask(const Task& task) { - TaskQueue* queue = pending_tasks_[task.target_thread].get(); +void TaskManager::Post(TaskThread thread, const TTask& task) { + TaskQueue* queue = pending_tasks_[thread].get(); std::lock_guard lock_guard(queue->tasks_mutex); queue->tasks.push_back(task); } -void TaskManager::SetIdleTask(const Task& task) { - TaskQueue* queue = pending_tasks_[task.target_thread].get(); +void TaskManager::SetIdle(TaskThread thread, const TIdleTask& task) { + TaskQueue* queue = pending_tasks_[thread].get(); std::lock_guard lock_guard(queue->tasks_mutex); assert(!queue->idle_task && "There is already an idle task"); queue->idle_task = task; } -void TaskManager::RunTasks(TaskTargetThread thread, optional> max_time) { +bool TaskManager::RunTasks(TaskThread thread, optional> max_time) { auto start = std::chrono::high_resolution_clock::now(); TaskQueue* queue = pending_tasks_[thread].get(); + bool ran_task = false; + while (true) { - optional task; + optional task; // Get a task. { std::lock_guard lock_guard(queue->tasks_mutex); if (queue->tasks.empty()) - return; - task = queue->tasks[queue->tasks.size() - 1]; + break; + task = std::move(queue->tasks[queue->tasks.size() - 1]); queue->tasks.pop_back(); } // Execute task. assert(task); - task->action(); + (*task)(); + ran_task = true; - // If we've run past our max time stop. + // Stop if we've run past our max time. Don't run idle_task. auto elapsed = std::chrono::high_resolution_clock::now() - start; if (max_time && elapsed > *max_time) - return; + return ran_task; } + + if (queue->idle_task) { + // Even if the idle task returns false we still ran something before. + ran_task = (*queue->idle_task)() || ran_task; + } + + return ran_task; } TEST_SUITE("Task"); @@ -110,18 +67,18 @@ TEST_CASE("tasks are run as soon as they are posted") { // Post three tasks. int next = 1; int a = 0, b = 0, c = 0; - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { + tm.Post(TaskThread::QueryDb, [&] { a = next++; - })); - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { + }); + tm.Post(TaskThread::QueryDb, [&] { b = next++; - })); - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { + }); + tm.Post(TaskThread::QueryDb, [&] { c = next++; - })); + }); // Execute all tasks. - tm.RunTasks(TaskTargetThread::QueryDb, nullopt); + tm.RunTasks(TaskThread::QueryDb, nullopt); // Tasks are executed in reverse order. REQUIRE(a == 3); @@ -135,20 +92,20 @@ TEST_CASE("post from inside task manager") { // Post three tasks. int next = 1; int a = 0, b = 0, c = 0; - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] () { + tm.Post(TaskThread::QueryDb, [&] () { a = next++; - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { + tm.Post(TaskThread::QueryDb, [&] { b = next++; - tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { + tm.Post(TaskThread::QueryDb, [&] { c = next++; - })); - })); - })); + }); + }); + }); // Execute all tasks. - tm.RunTasks(TaskTargetThread::QueryDb, nullopt); + tm.RunTasks(TaskThread::QueryDb, nullopt); // Tasks are executed in normal order because the next task is not posted // until the previous one is executed. @@ -157,4 +114,37 @@ TEST_CASE("post from inside task manager") { REQUIRE(c == 3); } +TEST_CASE("idle task is run after nested tasks") { + TaskManager tm; + + int count = 0; + tm.SetIdle(TaskThread::QueryDb, [&]() { + ++count; + return true; + }); + + // No tasks posted - idle runs once. + REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt)); + REQUIRE(count == 1); + count = 0; + + // Idle runs after other posted tasks. + bool did_run = false; + tm.Post(TaskThread::QueryDb, [&]() { + did_run = true; + }); + REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt)); + REQUIRE(did_run); + REQUIRE(count == 1); +} + +TEST_CASE("RunTasks returns false when idle task returns false and no other tasks were run") { + TaskManager tm; + + REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt) == false); + + tm.SetIdle(TaskThread::QueryDb, []() { return false; }); + REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt) == false); +} + TEST_SUITE_END(); \ No newline at end of file diff --git a/src/task.h b/src/task.h new file mode 100644 index 00000000..9a47f115 --- /dev/null +++ b/src/task.h @@ -0,0 +1,42 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +using std::experimental::optional; +using std::experimental::nullopt; + +enum class TaskThread { + Indexer, + QueryDb, +}; + +struct TaskManager { + using TTask = std::function; + using TIdleTask = std::function; + + TaskManager(); + + // Run |task| at some point in the future. This will run the task as soon as possible. + void Post(TaskThread thread, const TTask& task); + + // Run |task| whenever there is nothing else to run. + void SetIdle(TaskThread thread, const TIdleTask& idle_task); + + // Run pending tasks for |thread|. Stop running tasks after |max_time| has + // elapsed. Returns true if tasks were run. + bool RunTasks(TaskThread thread, optional> max_time); + + struct TaskQueue { + optional idle_task; + std::vector tasks; + std::mutex tasks_mutex; + }; + + std::unordered_map> pending_tasks_; +}; \ No newline at end of file