目录
一、阻塞队列1.1 生产者消费者模型1.2 Java提供的阻塞队列1.3 实现一个简单生产者消费者模型 二、自己实现阻塞队列2.1 成员变量2.2 构造方法2.3 put方法2.4 take方法2.5 最终代码
一、阻塞队列
阻塞队列:是一种特殊的队列,也有先进先出的特性。它是一种线程安全的队列。
有以下两个特性:
阻塞队列的一个重要应用场景就是:实现生产者消费模型。
1.1 生产者消费者模型
生产者消费者模型:是多线程编程中的一种典型的编码技巧。用来降低生产者与消费者之间的耦合度。生产者和消费者之间的交易场所就是一个阻塞队列。
这样的模型的优势有以下两个:
解耦合,降低代码耦合度:像如果是A B两个服务器,之间直接进行交互,如果对A或者B中的数据进行修改操作,大概率就会影响到另一个服务器。而使用阻塞队列作为交易平台,我们修改服务器的数据时,由于阻塞队列中的结构固定,两个服务器之间的耦合度就降低。削峰削谷:
在服务器中,波峰就是请求量高的时候,波谷就是请求量低的时候。
如果是AB两个服务器之间进行交互,当上游服务器A经历波峰,将大量请求传给服务器B的时候,服务器就有可能挂掉。
因为上游服务器,干的活简单,消耗的资源少;而下游服务器,干的活复杂,消耗的的资源就多。
但是如果我们将阻塞队列作为交易平台,那么服务器B就可以依据自己的节奏从队列中拿请求。
但是这样的模型也会付出代价:
引入阻塞队列之后整体结构会更加复杂。比如本来是AB两个服务器之间的交互,但引入一个作为阻塞队列的服务器(这种称为消息队列),就需要部署这个服务器,还要与AB实现交互。效率也会有影响。1.2 Java提供的阻塞队列
提供了一个BlockingDeque的接口(需要导java.util.concurrent.BlockingQueue
包):
主要使用下面3个实现了BlockingDeque接口的来实例化阻塞队列:
链表实现的,LinkedBlockingDeque(需要导java.util.concurrent.LinkedBlockingDeque
包):数组实现的,ArrayBlockingDeque需要导`java.util.concurrent.ArrayBlockingDeque包):
小根堆实现的,PriorityBlockingDeque需要导
java.util.concurrent.PriorityBlockingDeque
包):在阻塞队列中我们虽然可以使用队列中常用的出队列入队列方法,但是那些方法不带阻塞效果。带阻塞效果的入队列方法是put,出队列方法是take,这两个方法都会抛出InterruptedException异常。
1.3 实现一个简单生产者消费者模型
实现一个简单的生产者消费者模型:
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;public class Demo { public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(1000); Thread producer = new Thread(() -> { int i = 0; while(true) { try { blockingQueue.put(i++); System.out.println(i + "入队列成功"); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumer = new Thread(() -> { while(true){ try { int x = blockingQueue.take(); System.out.println(x + "出队列成功"); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); consumer.start(); }}
二、自己实现阻塞队列
我们使用数组来实现一个循环队列。
不知道循环队列的实现的可以看下面这个链接:队列
2.1 成员变量
使用capacity代表数组的最大长度;使用size表示数组中元素的个数;head表示队头元素的下标;tail表示队尾元素的下标。private int capacity = 0xffff; private String[] elem ;//存储数组 private int size;//存储元素个数 private int head;//队头 private int tail;//队尾
2.2 构造方法
提供两个构造方法:
使用默认最大值初始化数组;使用传的参初始化数组。 public MyBlockingQueue(int capacity) { this.capacity = capacity; elem = new String[this.capacity]; } public MyBlockingQueue() { elem = new String[this.capacity]; }
2.3 put方法
由于put和take方法都涉及到修改判断等操作,为避免原子性问题带来线程安全问题对该这些操作都要加锁。
在put方法中我们需要在队列满的时候发生阻塞,使用wait来等待。而在Java官方文档给出了建议我们使用循环语句来使用wait。
因为wait是除了notify唤醒外,还有可能被interrupt方法唤醒抛出异常,如果只要if,不用while,抛出异常后就会继续执行下面的逻辑,带来bug。而使用循环就不会,抛出异常后,会再次判断循环条件。
最后在入队成功后发出一个通知notify来唤醒由于队列空而阻塞等待的线程。
public void put(String s) throws InterruptedException { synchronized (this) { while(size == elem.length) { this.wait(); } elem[tail] = s; tail = (tail+1) % elem.length; size++; this.notify(); } }
2.4 take方法
当队列为空的时候,跟put一样使用wait来阻塞。
最后在出队成功后发出一个通知notify来唤醒由于队列满而阻塞等待的线程。
public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = elem[head]; head = (head+1) % capacity; size--; this.notify(); return ret; } }
2.5 最终代码
最终我们自己实现的一个简单的阻塞队列就如下:
public class MyBlockingQueue { private int capacity = 0xffff; private String[] elem ;//存储数组 private int size;//存储元素个数 private int head;//队头 private int tail;//队尾 public MyBlockingQueue(int length) { elem = new String[length]; } public MyBlockingQueue() { elem = new String[this.capacity]; } public void put(String s) throws InterruptedException { synchronized (this) { while(size == elem.length) { this.wait(); } elem[tail] = s; tail = (tail+1) % capacity; size++; this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = elem[head]; head = (head+1) % capacity; size--; this.notify(); return ret; } }}