当前位置:首页 » 《资源分享》 » 正文

【Linux】:日志策略 + 线程池(单例模式)

4 人参与  2024年12月21日 08:01  分类 : 《资源分享》  评论

点击全文阅读


 ?个人主页:island1314

?个人专栏:Linux—登神长阶

⛺️ 欢迎关注:?点赞 ??留言 ?收藏  ? ? ?


1. 前言 ?

? 下面开始,我们结合我们之前所做的所有封装,进行一个线程池的设计。在写之前,我们要做如下准备

准备 线程 的封装准备 锁 和 条件变量的封装引入日志,对线程进行封装

这里用到了我们之前博客用到的头文件及代码  【Linux】:多线程(互斥 && 同步)

2. 日志和策略 ?

?什么是设计模式

T行业这么火,涌入的人很多,俗话说林子大了啥鸟都有,大佬和菜鸡们两极分化的越来越严重。为了让菜鸡们不太拖大佬的后腿,于是大佬们针对一些经典的常见的场景,给定了一些对应的解决方案,这个解决方案就是 设计模式

? 认识日志

计算机中的日志是记录系统和软件运行中发生事件的文件,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具

?日志格式的指标

必须有的:时间戳、日志等级、日志内容可选的:文件名行号、进程线程相关信息id 等等

日志有现成的解决方案,如:spdlog、glog、Boost.Log、Log4cxx  等等,但是我们依旧采用自定义日志的方式。比如这里我们采用 设计模式-策略模式 来进行日志的设计

我们想要的日志格式如下:

