ccls/task.cc

151 lines
3.8 KiB
C++
Raw Normal View History

2017-02-23 09:23:23 +00:00
#include <condition_variable>
#include <iostream>
#include <thread>
#include <vector>
#include "indexer.h"
#include "query.h"
2017-02-28 08:37:20 +00:00
#include "optional.h"
2017-02-23 09:23:23 +00:00
#include "third_party/tiny-process-library/process.hpp"
2017-02-24 08:39:25 +00:00
#include <queue>
#include <mutex>
#include <condition_variable>
2017-02-23 09:23:23 +00:00
2017-02-28 08:37:20 +00:00
using std::experimental::optional;
using std::experimental::nullopt;
2017-02-24 08:39:25 +00:00
// A threadsafe-queue. http://stackoverflow.com/a/16075550
template <class T>
class SafeQueue {
public:
// Add an element to the queue.
void enqueue(T t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(t);
cv_.notify_one();
2017-02-23 09:23:23 +00:00
}
2017-02-24 08:39:25 +00:00
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty()) {
// release lock as long as the wait and reaquire it afterwards.
cv_.wait(lock);
}
T val = queue_.front();
queue_.pop();
return val;
2017-02-23 09:23:23 +00:00
}
2017-02-28 08:37:20 +00:00
// Get the "front"-element.
// Returns empty if the queue is empty.
optional<T> try_dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty())
return nullopt;
T val = queue_.front();
queue_.pop();
return val;
}
2017-02-24 08:39:25 +00:00
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
2017-02-23 09:23:23 +00:00
};
2017-02-24 08:39:25 +00:00
struct Task {
int priority = 0;
bool writes_to_index = false;
bool should_exit = false;
static Task MakeExit() {
Task task;
task.should_exit = true;
return task;
2017-02-23 09:23:23 +00:00
}
2017-02-24 08:39:25 +00:00
// TODO: Create index task.
// Task running in a separate process, parsing a file into something we can
// import.
// TODO: Index import task.
// Completed parse task that wants to import content into the global database.
// Runs in main process, primary thread. Stops all other threads.
// TODO: Index fresh task.
// Completed parse task that wants to update content previously imported into
// the global database. Runs in main process, primary thread. Stops all other
// threads.
//
// Note that this task just contains a set of operations to apply to the global
// database. The operations come from a diff based on the previously indexed
// state in comparison to the newly indexed state.
//
// TODO: We may be able to run multiple freshen and import tasks in parallel if
// we restrict what ranges of the db they may change.
// TODO: QueryTask
// Task running a query against the global database. Run in main process,
// separate thread.
2017-02-23 09:23:23 +00:00
Command query;
Location location;
std::string argument;
};
// NOTE: When something enters a value into master db, it will have to have a
// ref count, since multiple parsings could enter it (unless we require
// that it be defined in that declaration unit!)
struct TaskManager {
2017-02-24 08:39:25 +00:00
SafeQueue<Task> queued_tasks;
2017-02-23 09:23:23 +00:00
// Available threads.
std::vector<std::thread> threads;
TaskManager(int num_threads);
};
2017-02-24 08:39:25 +00:00
static void ThreadMain(int id, TaskManager* tm) {
while (true) {
Task task = tm->queued_tasks.dequeue();
if (task.should_exit) {
std::cout << id << ": Exiting" << std::endl;
return;
}
std::cout << id << ": waking" << std::endl;
}
2017-02-23 09:23:23 +00:00
}
TaskManager::TaskManager(int num_threads) {
for (int i = 0; i < num_threads; ++i) {
2017-02-24 08:39:25 +00:00
threads.push_back(std::thread(&ThreadMain, i, this));
2017-02-23 09:23:23 +00:00
}
}
void Pump(TaskManager* tm) {
//tm->threads[0].
}
2017-03-01 04:12:57 +00:00
int main5555555555(int argc, char** argv) {
2017-02-24 08:39:25 +00:00
TaskManager tm(5);
2017-02-23 09:23:23 +00:00
// TODO: looks like we will have to write shared memory support.
// TODO: We signal thread to pick data, thread signals data pick is done.
// Repeat until we encounter a writer, wait for all threads to signal
// they are done.
// TODO: Let's use a thread safe queue/vector/etc instead.
2017-02-24 08:39:25 +00:00
for (int i = 0; i < 10; ++i)
tm.queued_tasks.enqueue(Task::MakeExit());
2017-02-23 09:23:23 +00:00
for (std::thread& thread : tm.threads)
thread.join();
std::cin.get();
return 0;
}