当前位置:首页 » 《随便一记》 » 正文

【Linux 进程间的通信】—进程池(何为进程池,看了就懂)

19 人参与  2024年12月29日 16:00  分类 : 《随便一记》  评论

点击全文阅读


进程池的核心思想是预先创建一定数量的进程,并将这些进程放入一个池中。当有新的任务到来时,进程池会分配一个空闲的进程来处理这个任务。任务完成后,进程不会关闭,而是返回池中等待下一个任务。这种方式避免了频繁创建和销毁进程的开销,同时也限制了同时运行的进程数量,防止操作系统过载。

 

 为什么要使用进程池

因为:在多核处理器的系统中,创建多个进程来处理并行任务,可以提高效率。相比于线程池,进程池更适用于 CPU 密集型任务,因为每个进程有独立的内存空间和 CPU 核心资源,不会受到全局解释器锁(GIL)的限制。

用C++模拟实现进程池

//Channel.hpp#ifndef __CHANNEL_HPP__ //如果我们没有定义防止头文件被重复被包含#define __CHANNEL_HPP__#include<iostream>#include<unistd.h>#include<string>using namespace std;// 先描述class channel{public:    channel(int wtd, pid_t who) : _wtd(wtd), _who(who)    {        _name = "channel" + to_string(wtd) + "--" + to_string(who);    }    string Name()    {        return _name;    }    // 发送任务到管道    void Send(int cmd)    {        write(_wtd, &cmd, sizeof(cmd));    }    void Close()    {        close(_wtd);    }    pid_t Id()    {        return _who;    }    ~channel() {}private:    int _wtd;    string _name;    pid_t _who;};#endif
//ProcessPool.hpp#include <iostream>#include <string>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <sys/wait.h>#include <vector>#include <functional>#include "Task.hpp"#include"Channel.hpp"using namespace std;using work_t = function<void()>;enum{    OK = 0,    ARGCEROOR,    PIPEROOR,    FORKEROOR,};class ProcessPool{public:    ProcessPool(int n,work_t w):nums(n),work(w)    {}// work_t work:回调int InitProcesspool(){    for (int i = 0; i < nums; i++)    {        // 1 创建管道        int pipedor[2] = {0};        int n = pipe(pipedor);        if (n < 0)        {            return PIPEROOR;        }        // 2 创建指定个数的进程        pid_t pid = fork();        if (pid < 0)        {            return FORKEROOR;        }        if (pid == 0)        {            // child 进程            close(pipedor[1]);            dup2(pipedor[0], 0);            work();            // 退出            exit(0);        }        // 父进程        close(pipedor[0]);        channels.emplace_back(pipedor[1], pid);        // channel c(pipedor[1],pid);        // channels.push_back(c);        // pid_t id = waitpid(pid, nullptr, 0);    }    return OK;}void DispatchTask(){    int who = 0;    int n = 10;    while (n--)    {        // 选择一个任务(int)        int retask = tm.SelectTask();        // 选择一个子进程        channel &cee = channels[who++];        who %= channels.size();        cout << "*****************************************" << endl;        cout << "send  " << retask << "channl: " << cee.Name() << "---" << n << endl;        cout << "*****************************************" << endl;        // 派发任务        cee.Send(retask);        sleep(1);    }}void CloseProcessPool(){    for (auto &c : channels)    {        c.Close();    }    for (auto &c : channels)    {        pid_t rid = waitpid(c.Id(), nullptr, 0);        if (rid > 0)        {            cout << "     child   " << rid << "   wait   " << endl;        }    }}private:    vector<channel> channels;    int nums;    work_t work;};
//Task.hpp#include<iostream>#include<unordered_map>#include<functional>#include<time.h>#include<sys/types.h>#include<unistd.h>using namespace std;using tast_t=function<void()>;void Task1(){    cout<<"我是一个打印任务 : pid :"<<getpid()<<endl;    }void Task2(){    cout<<"我是一个日志任务 pid :"<<getpid()<<endl;}void Task3(){    cout<<"我是一个快速任务 pid :"<<getpid()<<endl;}static int  number=0;class Task{private:   unordered_map<int,tast_t> _tasks;public:    Task()    {        srand(time(nullptr));        InitTask(Task1);        InitTask(Task2);        InitTask(Task3);    }    //插入任务    void InitTask(tast_t t)    {        _tasks[number++]=t;    }    int SelectTask()    {        return rand()%number;    }    //根据nums派发任务    void Excute(int nums)    {        if(_tasks.find(nums)==_tasks.end())return;        _tasks[nums]();    }    ~Task()    {    }};Task tm;void Work(){    while (true)    {        int cmd = 0;        int n = read(0, &cmd, sizeof(cmd));        if (n == sizeof(cmd))        {            tm.Excute(cmd);        }        else if (n == 0)        {            cout << "pid  " << getpid() << "   quit  " << endl;            break;        }        else        {        }    }}
//main.cc#include "ProcessPool.hpp"//#include "Task.hpp"void Usage(string proc){    cout << "Usage :" << proc << "process nums" << endl;}void DegBUg(vector<channel> &channels){    for (auto e : channels)    {        cout << e.Name() << endl;    }}int main(int argc, char *argv[]){    if (argc != 2)    {        Usage(argv[0]);        return ARGCEROOR;    }    // vector<channel> channels;    int nums = stoi(argv[1]);    ProcessPool *pp = new ProcessPool(nums, Work);    // 创建进程池    pp->InitProcesspool();    // 派发任务    pp->DispatchTask();    // 关闭进程池    pp->CloseProcessPool();    // 创建进程池    // InitProcesspool(nums, channels, Work);    // // DegBUg(channels);    // // 派发任务    // DispatchTask(channels);    // // 关闭进程池    // CloseProcessPool(channels);    delete pp;    return 0;}

 

进程池常见应用场景:

并行计算:对于 CPU 密集型的任务,使用进程池来并行计算每个任务。数据处理:当需要处理大量数据且每个数据的处理是独立的(例如图像处理、大规模数据分析等)时,可以使用进程池提高效率。Web 抓取:当需要抓取大量网页时,可以使用进程池进行并行抓取,从而提高抓取速度。

注意事项:

进程的开销:创建新进程有一定的开销,适用于长时间运行的任务。对于非常短的任务,使用线程池或直接串行执行可能更加高效。共享数据:进程池中的每个进程拥有独立的内存空间,因此进程之间不能直接共享数据。如果需要共享数据,可以使用 multiprocessing.Manager 或 Queue 等工具。避免死锁:在多进程程序中,要特别注意避免死锁(例如多个进程互相等待),特别是在多个进程之间共享资源时。

总结:进程池是一种高效的并行处理机制,尤其适用于 CPU 密集型任务


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/208844.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1