mirror of
				https://github.com/MaskRay/ccls.git
				synced 2025-10-31 20:53:01 +00:00 
			
		
		
		
	More iteration on task system
This commit is contained in:
		
							parent
							
								
									b9061ccc07
								
							
						
					
					
						commit
						c89f651cd8
					
				
							
								
								
									
										154
									
								
								src/task.cc
									
									
									
									
									
								
							
							
						
						
									
										154
									
								
								src/task.cc
									
									
									
									
									
								
							| @ -1,105 +1,62 @@ | |||||||
|  | #include "task.h" | ||||||
|  | 
 | ||||||
| #include "utils.h" | #include "utils.h" | ||||||
| 
 | 
 | ||||||
| #include <doctest/doctest.h> | #include <doctest/doctest.h> | ||||||
| #include <optional.h> |  | ||||||
| 
 |  | ||||||
| #include <chrono> |  | ||||||
| #include <functional> |  | ||||||
| #include <mutex> |  | ||||||
| #include <unordered_map> |  | ||||||
| #include <vector> |  | ||||||
| 
 |  | ||||||
| using std::experimental::optional; |  | ||||||
| using std::experimental::nullopt; |  | ||||||
| 
 |  | ||||||
| enum class TaskTargetThread { |  | ||||||
|   Indexer, |  | ||||||
|   QueryDb, |  | ||||||
| }; |  | ||||||
| 
 |  | ||||||
| // TODO: IdleTask returns a bool indicating if it did work.
 |  | ||||||
| // TODO: Hookup IdleTask
 |  | ||||||
| // TODO: Move target_thread out of task and into PostTask.
 |  | ||||||
| 
 |  | ||||||
| struct Task { |  | ||||||
|   // The thread the task will execute on.
 |  | ||||||
|   TaskTargetThread target_thread; |  | ||||||
| 
 |  | ||||||
|   // The action the task will perform.
 |  | ||||||
|   using TAction = std::function<void()>; |  | ||||||
|   TAction action; |  | ||||||
| 
 |  | ||||||
|   Task(TaskTargetThread target, const TAction& action); |  | ||||||
| }; |  | ||||||
| 
 |  | ||||||
| Task::Task(TaskTargetThread target, const TAction& action) |  | ||||||
|   : target_thread(target), action(action) {} |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| struct TaskQueue { |  | ||||||
|   optional<Task> idle_task; |  | ||||||
|   std::vector<Task> tasks; |  | ||||||
|   std::mutex tasks_mutex; |  | ||||||
| }; |  | ||||||
| 
 |  | ||||||
| struct TaskManager { |  | ||||||
|   TaskManager(); |  | ||||||
| 
 |  | ||||||
|   // Run |task| at some point in the future. This will run the task as soon as possible.
 |  | ||||||
|   void PostTask(const Task& task); |  | ||||||
| 
 |  | ||||||
|   // Run |task| whenever there is nothing else to run.
 |  | ||||||
|   void SetIdleTask(const Task& task); |  | ||||||
| 
 |  | ||||||
|   // Run pending tasks for |thread|. Stop running tasks after |max_time| has elapsed.
 |  | ||||||
|   void RunTasks(TaskTargetThread thread, optional<std::chrono::duration<long long, std::nano>> max_time); |  | ||||||
| 
 |  | ||||||
|   std::unordered_map<TaskTargetThread, std::unique_ptr<TaskQueue>> pending_tasks_; |  | ||||||
| }; |  | ||||||
| 
 | 
 | ||||||