[可读性很好的时间] [⽇志等级] [进程pid] [打印对应⽇志的⽂件名][⾏号] - 消息内容,⽀持可变参数[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [17] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [18] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [20] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [21] - hello world[2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world

? 日志模式 详解 见代码注释 Log.hpp 

注意:我们这里用到的 "Mutex.hpp" 是我们之前在 【Linux】:多线程(互斥 && 同步) 实现的互斥量封装
#pragma once#include <iostream>#include <string>#include <unistd.h>#include <sstream>#include <fstream>#include <memory>#include <filesystem> // C++ 17 后的标准#include <unistd.h>#include <time.h>#include "Mutex.hpp"namespace LogMudule{    // 获取当前系统时间    std::string CurrentTime()    {        time_t time_stamp = ::time(nullptr);        struct tm curr;        localtime_r(&time_stamp, &curr); // 时间戳,获取可读性更高的时间信息        char buffer[1024];         // bug ->  ?        snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",                 curr.tm_year + 1900, // 这里需要 + 1900                 curr.tm_mon + 1,                 curr.tm_mday,                 curr.tm_hour,                 curr.tm_min,                 curr.tm_sec);        return buffer;    }    using namespace LockModule;    // 构成: 1. 构建日志字符串 2. 刷新落盘(①screen ②file)    //  1. 日志文件的默认路径 和 文件名    const std::string defaultlogpath = "./log/";    const std::string defaultlogname = "log.txt";    // 2. 日志等级    enum class LogLevel    {        DEBUG = 1,        INFO,        WARNING,        ERROR,        FATAL    };    std::string Level2String(LogLevel level)    {        switch(level)        {        case LogLevel::DEBUG:            return "DEBUG";        case LogLevel::INFO:            return "INFO";        case LogLevel::WARNING:            return "WARNING";        case LogLevel::ERROR:            return "ERROR";        case LogLevel::FATAL:            return "FATAL";        default:            return "None";        }    }    // 3. 刷新策略,只进行刷新,不提供方法    class LogStrategy    {    public: // 基类 需要 析构设成 虚方法        virtual ~LogStrategy() = default;        virtual void SyncLog(const std::string &message) = 0;    };    // 3.1 控制台策略(screen)    class  ConsoleLogStrategy : public LogStrategy    {    public:        ConsoleLogStrategy()        {}        ~ConsoleLogStrategy()        {}        void SyncLog(const std::string &message)        {            LockGuard lockguard(_lock);            std::cout << message << std::endl;          }    private:        Mutex _lock;    };    // 3.2 文件级(磁盘)策略    class FileLogStrategy : public LogStrategy    {    public:        FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)          : _logpath(logpath),            _logname(logname)        {            // 确认 _logpath 是存在的            LockGuard lockguard(_lock);            if(std::filesystem::exists(_logpath))            {                return;            }            try{                std::filesystem::create_directories(_logpath); // 新建            }            catch(std::filesystem::filesystem_error &e)            {                std::cerr << e.what() << "\n";            }        }        ~FileLogStrategy()        {}        // 下面用的是 c++ 的文件操作        void SyncLog(const std::string &message)        {            LockGuard lockguard(_lock);            std::string log = _logpath + _logname; // ./log/log.txt            std::ofstream out(log, std::ios::app);  // 日志写入,一定是追加, app -> append            if(!out.is_open()) return ;            out << message << "\n";            out.close();        }    private:        std::string _logpath;        std::string _logname;        // 锁        Mutex _lock;    };    // 日志类:构建日志字符串,根据策略 进行刷新    class Logger    {    public:        Logger()        {            // 默认采用 ConsoleLogStrategy 策略            _strategy = std::make_shared<ConsoleLogStrategy>();        }                void EnableConsoleLog()        {            _strategy = std::make_shared<ConsoleLogStrategy>();        }        void EnableFileLog()        {            _strategy = std::make_shared<FileLogStrategy>();        }                ~Logger() {}        // 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;)                class LogMessage        {        public:            LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)                : _currtime(CurrentTime()),                  _level(level),                  _pid(::getpid()),                  _filename(filename),                  _line(line),                  _logger(logger)            {                std::stringstream ssbuffer;                ssbuffer << "[" << _currtime << "] "                         << "[" << Level2String(_level) << "] " // 对日志等级进行转换显示                         << "[" << _pid << "] "                         << "[" << _filename << "] "                         << "[" << _line << "] - ";                _loginfo = ssbuffer.str();            }            template<typename T>            LogMessage &operator<<(const T&info)            {                std::stringstream ss;                ss << info;                _loginfo += ss.str();                return *this;            }            ~LogMessage()            {                if (_logger._strategy)                {                    _logger._strategy->SyncLog(_loginfo);                }            }        private:            std::string _currtime; // 当前日志的时间            LogLevel _level;       // 日志等级            pid_t _pid;            // 进程pid            std::string _filename; // 源文件名称            int _line;             // 日志所在的行号            Logger &_logger;       // 负责根据不同的策略进行刷新            std::string _loginfo;  // 一条完整的日志记录        };        // 就是要拷贝(临时的 logmessage), 故意的拷贝        LogMessage operator()(LogLevel level, const std::string &filename, int line)        {            return LogMessage(level, filename, line, *this);        }    private:        std::shared_ptr<LogStrategy> _strategy; // 日志刷新的策略方案    };    Logger logger;#define LOG(Level) logger(Level, __FILE__, __LINE__)#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()#define ENABLE_FILE_LOG() logger.EnableFileLog()}

代码剖析:

? 我们这里实现了一个日志模块(LogModule),通过不同的日志策略(如控制台输出或文件输出)来记录日志。具体来说,它分为以下几个部分:

1. 获取当前时间 (CurrentTime 函数) 

该函数通过 C++ 标准库的 timelocaltime_r 获取当前系统时间并格式化为 YYYY-MM-DD HH:MM:SS 的字符串格式。这个时间戳用于日志记录。

2. 日志等级 (LogLevel 枚举)

定义了五个日志等级: DEBUG、INFO、WARNING、ERROR、FATAL ,表示日志的严重性。Level2String 函数根据 LogLevel 转换为对应的字符串形式,用于日志输出。

3. 日志策略(LogStrategy 类及其派生类)

LogStrategy 基类:定义了一个纯虚函数 SyncLog,用于实际的日志刷新操作(即将日志信息输出到目标介质)。ConsoleLogStrategy 类:实现了 SyncLog 方法,将日志信息输出到控制台。FileLogStrategy 类:实现了 SyncLog 方法,将日志信息输出到文件中。文件路径和文件名默认设置为 ./log/log.txt 。如果目录不存在,则会尝试创建目录。

4. 日志类 (Logger 类)

Logger 类负责管理日志的策略,可以切换控制台输出或文件输出。Logger 提供了两个方法: EnableConsoleLog:切换为控制台输出策略。EnableFileLog:切换为文件输出策略。内部有一个嵌套类 LogMessage,它用来生成具体的日志条目。每次创建一个 LogMessage 对象时,会自动格式化日志信息并最终将其传递给策略进行输出。

5. 日志信息格式化

LogMessage 类的构造函数会根据当前时间、日志等级、进程 ID、文件名和行号等信息来生成一条完整的日志记录。operator<< 被重载,以支持日志信息的追加,可以向日志信息中添加不同类型的内容(如字符串、数字等)。

6. 宏定义

LOG(Level):简化日志记录的调用方式,自动记录当前文件名和行号。ENABLE_CONSOLE_LOG():设置日志策略为控制台输出。ENABLE_FILE_LOG():设置日志策略为文件输出。

? 测试代码 Main.cc 如下:

#include "Log.hpp"using namespace LogMudule;int main(){    ENABLE_FILE_LOG(); // 开启日志文件的文件输出    LOG(LogLevel::DEBUG) << "Hello File";    LOG(LogLevel::DEBUG) << "Hello File";    LOG(LogLevel::DEBUG) << "Hello File";    LOG(LogLevel::DEBUG) << "Hello File";    ENABLE_CONSOLE_LOG(); // 往显示器输出    LOG(LogLevel::DEBUG) << "Hello IsLand";    LOG(LogLevel::DEBUG) << "Hello IsLand";    LOG(LogLevel::DEBUG) << "Hello IsLand";    LOG(LogLevel::DEBUG) << "Hello IsLand";    return 0;}

输出如下:

3. 线程池设计 ?

? 3.1 线程池的基本概念

? 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

线程池 通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。可以避免大量线程频繁创建或销毁所带来的时间成本,也可以避免在峰值压力下,系统资源耗尽的风险;并且可以统一对线程池中的线程进行管理,调度监控。

? 3.2 线程池应用场景

需要大量的线程来完成任务,且完成任务的时间比较短。  比如 WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,比如我们可以想象一个热门网站的点击次数。但对于一些长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。  突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

? 3.3 线程池种类

创建 固定数量线程池 ,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口浮动线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口

这里我是选择固定线程个数的线程池来做样例进行实现

 

? 3.4 线程池实现 (ThreadPool)

Task.hpp

#pragma once#include <iostream>#include <string>#include <string>#include <functional>#include "Log.hpp"using namespace LogMudule;using task_t = std::function<void(std::string name)>;void Push(std::string name){    LOG(LogLevel::DEBUG) << "我是一个推送数据到服务器的一个任务, 我正在被执行" << "[" << name << "]";}
Log.hpp 是我们上面日志那里实现的,"Mutex.hpp""Cond.hpp" 是我们之前博客【Linux】:多线程(互斥 && 同步) 实现的互斥量和条件变量封装,而 "Thread.hpp" 也是我们之前博客 【Linux】:线程库简单封装 实现的 1 号 版本

ThreadPool.hpp 

#pragma once#include <iostream>#include <string>#include "Log.hpp"#include "Mutex.hpp"#include <queue>#include <vector>#include <memory>#include "Cond.hpp"#include "Thread.hpp"namespace ThreadPoolModule{    using namespace LogMudule;    using namespace ThreadModule;    using namespace CondModule;    using namespace LockModule;    // 用来做测试的线程方法    void DefaultTest()    {        while(true)        {            LOG(LogLevel::DEBUG) << "我是一个测试线程";            sleep(1);        }    }    using thread_t = std::shared_ptr<Thread>;    const static int defaultnum = 5;        template<typename T>    class ThreadPool    {    private:        bool IsEmpty() { return _taskq.empty();}        void HandlerTask(std::string name)        {            LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑";            while(true)            {                // 1. 拿任务                T t;                {                    LockGuard lockguard(_lock);                    while(IsEmpty() && _isrunning)                    {                        _wait_num++; // 线程等待数量加 1                        _cond.Wait(_lock);                        _wait_num--;                    }                    // 2. 任务队列不为空 && 线程池退出                    if(IsEmpty() && !_isrunning){                        break;                    }                    t = _taskq.front();                    _taskq.pop();                }                // 2. 处理任务                t(name); // 规定,未来所有的任务处理,全部都是必须提供 () 方法            }            LOG(LogLevel::INFO) << "线程" << name << "退出";        }    public:        ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false)        {            for(int i = 0; i < _num; i++)            {                // _threads.push_back(std::make_shared<Thread>(DefaultTest)); // 构建 make_shared 对象                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // 构建 make_shared 对象,当我们传递名字的时候,需要带上 placeholder                                // _threads.push_back(std::make_shared<Thread>([this]{ThreadPool::HandlerTask})); // 构建 make_shared 对象                                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";            }        }        void Equeue(T &&in)        {            LockGuard lockguard(_lock); // 保护临界资源,使其安全            if(!_isrunning) return ;            _taskq.push(std::move(in));            if(_wait_num > 0){                _cond.Notify();            }        }        void Start()        {            if(_isrunning) return ;            _isrunning = true; // 注意这里            for(auto &thread_ptr : _threads)            {                LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";                thread_ptr->Start();            }        }        void Wait()        {            for(auto &thread_ptr: _threads)            {                thread_ptr->Join();                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";            }        }        void Stop()        {            LockGuard lockguard(_lock);            if(_isrunning)            {                // 3. 不能再入任务了                _isrunning = false;                // 1. 让线程自己退出 && 2. 历史的任务被处理完了                if(_wait_num > 0)                    _cond.NotifyAll();                             }        }        ~ThreadPool()        {        }    private:        std::vector<thread_t> _threads;         int _num;        int _wait_num;         std::queue<T> _taskq; // 任务队列 -> 临界资源         Mutex _lock;        Cond _cond;        bool _isrunning;    };}

1. 命名空间

namespace ThreadPoolModule{    using namespace LogMudule;    using namespace ThreadModule;    using namespace CondModule;    using namespace LockModule;}
使用了四个外部命名空间:LogMudule(日志模块),ThreadModule(线程模块),CondModule(条件变量模块),LockModule(锁模块)。这些模块分别提供日志记录、线程管理、条件变量和互斥锁功能。

2. 测试线程方法

void DefaultTest(){    while(true)    {        LOG(LogLevel::DEBUG) << "我是一个测试线程";        sleep(1);    }}
DefaultTest 是一个模拟的线程任务函数,线程会每秒打印一次日志。sleep(1) 用于让线程每秒钟执行一次。

3. thread_t 类型定义

using thread_t = std::shared_ptr<Thread>;
定义了 thread_t 类型,它是 std::shared_ptr<Thread> 的别名。这样可以方便管理线程对象的生命周期,避免手动管理内存。

4. ThreadPool 类

ThreadPool 类是一个模板类,用来管理和分配线程池中的任务。其主要功能包括任务队列管理、线程创建、启动、停止和等待等。

成员变量

_threads: 存储线程的容器,类型为 std::vector<thread_t>,存放线程的共享指针。_num: 线程池中的线程数量,默认值为 defaultnum(5)。_wait_num: 记录当前等待任务的线程数量。_taskq: 任务队列,存储待执行的任务。类型为 std::queue<T>_lock: 互斥锁,保护任务队列和其他临界资源。_cond: 条件变量,用于线程之间的同步,帮助线程等待任务或通知线程执行任务。_isrunning: 布尔标志,表示线程池是否正在运行。

成员函数

① IsEmpty

bool IsEmpty() { return _taskq.empty(); }
判断任务队列是否为空。

② HandlerTask

void HandlerTask(std::string name){    LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑";    while(true)    {        // 拿任务        T t;        {            LockGuard lockguard(_lock);            while(IsEmpty() && _isrunning)            {                _wait_num++; // 线程等待数量加 1                _cond.Wait(_lock);                _wait_num--;            }            if (IsEmpty() && !_isrunning) { break; }            t = _taskq.front();            _taskq.pop();        }        // 处理任务        t(name);  // 假设任务对象可以直接调用    }    LOG(LogLevel::INFO) << "线程" << name << "退出";}

HandlerTask 是每个线程执行的函数:

线程不断从任务队列中取任务执行。如果任务队列为空,线程会等待,直到有新任务被加入队列。线程通过条件变量 Wait() 等待任务,避免空转浪费 CPU 资源。t(name) 任务执行函数,通过传递线程名称进行日志记录。

③ Equeue

void Equeue(T &&in){    LockGuard lockguard(_lock);    if (!_isrunning) return;    _taskq.push(std::move(in));  // 把任务添加到队列中    if (_wait_num > 0)    {        _cond.Notify();  // 通知等待线程有新任务    }}
Equeue 将任务加入到任务队列 _taskq 中。通过 std::move 移动任务对象来避免不必要的拷贝。如果有线程在等待任务,调用 Notify() 通知这些线程继续工作。

④ Start

void Start(){    if (_isrunning) return;    _isrunning = true;    for (auto &thread_ptr : _threads)    {        LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";        thread_ptr->Start();    }}
Start 启动线程池中的所有线程。线程会调用 HandlerTask 开始处理任务。

⑤ Wait

void Wait(){    for (auto &thread_ptr : _threads)    {        thread_ptr->Join();        LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";    }}
Wait 等待所有线程执行完毕。Join 会阻塞当前线程直到每个线程完成任务。

⑥ Stop

void Stop(){    LockGuard lockguard(_lock);    if (_isrunning)    {        _isrunning = false;        if (_wait_num > 0)            _cond.NotifyAll();  // 通知所有等待的线程退出    }}

Stop 用于停止线程池的运行,标记 _isrunning = false,使线程池不再接收新任务。

如果有线程正在等待任务,则通过 _cond.NotifyAll() 唤醒这些线程,让它们退出。

5. 线程池的工作流程

线程池在构造时会创建指定数量的线程。任务通过 Equeue 方法提交到任务队列中。线程池通过 Start 启动所有线程,每个线程执行 HandlerTask,从任务队列中取任务并处理。通过 Wait 等待所有线程执行完毕。通过 Stop 停止线程池并通知所有线程退出。

6. 线程管理

线程池通过 std::shared_ptr<Thread> 来管理线程,避免了手动内存管理的问题。使用条件变量来实现线程的等待和通知机制。使用互斥锁 Mutex 确保任务队列的线程安全。

7. 日志记录

通过 LOG 宏记录线程池的各种操作,如线程的启动、任务的处理等。这些日志有助于调试和监控线程池的运行状态。

测试代码 ThreadPool.cc:

#include "ThreadPool.hpp"#include "Task.hpp"#include <memory>using namespace ThreadPoolModule;int main(){    ENABLE_CONSOLE_LOG(); // 默认开启 -- 日志显示策略    // ENABLE_FILE_LOG(); // 文件显示    std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();    tp->Start();    int cnt = 10;    while(cnt)    {        tp->Equeue(Push);        cnt--;        sleep(1);    }    tp->Stop();    sleep(3);    tp->Wait();    return 0;}

运行输入如下: 

当然我们也可以把我们的输出结果写到文件中,关闭默认开启,打开文件显示就行

4. 线程安全的单例模式 ??

? 4.1 单例模式的概念及性质

单例模式(Singleton Pattern)是一种创建型设计模式,它确保一个类只有一个实例,并提供全局访问点来访问这个实例。在C++中,单例模式通常用于需要控制资源访问或管理全局状态的情况下,比如日志记录器、配置管理器、线程池等

其特点如下:

某些类, 只应该具有一个对象(实例), 就称之为单例。 例如一个丈夫只能有一个老婆在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用⼀个单例的类来管理这些数据.

? 4.2 饿汉方式和懒汉方式的实现

这里我们打个比方 ?

吃完饭, 立刻洗碗, 这种就是饿汉方式。因为下一顿吃的时候可以立刻拿着碗就能吃饭.吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.

懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.

? 4.3 饿汉方式实现单例模式 

template <typename T>class Singleton {static T data;public:static T* GetInstance() {return &data;}};

? - 只要通过 Singleton 这个包装类来使用 T 对象, 则⼀个进程中只有⼀个 T 对象的实例。

? 4.4 懒汉方式实现单例模式

template <typename T>class Singleton {static T* inst;public:static T* GetInstance() {if (inst == NULL) {inst = new T();} return inst;}};

存在一个严重的问题,线程不安全
第一次调用 Getlnstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例,但是后续再次调用就没有问题了

因此我们这里再来一个懒汉方式实现单例模式(线程安全版本)

// 懒汉模式, 线程安全template <typename T>class Singleton {volatile static T* inst;    // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;public:static T* GetInstance() {if (inst == NULL) {     // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.lock.lock();        // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.if (inst == NULL) {inst = new T();} lock.unlock();} return inst;}}

注意事项:

加锁解锁的位置双重 if 判定,避免不必要的锁竞争volatile 关键字防止过度优化 (指令重排序和从寄存器中读取数据) (可见性和有序性)

? 4.5 单例式线程池实现(SigThreadPool)

这里说明一下,下面的代码也用到了我们之前线程池实现的代码,是基于之前的代码的一个改善

ThreadPool.hpp

#pragma once#include <iostream>#include <string>#include "Log.hpp"#include "Mutex.hpp"#include <queue>#include <vector>#include <memory>#include "Cond.hpp"#include "Thread.hpp"namespace ThreadPoolModule{    using namespace LogMudule;    using namespace ThreadModule;    using namespace CondModule;    using namespace LockModule;    // 用来做测试的线程方法    void DefaultTest()    {        while(true)        {            LOG(LogLevel::DEBUG) << "我是一个测试线程";            sleep(1);        }    }    using thread_t = std::shared_ptr<Thread>;    const static int defaultnum = 5;        template<typename T>    class ThreadPool    {    private:        bool IsEmpty() { return _taskq.empty();}        void HandlerTask(std::string name)        {            LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑";            while(true)            {                // 1. 拿任务                T t;                {                    LockGuard lockguard(_lock);                    while(IsEmpty() && _isrunning)                    {                        _wait_num++; // 线程等待数量加 1                        _cond.Wait(_lock);                        _wait_num--;                    }                    // 2. 任务队列不为空 && 线程池退出                    if(IsEmpty() && !_isrunning){                        break;                    }                    t = _taskq.front();                    _taskq.pop();                }                // 2. 处理任务                t(name); // 规定,未来所有的任务处理,全部都是必须提供 () 方法            }            LOG(LogLevel::INFO) << "线程" << name << "退出";        }        ThreadPool(const ThreadPool<T> &) = delete; // 拷贝构造设置为私有        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // 拷贝构造设置为私有        ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false)        {            for(int i = 0; i < _num; i++)            {                _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // 构建 make_shared 对象,当我们传递名字的时候,需要带上 placeholder                LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";            }        }    public:        static ThreadPool<T> *getInstance()        {            if(instance == NULL)                {                LockGuard lockguard(mutex);                    if(instance == NULL)                    {                        LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";                        instance = new ThreadPool<T>();                    }            }            return instance;        }        void Equeue(T &&in)        {            LockGuard lockguard(_lock); // 保护临界资源,使其安全            if(!_isrunning) return ;            _taskq.push(std::move(in));            if(_wait_num > 0){                _cond.Notify();            }        }        void Start()        {            if(_isrunning) return ;            _isrunning = true; // 注意这里            for(auto &thread_ptr : _threads)            {                LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";                thread_ptr->Start();            }        }        void Wait()        {            for(auto &thread_ptr: _threads)            {                thread_ptr->Join();                LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";            }        }        void Stop()        {            LockGuard lockguard(_lock);            if(_isrunning)            {                // 3. 不能再入任务了                _isrunning = false;                // 1. 让线程自己退出 && 2. 历史的任务被处理完了                if(_wait_num > 0)                    _cond.NotifyAll();                             }        }        ~ThreadPool()        {        }    private:        std::vector<thread_t> _threads;         int _num;        int _wait_num;         std::queue<T> _taskq; // 任务队列 -> 临界资源         Mutex _lock;        Cond _cond;        bool _isrunning;        static ThreadPool<T> *instance;        static Mutex mutex; // 用来只保护单例    };    template<typename T>    ThreadPool<T> *ThreadPool<T>::instance = NULL;        template<typename T>    Mutex ThreadPool<T>::mutex; //只用来保护单例}

① ThreadPool 类:

这是一个模板类(ThreadPool<T>),其中 T 表示任务类型。线程池可以通过管理一组工作线程来异步执行任务。任务被排队等待,工作线程在有任务时会取出并执行。

② 构造函数与实例管理:

构造函数接受线程数 (_num),并创建相应数量的工作线程(使用 std::shared_ptr<Thread>)。getInstance() 函数确保线程池实例是单例的,采用了单例模式,确保只有一个 ThreadPool<T> 实例存在。

③ 任务管理:

Equeue() 函数将任务添加到任务队列中。如果有任何线程处于等待状态,它会通知这些线程继续执行。HandlerTask() 函数定义了每个工作线程执行的逻辑。线程会持续检查任务队列,处理任务,并在线程池停止时退出。

④ 线程控制:

Start() 启动所有线程,每个线程会执行 HandlerTask()Wait() 等待所有线程处理完任务后才继续执行程序。Stop() 停止线程池,并确保不再接收新的任务。

⑤ 同步机制:

使用 Mutex 来保护临界区(如修改任务队列或停止线程池)。使用 Cond 变量进行线程同步。如果任务队列为空,线程会等待,直到有新任务被添加时被通知。

⑥ 日志:

使用 LOG 函数(可能来自 LogModule)输出不同严重级别的日志(如 INFO、DEBUG)。这些日志有助于调试和显示线程池的运行状态。

测试代码 ThreadPool.cc

#include "ThreadPool.hpp"#include "Task.hpp"#include <memory>using namespace ThreadPoolModule;int main(){    ENABLE_CONSOLE_LOG(); // 默认开启 -- 日志显示策略    ThreadPool<task_t>::getInstance()->Start();    int cnt = 5;    while(cnt)    {        ThreadPool<task_t>::getInstance()->Equeue(Push);        cnt--;        sleep(1);    }    ThreadPool<task_t>::getInstance()->Stop();    ThreadPool<task_t>::getInstance()->Wait();        return 0;}

运行结果如下:

5. 共勉 ? 

【*★,°*:.☆( ̄▽ ̄)/$:*.°★* 】那么本篇到此就结束啦,如果有不懂 和 发现问题的小伙伴可以在评论区说出来哦,同时我还会继续更新关于【Linux】的内容,请持续关注我 !!


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/204440.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1