项目地址在这里: https://gitee.com/penggli_2_0/TcpServer
仿mudou的高并发服务器
1 项目介绍2 模块组成3 实现时间轮模块3.1 设计思想3.2 定时任务类3.3 TimeWheel时间轮类
1 项目介绍
这是一个仿muduo库One Thread One Loop式主从Reactor模型实现⾼并发服务器项目。是对基于事件驱动Reactor模型的改良版!
通过实现的高并发服务器组件,可以简洁快速的完成⼀个⾼性能的服务器搭建。并且,通过组件内提供的不同应⽤层协议⽀持,也可以快速完成⼀个高性能应用服务器的搭建(当前为了便于项目的演⽰,项目中提供HTTP协议组件的支持)。在这里,要明确的是咱们要实现的是⼀个高并发服务器组件,因此当前的项目中并不包含实际的业务内容!
实现的是主从Reactor模型服务器!分为两个部分:主Reactor 与 子Reactor:
主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效,提⾼服务器的并发性能。主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。子Reactor线程监控各⾃的描述符的读写事件进行数据读写以及业务处理!One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使用者的意向,因此不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
2 模块组成
在这项目中我们需要两大模块:
协议模块:协议是双方进行通信的基础,这是一定要实现的!sever服务模块:实现Reactor模型的TCP服务器,实现高效派发任务!SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。具体的管理也分为三个方面:
监听连接管理:对监听连接进⾏管理。通信连接管理:对通信连接进⾏管理。超时连接管理:对超时连接进⾏管理。基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个子模块:
Buffer模块:
Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能。先前的文章中我们使用string模拟过缓冲区,这里需要进行丰富功能!
Socket模块:
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。
Channel模块:
Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误等事件的管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
Connection模块:
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套接字的整体的管理,每⼀个进⾏数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进⾏管理。
Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。
TimerQueue模块:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执行,同时也可以通过刷新定时任务来延迟任务的执行。这个模块主要是对Connection对象的⽣命周期管理,对非活跃连接进⾏超时后的释放功能
Poller模块:
Poller模块是对多路转接epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。
EventLoop模块:
EventLoop模块可以理解就是我们上边所说的Reactor模块,进行事件派发。它是对Poller模块,TimerQueue模块,Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。EventLoop模块必然是⼀个对象对应⼀个线程的模块,线程内部的目的就是运行EventLoop的启动函数。
TcpServer模块:
这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。
3 实现时间轮模块
3.1 设计思想
时间轮思想:时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
3.2 定时任务类
这里先对时间轮模块进行一个简单实现:
首先需要设计一个TimeTask 定时任务类
class Timer{private: uint64_t _id; // 任务Id uint32_t _timeout; // 延迟时间 bool _canceled; // false表示没有被取消了 true表示被取消了 Task_t _task_cb; // 定时任务 Release_t _release; // 释放操作public: Timer(uint64_t id, uint32_t timeout, Task_t task) : _id(id), _timeout(timeout), _canceled(false), _task_cb(task) { } void Cancel() { _canceled = true; } ~Timer() { if (_canceled == false) _task_cb(); _release(); } // 设置释放操作 void SetRelease(Release_t release) { _release = release; } // 返回延迟时间 uint32_t DelayTime() { return _timeout; }};
3.3 TimeWheel时间轮类
注意:这里使用会使用智能指针!shared_ptr 指针 与 weak_ptr 指针配合使用,weak_ptr不会增加shared_ptr的计数!
成员变量: 二维数组时间轮,每一节点储存PtrTask数组当前时间指针 int _tick; 走到哪里 执行哪里的任务表盘最大数量(默认60秒)定时器任务ID映射表 unordered_map<uint64_t , WeakPtr> _timers 成员函数 构造函数析构函数哈希表操作函数 RemoveTimer 删除对应ID的对象TimerAdd函数添加定时任务 :创建一个TimeTask对象指针 ,设置RemoveTimer给_release ,将对象指针设置进哈希表中 注意使用WeakPtr!将任务添加到时间轮中TimerRefresh 刷新延迟定时任务:通过保存的定时器对象的Weak_ptr构造一个TaskPtr ,添加到时间轮中RunTimerTask 运行任务时间轮,向后移动一个位置,释放该位置的资源 会自动执行析构函数 运行任务class TimeWheel{ using TaskPtr = std::shared_ptr<Timer>; using WeakPtr = std::weak_ptr<Timer>; // 辅助shared_ptr 不会增加引用计数private: int _capacity; // 最大容量 表盘最大数量(默认60秒) int _tick; // 移动表针 std::vector<std::vector<TaskPtr>> _wheel; // 时间轮 std::unordered_map<uint64_t, WeakPtr> _timers; // 定时任务对象哈希表private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } }public: TimeWheel() : _capacity(gnum), _tick(0), _wheel(_capacity, std::vector<TaskPtr>()) { } void TimerAdd(uint64_t id, int delay, Task_t cb) { TaskPtr p(new Timer(id, delay, cb)); p->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); _timers[id] = WeakPtr(p); // 进行映射 注意是WeakPtr // 放入时间轮 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(p); } void TimerRefresh(uint64_t id) { // 更新任务 auto it = _timers.find(id); if (it == _timers.end()) return; // 通过WeakPtr构造一个shared_ptr TaskPtr p = it->second.lock(); int delay = p->DelayTime(); // 放入时间轮 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(p); } void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); } void TimerCancel(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) return;//没有找到直接退出! TaskPtr p = it->second.lock(); if (p) p->Cancel(); } ~TimeWheel() { }};