进程池的核心思想是预先创建一定数量的进程,并将这些进程放入一个池中。当有新的任务到来时,进程池会分配一个空闲的进程来处理这个任务。任务完成后,进程不会关闭,而是返回池中等待下一个任务。这种方式避免了频繁创建和销毁进程的开销,同时也限制了同时运行的进程数量,防止操作系统过载。
为什么要使用进程池
因为:在多核处理器的系统中,创建多个进程来处理并行任务,可以提高效率。相比于线程池,进程池更适用于 CPU 密集型任务,因为每个进程有独立的内存空间和 CPU 核心资源,不会受到全局解释器锁(GIL)的限制。
用C++模拟实现进程池
//Channel.hpp#ifndef __CHANNEL_HPP__ //如果我们没有定义防止头文件被重复被包含#define __CHANNEL_HPP__#include<iostream>#include<unistd.h>#include<string>using namespace std;// 先描述class channel{public: channel(int wtd, pid_t who) : _wtd(wtd), _who(who) { _name = "channel" + to_string(wtd) + "--" + to_string(who); } string Name() { return _name; } // 发送任务到管道 void Send(int cmd) { write(_wtd, &cmd, sizeof(cmd)); } void Close() { close(_wtd); } pid_t Id() { return _who; } ~channel() {}private: int _wtd; string _name; pid_t _who;};#endif
//ProcessPool.hpp#include <iostream>#include <string>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <sys/wait.h>#include <vector>#include <functional>#include "Task.hpp"#include"Channel.hpp"using namespace std;using work_t = function<void()>;enum{ OK = 0, ARGCEROOR, PIPEROOR, FORKEROOR,};class ProcessPool{public: ProcessPool(int n,work_t w):nums(n),work(w) {}// work_t work:回调int InitProcesspool(){ for (int i = 0; i < nums; i++) { // 1 创建管道 int pipedor[2] = {0}; int n = pipe(pipedor); if (n < 0) { return PIPEROOR; } // 2 创建指定个数的进程 pid_t pid = fork(); if (pid < 0) { return FORKEROOR; } if (pid == 0) { // child 进程 close(pipedor[1]); dup2(pipedor[0], 0); work(); // 退出 exit(0); } // 父进程 close(pipedor[0]); channels.emplace_back(pipedor[1], pid); // channel c(pipedor[1],pid); // channels.push_back(c); // pid_t id = waitpid(pid, nullptr, 0); } return OK;}void DispatchTask(){ int who = 0; int n = 10; while (n--) { // 选择一个任务(int) int retask = tm.SelectTask(); // 选择一个子进程 channel &cee = channels[who++]; who %= channels.size(); cout << "*****************************************" << endl; cout << "send " << retask << "channl: " << cee.Name() << "---" << n << endl; cout << "*****************************************" << endl; // 派发任务 cee.Send(retask); sleep(1); }}void CloseProcessPool(){ for (auto &c : channels) { c.Close(); } for (auto &c : channels) { pid_t rid = waitpid(c.Id(), nullptr, 0); if (rid > 0) { cout << " child " << rid << " wait " << endl; } }}private: vector<channel> channels; int nums; work_t work;};
//Task.hpp#include<iostream>#include<unordered_map>#include<functional>#include<time.h>#include<sys/types.h>#include<unistd.h>using namespace std;using tast_t=function<void()>;void Task1(){ cout<<"我是一个打印任务 : pid :"<<getpid()<<endl; }void Task2(){ cout<<"我是一个日志任务 pid :"<<getpid()<<endl;}void Task3(){ cout<<"我是一个快速任务 pid :"<<getpid()<<endl;}static int number=0;class Task{private: unordered_map<int,tast_t> _tasks;public: Task() { srand(time(nullptr)); InitTask(Task1); InitTask(Task2); InitTask(Task3); } //插入任务 void InitTask(tast_t t) { _tasks[number++]=t; } int SelectTask() { return rand()%number; } //根据nums派发任务 void Excute(int nums) { if(_tasks.find(nums)==_tasks.end())return; _tasks[nums](); } ~Task() { }};Task tm;void Work(){ while (true) { int cmd = 0; int n = read(0, &cmd, sizeof(cmd)); if (n == sizeof(cmd)) { tm.Excute(cmd); } else if (n == 0) { cout << "pid " << getpid() << " quit " << endl; break; } else { } }}
//main.cc#include "ProcessPool.hpp"//#include "Task.hpp"void Usage(string proc){ cout << "Usage :" << proc << "process nums" << endl;}void DegBUg(vector<channel> &channels){ for (auto e : channels) { cout << e.Name() << endl; }}int main(int argc, char *argv[]){ if (argc != 2) { Usage(argv[0]); return ARGCEROOR; } // vector<channel> channels; int nums = stoi(argv[1]); ProcessPool *pp = new ProcessPool(nums, Work); // 创建进程池 pp->InitProcesspool(); // 派发任务 pp->DispatchTask(); // 关闭进程池 pp->CloseProcessPool(); // 创建进程池 // InitProcesspool(nums, channels, Work); // // DegBUg(channels); // // 派发任务 // DispatchTask(channels); // // 关闭进程池 // CloseProcessPool(channels); delete pp; return 0;}
进程池常见应用场景:
并行计算:对于 CPU 密集型的任务,使用进程池来并行计算每个任务。数据处理:当需要处理大量数据且每个数据的处理是独立的(例如图像处理、大规模数据分析等)时,可以使用进程池提高效率。Web 抓取:当需要抓取大量网页时,可以使用进程池进行并行抓取,从而提高抓取速度。
注意事项:
进程的开销:创建新进程有一定的开销,适用于长时间运行的任务。对于非常短的任务,使用线程池或直接串行执行可能更加高效。共享数据:进程池中的每个进程拥有独立的内存空间,因此进程之间不能直接共享数据。如果需要共享数据,可以使用multiprocessing.Manager
或 Queue
等工具。避免死锁:在多进程程序中,要特别注意避免死锁(例如多个进程互相等待),特别是在多个进程之间共享资源时。