EventPoller.h
WorkThreadPool.h
EventPoller
UML
继承自TaskExecutor类,最重要的是实现async方法
结构
std::thread::id _loop_thread_id //执行事件循环的线程id//async相关//内部事件管道 PipeWrap _pipe; //从其他线程切换过来的任务 std::mutex _mtx_task; List<Task::Ptr> _list_task;//epoll相关 int _epoll_fd = -1; unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;//定时器相关 std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;
async/async_l
同步任务&&本对象的轮询线程调用 直接执行
异步任务 加入poller管理的任务队列,写入管道通知主线程
Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) { TimeTicker(); if (may_sync && isCurrentThread()) { task(); return nullptr; } auto ret = std::make_shared<Task>(std::move(task)); { lock_guard<mutex> lck(_mtx_task); if (first) { _list_task.emplace_front(ret); } else { _list_task.emplace_back(ret); } } //写数据到管道,唤醒主线程 _pipe.write("", 1); return ret;}
构造函数
自己维护一个红黑树,初始化时就加入对**管道事件(执行异步任务)**的监听
_loop_thread_id,后面会看到构造函数只会被TaskExecutorGetterImp::addPoller调用,且此时绑定的线程id并非执行epoll_wait的线程,在构造之后,会在在执行runLoop时分配一个新线程再次绑定_loop_thread_id
EventPoller::EventPoller(ThreadPool::Priority priority) { _priority = priority; SockUtil::setNoBlocked(_pipe.readFD()); SockUtil::setNoBlocked(_pipe.writeFD());#if defined(HAS_EPOLL) _epoll_fd = epoll_create(EPOLL_SIZE); if (_epoll_fd == -1) { throw runtime_error(StrPrinter << "创建epoll文件描述符失败:" << get_uv_errmsg()); } SockUtil::setCloExec(_epoll_fd);#endif //HAS_EPOLL _logger = Logger::Instance().shared_from_this(); _loop_thread_id = this_thread::get_id(); //添加内部管道事件 if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) { throw std::runtime_error("epoll添加管道失败"); }}
onPipeEvent
没错,管道事件回调即把异步任务队列中的任务执行个编
inline void EventPoller::onPipeEvent() { char buf[1024]; int err = 0; do { if (_pipe.read(buf, sizeof(buf)) > 0) { continue; } err = get_uv_error(true); } while (err != UV_EAGAIN); decltype(_list_task) _list_swap; { lock_guard<mutex> lck(_mtx_task); _list_swap.swap(_list_task); } _list_swap.for_each([&](const Task::Ptr &task) { try { (*task)(); } catch (ExitException &) { _exit_flag = true; } catch (std::exception &ex) { ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what(); } });}
epoll_ctl
socket中可以看到,对应的回调函数都有一个socket实例的弱指针,回调中可以操作socket的所有!
EPOLL_CTL_ADD
typedef union epoll_data{ void *ptr; int fd; uint32_t u32; uint64_t u64;} epoll_data_t;struct epoll_event{ uint32_t events;/* Epoll events */ epoll_data_t data;/* User data variable */} __EPOLL_PACKED;
并没有使用epoll_event.data.ptr来存放回调的数据,而是_event_map保存对应的fd对应的回调
int EventPoller::addEvent(int fd, int event, PollEventCB cb) { TimeTicker(); if (!cb) { WarnL << "PollEventCB 为空!"; return -1; } if (isCurrentThread()) {#if defined(HAS_EPOLL) struct epoll_event ev = {0}; ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (ret == 0) { _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb))); } return ret;#endif //HAS_EPOLL } async([this, fd, event, cb]() { addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb))); }); return 0;}
看到这里回顾之前的socket部分,socket连接之后进入工作状态的回调onRead|onWrite|emitErr,其实也是同理,事件来了,来了做什么,socket对象自己是知道的。实际上socket对象真正对应的读写等事件的cb,是我们根据session具体类型指定的。
这就是大多数网络框架留出来的让用户自定义的onXXX回调部分,即处理业务逻辑的部分
bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) { weak_ptr<Socket> weak_self = shared_from_this(); weak_ptr<SockFD> weak_sock = sock; _enable_recv = true; _read_buffer = _poller->getSharedBuffer(); int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) { auto strong_self = weak_self.lock(); auto strong_sock = weak_sock.lock(); if (!strong_self || !strong_sock) { return; } if (event & EventPoller::Event_Read) { strong_self->onRead(strong_sock, is_udp); } if (event & EventPoller::Event_Write) { strong_self->onWriteAble(strong_sock); } if (event & EventPoller::Event_Error) { strong_self->emitErr(getSockErr(strong_sock)); } }); return -1 != result;}
EPOLL_CTL_DEL/EPOLL_CTL_MOD
和epoll_ctl_add相同,只不过没有del的回调没有存,没必要。mod对应的也只是修改event类型,没有对_event_map进行操作
runLoop
跑不了reactor模型的范式,在就绪事件里调用对应的回调(回调放在了_event_map中,没有给内核event.data.ptr传指针,自己管理自己维护)
特别的是,blocked设置调用是否阻塞,master线程调用不能阻塞,则会else中新开一个线程,开好之后,通过信号量通知进度
void EventPoller::runLoop(bool blocked, bool ref_self) { if (blocked) { ThreadPool::setPriority(_priority); lock_guard<mutex> lck(_mtx_running); _loop_thread_id = this_thread::get_id(); if (ref_self) { s_current_poller = shared_from_this(); } _sem_run_started.post(); _exit_flag = false; uint64_t minDelay;#if defined(HAS_EPOLL) struct epoll_event events[EPOLL_SIZE]; while (!_exit_flag) { minDelay = getMinDelay(); startSleep();//用于统计当前线程负载情况 int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1); sleepWakeUp();//用于统计当前线程负载情况 if (ret <= 0) { //超时或被打断 continue; } for (int i = 0; i < ret; ++i) { //reactor范式 struct epoll_event &ev = events[i]; int fd = ev.data.fd; auto it = _event_map.find(fd); if (it == _event_map.end()) { epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr); continue; } auto cb = it->second; try { (*cb)(toPoller(ev.events)); } catch (std::exception &ex) { ErrorL << "EventPoller执行事件回调捕获到异常:" << ex.what(); } } } } else { _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self); _sem_run_started.wait(); }}
static thread_local std::weak_ptr<EventPoller> s_current_poller; ref_self是否引用自己,给线程中执行的开了一个口子,可以操作当前的线程的poller.
thread_local 关键字修饰的变量具有线程(thread)周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例。
C++ 11 关键字:thread_local
EventPoller::Ptr EventPoller::getCurrentPoller() { return s_current_poller.lock();}
getMinDelay 执行定时任务/设定epoll的超时等待时间
flushDelayTask
执行到期的的定时任务,如果需要循环执行,再次加入一个定时任务
距离下一个定时任务还有多久设计为epoll_wait的超时等待时间
uint64_t EventPoller::flushDelayTask(uint64_t now_time) { decltype(_delay_task_map) task_copy; task_copy.swap(_delay_task_map); for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) { //已到期的任务 try { auto next_delay = (*(it->second))(); if (next_delay) { //可重复任务,更新时间截止线 _delay_task_map.emplace(next_delay + now_time, std::move(it->second)); } } catch (std::exception &ex) { ErrorL << "EventPoller执行延时任务捕获到异常:" << ex.what(); } } task_copy.insert(_delay_task_map.begin(), _delay_task_map.end()); task_copy.swap(_delay_task_map); auto it = _delay_task_map.begin(); if (it == _delay_task_map.end()) { //没有剩余的定时器了 return 0; } //最近一个定时器的执行延时 return it->first - now_time;}
添加定时任务,会异步执行,引起管道事件,即往定时列表中添加定时任务。poller一直在runloop怎么添加的,所以异步执行时,会判断是否是当前poller线程,不是就写管道通知来活了
EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) { DelayTask::Ptr ret = std::make_shared<DelayTask>(std::move(task)); auto time_line = getCurrentMillisecond() + delay_ms; async_first([time_line, ret, this]() { //异步执行的目的是刷新select或epoll的休眠时间 _delay_task_map.emplace(time_line, ret); }); return ret;}
Any/AnyStorage
任意对象void*类型的问题在于,当delete时,能不能获得对应的类型,正确的执行析构。shared_ptr的第二个参数允许我们传入deletor,解决这个问题
//可以保存任意的对象class Any{public: using Ptr = std::shared_ptr<Any>; Any() = default; ~Any() = default; template <typename C,typename ...ArgsType> void set(ArgsType &&...args){ _data.reset(new C(std::forward<ArgsType>(args)...),[](void *ptr){ delete (C*) ptr; }); } template <typename C> C& get(){ if(!_data){ throw std::invalid_argument("Any is empty"); } C *ptr = (C *)_data.get(); return *ptr; } operator bool() { return _data.operator bool (); //std::shared_ptr<T>::operator bool true if *this stores a pointer, false otherwise. } bool empty(){ return !bool(); }private: std::shared_ptr<void> _data;};//用于保存一些外加属性class AnyStorage : public std::unordered_map<std::string,Any>{public: AnyStorage() = default; ~AnyStorage() = default; using Ptr = std::shared_ptr<AnyStorage>;};
std::shared_ptr的工作原理
shared_ptr<void>确实会记录传入的类型,因此析构时能够正确执行。
但是如果传入的是父类指针,就体现了析构函数虚函数的重要性
#include <memory>#include <iostream>#include <vector>class test {public: test() { std::cout << "Test created" << std::endl; } virtual ~test() { std::cout << "Test destroyed" << std::endl; }};class test1: public test{public:test1(){std::cout << "Test1 created" << std::endl;}~test1(){std::cout<< "Test1 destroyed"<< std::endl;}};int main() { std::cout << "At begin of main.\ncreating std::vector<std::shared_ptr<void>>" << std::endl; std::vector<std::shared_ptr<void>> v; { std::cout << "Creating test" << std::endl; v.push_back( std::shared_ptr<test>( (test*)new test1() ) );//传入的是test类型的指针,如果没有virtual不会执行test1的析构 v.push_back( std::shared_ptr<test>( new test1() ) ); //传入的是test1类型的指针,即使没有virtual也能正确调用 std::cout << "Leaving scope" << std::endl; } std::cout << "Leaving main" << std::endl; return 0;}
EventPollerPool
UML
INSTANCE_IMP(EventPollerPool) #define INSTANCE_IMP(class_name, ...) \class_name &class_name::Instance() { \ static std::shared_ptr<class_name> s_instance(new class_name(__VA_ARGS__)); \ static class_name &s_instance_ref = *s_instance; \ return s_instance_ref; \}// 声明一个单例,智能指针管理生命周期EventPollerPool &EventPollerPool::Instance() { static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); static EventPollerPool &s_instance_ref = *s_instance; return s_instance_ref; }EventPollerPool::EventPollerPool() { auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true); InfoL << "创建EventPoller个数:" << size;}
在池单例的构造函数中,调用TaskExecutorGetter的addPoller方法
EventPoller::Ptr EventPollerPool::getFirstPoller() { return dynamic_pointer_cast<EventPoller>(_threads.front());}//根据负载情况获取轻负载的实例 如果优先返回当前线程,那么会返回当前线程 返回当前线程的目的是为了提高线程安全性EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) { auto poller = EventPoller::getCurrentPoller(); if (prefer_current_thread && _prefer_current_thread && poller) { return poller; } return dynamic_pointer_cast<EventPoller>(getExecutor());}/** * 设置 getPoller() 是否优先返回当前线程 * 在批量创建Socket对象时,如果优先返回当前线程, * 那么将导致负载不够均衡,所以可以暂时关闭然后再开启 * @param flag 是否优先返回当前线程 */void EventPollerPool::preferCurrentThread(bool flag) { _prefer_current_thread = flag;}
WorkThreadPool
长得和EventPoller一样…唯一不同在于线程优先级不同 PRIORITY_LOWEST ,EventPollerPool对应的线程优先级PRIORITY_HIGHEST
总结
如何实现一个可以存储任意类型的Any结构,对应的资源释放怎么管理,shared_ptr, 析构函数虚函数的重要性
thread_local 具有线程周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例
对于异步任务执行的实现,pipe事件监听+带锁的异步任务队列
如果添加定时任务,如何在epoll_wait的循环中不错过定时任务,如何处理定时任务
多个poller线程的负载均衡怎么做的 (没有什么高大上,记录wait和run的时间,选择空闲比最小的)