| TaskManager::TaskManager() { | TaskManager::TaskManager() { | ||||||
|   pending_tasks_[TaskTargetThread::Indexer] = MakeUnique<TaskQueue>(); |   pending_tasks_[TaskThread::Indexer] = MakeUnique<TaskQueue>(); | ||||||
|   pending_tasks_[TaskTargetThread::QueryDb] = MakeUnique<TaskQueue>(); |   pending_tasks_[TaskThread::QueryDb] = MakeUnique<TaskQueue>(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void TaskManager::PostTask(const Task& task) { | void TaskManager::Post(TaskThread thread, const TTask& task) { | ||||||
|   TaskQueue* queue = pending_tasks_[task.target_thread].get(); |   TaskQueue* queue = pending_tasks_[thread].get(); | ||||||
|   std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); |   std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); | ||||||
|   queue->tasks.push_back(task); |   queue->tasks.push_back(task); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void TaskManager::SetIdleTask(const Task& task) { | void TaskManager::SetIdle(TaskThread thread, const TIdleTask& task) { | ||||||
|   TaskQueue* queue = pending_tasks_[task.target_thread].get(); |   TaskQueue* queue = pending_tasks_[thread].get(); | ||||||
|   std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); |   std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); | ||||||
|   assert(!queue->idle_task && "There is already an idle task"); |   assert(!queue->idle_task && "There is already an idle task"); | ||||||
|   queue->idle_task = task; |   queue->idle_task = task; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void TaskManager::RunTasks(TaskTargetThread thread, optional<std::chrono::duration<long long, std::nano>> max_time) { | bool TaskManager::RunTasks(TaskThread thread, optional<std::chrono::duration<long long, std::nano>> max_time) { | ||||||
|   auto start = std::chrono::high_resolution_clock::now(); |   auto start = std::chrono::high_resolution_clock::now(); | ||||||
|   TaskQueue* queue = pending_tasks_[thread].get(); |   TaskQueue* queue = pending_tasks_[thread].get(); | ||||||
| 
 | 
 | ||||||
|  |   bool ran_task = false; | ||||||
|  | 
 | ||||||
|   while (true) { |   while (true) { | ||||||
|     optional<Task> task; |     optional<TTask> task; | ||||||
| 
 | 
 | ||||||
|     // Get a task.
 |     // Get a task.
 | ||||||
|     { |     { | ||||||
|       std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); |       std::lock_guard<std::mutex> lock_guard(queue->tasks_mutex); | ||||||
|       if (queue->tasks.empty()) |       if (queue->tasks.empty()) | ||||||
|         return; |         break; | ||||||
|       task = queue->tasks[queue->tasks.size() - 1]; |       task = std::move(queue->tasks[queue->tasks.size() - 1]); | ||||||
|       queue->tasks.pop_back(); |       queue->tasks.pop_back(); | ||||||
|     } |     } | ||||||
|      |      | ||||||
|     // Execute task.
 |     // Execute task.
 | ||||||
|     assert(task); |     assert(task); | ||||||
|     task->action(); |     (*task)(); | ||||||
|  |     ran_task = true; | ||||||
| 
 | 
 | ||||||
|     // If we've run past our max time stop.
 |     // Stop if we've run past our max time. Don't run idle_task.
 | ||||||
|     auto elapsed = std::chrono::high_resolution_clock::now() - start; |     auto elapsed = std::chrono::high_resolution_clock::now() - start; | ||||||
|     if (max_time && elapsed > *max_time) |     if (max_time && elapsed > *max_time) | ||||||
|       return; |       return ran_task; | ||||||
|   } |   } | ||||||
|  | 
 | ||||||
|  |   if (queue->idle_task) { | ||||||
|  |     // Even if the idle task returns false we still ran something before.
 | ||||||
|  |     ran_task = (*queue->idle_task)() || ran_task; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   return ran_task; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| TEST_SUITE("Task"); | TEST_SUITE("Task"); | ||||||
| @ -110,18 +67,18 @@ TEST_CASE("tasks are run as soon as they are posted") { | |||||||
|   // Post three tasks.
 |   // Post three tasks.
 | ||||||
|   int next = 1; |   int next = 1; | ||||||
|   int a = 0, b = 0, c = 0; |   int a = 0, b = 0, c = 0; | ||||||
|   tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { |   tm.Post(TaskThread::QueryDb, [&] { | ||||||
|     a = next++; |     a = next++; | ||||||
|   })); |   }); | ||||||
|   tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { |   tm.Post(TaskThread::QueryDb, [&] { | ||||||
|     b = next++; |     b = next++; | ||||||
|   })); |   }); | ||||||
|   tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { |   tm.Post(TaskThread::QueryDb, [&] { | ||||||
|     c = next++; |     c = next++; | ||||||
|   })); |   }); | ||||||
| 
 | 
 | ||||||
|   // Execute all tasks.
 |   // Execute all tasks.
 | ||||||
|   tm.RunTasks(TaskTargetThread::QueryDb, nullopt); |   tm.RunTasks(TaskThread::QueryDb, nullopt); | ||||||
| 
 | 
 | ||||||
|   // Tasks are executed in reverse order.
 |   // Tasks are executed in reverse order.
 | ||||||
|   REQUIRE(a == 3); |   REQUIRE(a == 3); | ||||||
| @ -135,20 +92,20 @@ TEST_CASE("post from inside task manager") { | |||||||
|   // Post three tasks.
 |   // Post three tasks.
 | ||||||
|   int next = 1; |   int next = 1; | ||||||
|   int a = 0, b = 0, c = 0; |   int a = 0, b = 0, c = 0; | ||||||
|   tm.PostTask(Task(TaskTargetThread::QueryDb, [&] () { |   tm.Post(TaskThread::QueryDb, [&] () { | ||||||
|     a = next++; |     a = next++; | ||||||
| 
 | 
 | ||||||
|     tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { |     tm.Post(TaskThread::QueryDb, [&] { | ||||||
|       b = next++; |       b = next++; | ||||||
| 
 | 
 | ||||||
|       tm.PostTask(Task(TaskTargetThread::QueryDb, [&] { |       tm.Post(TaskThread::QueryDb, [&] { | ||||||
|         c = next++; |         c = next++; | ||||||
|       })); |       }); | ||||||
|     })); |     }); | ||||||
|   })); |   }); | ||||||
| 
 | 
 | ||||||
