目录
前言:
RingQueue编写生产消费模型
认识接口
开始编写
前言:
前文我们介绍了基于阻塞队列实现生产消费模型,使用阻塞队列实现生产消费模型中,我们学习到了pthread_cond_wait的第二个参数的重要性,不仅会解锁,此时锁被其他人持有,当条件满足的时候,就重新竞争锁,所以在pthread_cond_wait函数这里是不会存在死锁的。
第二个重要的条件是while,首先不说函数等待失败,直接造成错误的情况,如果多个消费者同时在等待,但是只有一把锁,这种情况就需要重新判断,所以使用的while,以上是前文两个比较重要的讨论。
但是纵观全文,我们发现基于阻塞队列实现生产消费模型的时候,是必须要对条件是否满足进行判断的,于是就有人思考,可不可以让其他东西代替我们判断条件是否满足呢?毕竟我们这里代码量小,不容易出错,对于代码量的情况来说的话,就不好说了,所以我们应该借助什么东西呢?
答案是信号量,对于信号量存在的PV操作,以及信号量是将整个资源划分成一个一个的小部分,我们在进程间通信部分已经介绍了,这里我们不再花费更多的时间介绍信号量,但是我们有必要认识一下信号量的接口,基于信号量编写生产消费模型我们可以尝试使用环形队列,我们使用vector来模拟。
更多细节就直接进入主题吧!
RingQueue编写生产消费模型
认识接口
对于信号量的接口,Ubuntu系统也可以可以man到的,我们需要用到的接口有sem_init,sem_wait和sem_post,sem_destroy,这是在POSIX里面的信号量的操作,而不是system V的操作,两者要分清楚,那么对应的头文件是semaphore.h文件:
int sem_init(sem_t *sem, int pshared, unsigned int value);int sem_wait(sem_t *sem);int sem_post(sem_t *sem);int sem_destroy(sem_t *sem);
其中稍微复杂一点就是sem_init函数了,其他的多简单,有了线程学习之前的基础,我们基本上可以直接使用了。sem_init函数的第一个参数是sem_t类型的,和phtread_t的一样,第二个参数我们直接设置为0,这个参数决定的是线程间共享信号量还是进程间共享信号量,0代表的线程间共享,第三个参数就是申请多少个信号量。
对于信号量来说,在System -V里面,我们简短的介绍了pv操作,对于P操作相当于信号量--,对于V操作相当于信号量++,而因为是对临界资源的访问,所以这两个操作应该是原子的。
对于以上函数的返回值都是成功返回0,失败返回-1,并且错误码被设置。
可是为什么使用以上的函数就可以不用判断条件是否满足了呢?
sem_wait()函数
功能:相当于P操作,用于等待信号量变为正数(即请求资源)。如果信号量的值为0,则调用线程将被阻塞,直到信号量的值大于0为止。参数:包括指向信号量对象的指针(sem)。返回值:成功时返回0,失败时返回-1并设置errno。sem_post()函数
功能:相当于V操作,用于释放信号量(即释放资源)。信号量的值将增加1。如果有任何线程在等待该信号量,则其中一个线程将被唤醒。参数:包括指向信号量对象的指针(sem)。返回值:成功时返回0,失败时返回-1并设置errno。因为PV操作的函数,会自动判断信号量,或者说条件是否满足,如果满足,那么就操作线程。
以上是对于sem_*函数的介绍。
开始编写
同前文的blockqueue一样,我们先来确定成员变量应该有谁?
首先,我们既然是基于环形队列和信号量编写的,那么生产者和消费者的位置,我们应该知道吧?那么就应该有两个变量用来表示位置。对于位置来说,后续操作肯定是免不了%操作的,虽然有PV操作,但是我们应该防止越界。
其次,信号量的变量肯定要有吧?在构造函数和析构函数的时候初始化 + 析构就可以了。可是我们应该引入几个信号量呢?在最开始生产者生产的时候,消费者一个信号量都不能消费吧?那么这不就是初识信号量为0吗?当生产者进行了V操作之后,消费者的信号量+1(重点),消费者消费了同理,所以应该有两个信号量。
最后,对于锁来说,我们前文加锁是为了防止对于临界资源的访问出错,这里需要加吗?当然要加了,对于环形队列的访问难道不是临界资源吗?当然是了,所以同样需要锁,可是需要几把锁呢?一把锁吗?如果只用一把锁,那不就是基于阻塞队列编写的吗?(只能有一个人操作)这里既然有了信号量的加持,都有人帮我们判断条件了,我们不妨设计两把锁,因为生产者和消费者是并发进行的,要满足消费者和消费者,生产者和生产者之间的关系。即互斥。对于生产者和消费者之间,我们都不用担心互斥,信号量已经帮我们做了,我们只要保证同步即可。
const int default_cap = 5;template<typename T>class RingQueue{public:private: std::vector<T> _ring_queue; int _max_cap; int _c_step; int _p_step; sem_t _data_sem; sem_t _space_sem; pthread_mutex_t _c_mutex; pthread_mutex_t _p_mutex;};
那么就是正常的构造函数和析构函数:
const int default_cap = 5;template<typename T>class RingQueue{public: RingQueue(int max_cap = default_cap) : _max_cap(max_cap), _c_step(0), _p_step(0), _ring_queue(max_cap) { sem_init(&_data_sem, 0, 0); sem_init(&_space_sem, 0, _max_cap); pthread_mutex_init(&_c_mutex, nullptr); pthread_mutex_init(&_p_mutex, nullptr); } ~RingQueue() { sem_destroy(&_data_sem); sem_destroy(&_space_sem); pthread_mutex_destroy(&_c_mutex); pthread_mutex_destroy(&_p_mutex); }private: std::vector<T> _ring_queue; int _max_cap; int _c_step; int _p_step; sem_t _data_sem; sem_t _space_sem; pthread_mutex_t _c_mutex; pthread_mutex_t _p_mutex;};
有了析构和构造,我们现在只需要关心pop和push了,其中我们不妨简单封装一下PV操作的函数:
private: void P(sem_t& sem) { sem_wait(&sem); } void V(sem_t& sem) { sem_post(&sem); }
对于push操作,因为访问了临界资源,所以一定要加锁,加锁之后,生产者的位置需要++,并且要保证++之后不会被超出队列的总长度,那么就要模运算,就没了:
void Push(const T& in) { P(_space_sem); pthread_mutex_lock(&_p_mutex); _ring_queue[_p_step] = in; _p_step++; _p_step %= _max_cap; pthread_mutex_unlock(&_p_mutex); V(_data_sem); }
对于PV操作一定是二者都要同时进行的,不会只执行单独的一个,都是全部执行。
对于pop操作一样的:
void Pop(T* out) { P(_data_sem); pthread_mutex_lock(&_c_mutex); *out = _ring_queue[_c_step]; _c_step++; _c_step %= _max_cap; pthread_mutex_unlock(&_c_mutex); V(_space_sem); }
对于头文件RingQueue.hpp的编写就结束了,我们在主函数部分编写测试代码试试:
void *Consumer(void *args){ RingQueue<int> *c = static_cast<RingQueue<int> *>(args); while (true) { int out = 0; c->Pop(&out); std::cout << "Consumer pop data-> " << out << std::endl; //sleep(1); }}void *Productor(void *args){ RingQueue<int> *q = static_cast<RingQueue<int> *>(args); while (true) { int in = rand() % 100 + 1; q->Push(in); std::cout << "Productor push data-> " << in << std::endl; // sleep(1); }}int main(){ srand(time(nullptr) ^ getpid()); pthread_t c, p; RingQueue<int> *rq = new RingQueue<int>; pthread_create(&c, nullptr, Consumer, (void *)rq); pthread_create(&p, nullptr, Productor, (void *)rq); pthread_join(c, nullptr); pthread_join(p, nullptr); return 0;}
以上是main函数的编写,实际上,这份代码也支持多生产多消费,毕竟锁在那里,那么以上就是环形队列编写生产消费模型的介绍。
感谢阅读!