TaskExecutor.h
Task | TaskIn
一个是函数对象,一个是可调用的类对象
using TaskIn = std::function<void()>;
using Task = TaskCancelableImp<void()>;
TaskCancelableImp
一个重载了operator()的类,返回值和参数是对应的模板参数,函数可取消
取消通过一强一弱智能指针实现
同时定义了取消后的默认返回值
template<class R, class... ArgTypes>class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {public: using Ptr = std::shared_ptr<TaskCancelableImp>; using func_type = std::function<R(ArgTypes...)>; ~TaskCancelableImp() = default; template<typename FUNC> TaskCancelableImp(FUNC &&task) { _strongTask = std::make_shared<func_type>(std::forward<FUNC>(task)); _weakTask = _strongTask; } void cancel() override { _strongTask = nullptr; } operator bool() { return _strongTask && *_strongTask; } void operator=(std::nullptr_t) { _strongTask = nullptr; } R operator()(ArgTypes ...args) const { auto strongTask = _weakTask.lock(); if (strongTask && *strongTask) { return (*strongTask)(std::forward<ArgTypes>(args)...); } return defaultValue<R>(); } template<typename T> static typename std::enable_if<std::is_void<T>::value, void>::type defaultValue() {} template<typename T> static typename std::enable_if<std::is_pointer<T>::value, T>::type defaultValue() { return nullptr; } template<typename T> static typename std::enable_if<std::is_integral<T>::value, T>::type defaultValue() { return 0; }protected: std::weak_ptr<func_type> _weakTask; std::shared_ptr<func_type> _strongTask;};
TaskExecutorGetterImp
UML
TaskExecutorGetter
virtual TaskExecutor::Ptr getExecutor() = 0;virtual size_t getExecutorSize() const = 0;
结构
std::vector<TaskExecutor::Ptr> _threads; //线程池,多个任务执行器
因为装了多个TaskExecutor,所以对应的获取执行器,统计负载,等功能都有。
addPoller
根据核数添加对应的poller线程
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) { auto cpus = thread::hardware_concurrency(); size = size > 0 ? size : cpus; for (size_t i = 0; i < size; ++i) { EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority)); poller->runLoop(false, register_thread); auto full_name = name + " " + to_string(i); poller->async([i, cpus, full_name]() { setThreadName(full_name.data()); setThreadAffinity(i % cpus); }); _threads.emplace_back(std::move(poller)); } return size;}
getExecutorDelay 给每一个thread异步加入一个定时器,统计执行的时间。finished里析构是调用回调函数。
void TaskExecutorGetterImp::getExecutorDelay(const function<void(const vector<int> &)> &callback) { std::shared_ptr<vector<int> > delay_vec = std::make_shared<vector<int>>(_threads.size()); shared_ptr<void> finished(nullptr, [callback, delay_vec](void *) { //此析构回调触发时,说明已执行完毕所有async任务 callback((*delay_vec)); }); int index = 0; for (auto &th : _threads) { std::shared_ptr<Ticker> delay_ticker = std::make_shared<Ticker>(); th->async([finished, delay_vec, index, delay_ticker]() { (*delay_vec)[index] = (int) delay_ticker->elapsedTime(); }, false); ++index; }}
getExecutor
根据线程负载情况,获取最空闲的任务执行器
TaskExecutor::Ptr TaskExecutorGetterImp::getExecutor() { auto thread_pos = _thread_pos; if (thread_pos >= _threads.size()) { thread_pos = 0; } TaskExecutor::Ptr executor_min_load = _threads[thread_pos]; auto min_load = executor_min_load->load(); for (size_t i = 0; i < _threads.size(); ++i, ++thread_pos) { if (thread_pos >= _threads.size()) { thread_pos = 0; } auto th = _threads[thread_pos]; auto load = th->load(); if (load < min_load) { min_load = load; executor_min_load = th; } if (min_load == 0) { break; } } _thread_pos = thread_pos; return executor_min_load;}
TaskExecutor
一个任务执行器的最重要的是实现继承自TaskExecutorInterface异步执行的接口
UML
ThreadLoadCounter 计算load
统计线程的cpu使用率
|sleep…|run…|sleep…|run…|交替执行,在load里统计run/run+sleep的百分比
!TaskExecutorInterface
继承的子类需要实现一个异步执行的接口!
class TaskExecutorInterface {void sync(const TaskIn &task); //同步执行就是引用 void sync_first(const TaskIn &task); virtual Task::Ptr async_first(TaskIn task, bool may_sync = true); //异步执行涉及到拷贝了 virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;}
最终都是调用async,子类来实现async
如何化同步为异步,子类async的返回值可以作为判断是否已经执行任务的根据,如果ret && *ret那么说明任务被放到异步队列中了,sem.wait()等待任务执行时 sem.post()唤醒,否则说明已经执行完毕
EventPoller::async_l的参数说明: 可取消的任务本体,如果已经同步执行,则返回nullptr
void TaskExecutorInterface::sync(const TaskIn &task) { semaphore sem; auto ret = async([&]() { onceToken token(nullptr, [&]() { //通过RAII原理防止抛异常导致不执行这句代码 sem.post(); //在异步代码中让同步继续进行下去 }); task(); }); if (ret && *ret) {//任务需要被异步执行 sem.wait(); }}