|   // Execute all tasks.
 |   // Execute all tasks.
 | ||||||
|   tm.RunTasks(TaskTargetThread::QueryDb, nullopt); |   tm.RunTasks(TaskThread::QueryDb, nullopt); | ||||||
| 
 | 
 | ||||||
|   // Tasks are executed in normal order because the next task is not posted
 |   // Tasks are executed in normal order because the next task is not posted
 | ||||||
|   // until the previous one is executed.
 |   // until the previous one is executed.
 | ||||||
| @ -157,4 +114,37 @@ TEST_CASE("post from inside task manager") { | |||||||
|   REQUIRE(c == 3); |   REQUIRE(c == 3); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | TEST_CASE("idle task is run after nested tasks") { | ||||||
|  |   TaskManager tm; | ||||||
|  | 
 | ||||||
|  |   int count = 0; | ||||||
|  |   tm.SetIdle(TaskThread::QueryDb, [&]() { | ||||||
|  |     ++count; | ||||||
|  |     return true; | ||||||
|  |   }); | ||||||
|  | 
 | ||||||
|  |   // No tasks posted - idle runs once.
 | ||||||
|  |   REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt)); | ||||||
|  |   REQUIRE(count == 1); | ||||||
|  |   count = 0; | ||||||
|  | 
 | ||||||
|  |   // Idle runs after other posted tasks.
 | ||||||
|  |   bool did_run = false; | ||||||
|  |   tm.Post(TaskThread::QueryDb, [&]() { | ||||||
|  |     did_run = true; | ||||||
|  |   }); | ||||||
|  |   REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt)); | ||||||
|  |   REQUIRE(did_run); | ||||||
|  |   REQUIRE(count == 1); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | TEST_CASE("RunTasks returns false when idle task returns false and no other tasks were run") { | ||||||
|  |   TaskManager tm; | ||||||
|  | 
 | ||||||
|  |   REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt) == false); | ||||||
|  |    | ||||||
|  |   tm.SetIdle(TaskThread::QueryDb, []() { return false; }); | ||||||
|  |   REQUIRE(tm.RunTasks(TaskThread::QueryDb, nullopt) == false); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| TEST_SUITE_END(); | TEST_SUITE_END(); | ||||||
							
								
								
									
										42
									
								
								src/task.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								src/task.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,42 @@ | |||||||
|  | #pragma once | ||||||
|  | 
 | ||||||
|  | #include <optional.h> | ||||||
|  | 
 | ||||||
|  | #include <chrono> | ||||||
|  | #include <functional> | ||||||
|  | #include <mutex> | ||||||
|  | #include <unordered_map> | ||||||
|  | #include <vector> | ||||||
|  | 
 | ||||||
|  | using std::experimental::optional; | ||||||
|  | using std::experimental::nullopt; | ||||||
|  | 
 | ||||||
|  | enum class TaskThread { | ||||||
|  |   Indexer, | ||||||
|  |   QueryDb, | ||||||
|  | }; | ||||||
|  | 
 | ||||||
|  | struct TaskManager { | ||||||
|  |   using TTask = std::function<void()>; | ||||||
|  |   using TIdleTask = std::function<bool()>; | ||||||
|  | 
 | ||||||
|  |   TaskManager(); | ||||||
|  | 
 | ||||||
|  |   // Run |task| at some point in the future. This will run the task as soon as possible.
 | ||||||
|  |   void Post(TaskThread thread, const TTask& task); | ||||||
|  | 
 | ||||||
|  |   // Run |task| whenever there is nothing else to run.
 | ||||||
|  |   void SetIdle(TaskThread thread, const TIdleTask& idle_task); | ||||||
|  | 
 | ||||||
|  |   // Run pending tasks for |thread|. Stop running tasks after |max_time| has
 | ||||||
|  |   // elapsed. Returns true if tasks were run.
 | ||||||
|  |   bool RunTasks(TaskThread thread, optional<std::chrono::duration<long long, std::nano>> max_time); | ||||||
|  | 
 | ||||||
|  |   struct TaskQueue { | ||||||
|  |     optional<TIdleTask> idle_task; | ||||||
|  |     std::vector<TTask> tasks; | ||||||
|  |     std::mutex tasks_mutex; | ||||||
|  |   }; | ||||||
|  | 
 | ||||||
|  |   std::unordered_map<TaskThread, std::unique_ptr<TaskQueue>> pending_tasks_; | ||||||
|  | }; | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user