J.U.C 之阻塞队列

概述

在编程中接触的队列更多的为非阻塞队列,例如PriorityQueue,LinkedList。这些队列不会对当前线程进行阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略(例如使用Object::wait,Object::notify来实现线程阻塞)。阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

常用阻塞队列

  1. ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列;
  2. LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE;
  3. PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列;
  4. DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

阻塞队列方法

非阻塞队列方法

  1. add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
  2. remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
  3. offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
  4. poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
  5. peek():获取队首元素,若成功,则返回队首元素;否则返回null。

对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。
上述5个方法都没有采用同步策略。

阻塞队列方法

阻塞队列中也提供了add(E e),remove(),offer(E e),poll(),peek()方法,不同的是,阻塞队列对上述方法都采用了同步策略。

  1. put(E e):put方法用来向队尾存入元素,如果队列满,则等待;
  2. take():take方法用来从队首取元素,如果队列为空,则等待;
  3. offer(E e,long timeout, TimeUnit unit):offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
  4. poll(long timeout, TimeUnit unit):poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素。

阻塞队列源码分析

以ArrayBlockingQueue为例分析具体实现原理,基于JDK 8。

主要变量

// 实现了BlockingQueue接口, 继承了AbstractQueue类
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
  
private static final long serialVersionUID = -817911632652898426L;

// 使用一个数组来存储队列中的元素
private final E[] items;
// 队列头指针, take, poll, remove方法使用
private int takeIndex;
// 队列尾指针, put, offer, add方法使用
private int putIndex;
// 队列中元素个数
private int count;
 
// 可重入锁来控制所有的访问
private final ReentrantLock lock;
// 等待获取元素的条件变量
private final Condition notEmpty;
// 等待放置元素的条件变量
private final Condition notFull;
}

构造函数

// capacity参数指定容量
public ArrayBlockingQueue(int capacity) {
}
// capacity参数指定容量以及公平性
public ArrayBlockingQueue(int capacity, boolean fair) {
}
// capacity参数指定容量, 公平性以及初始化集合
public ArrayBlockingQueue(int capacity, boolean fair,
                                                  Collection<? extends E> c) {
}

put方法

put方法用来向队尾存入元素,如果队列满,则等待。

public void put(E e) throws InterruptedException {
// 判断传入元素是否为空
checkNotNull(e);
// 获得可一个重入锁, 保证线程安全性
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 若阻塞队列已满, 则将线程加入到同步队列中
while (count == items.length)
// 阻塞当前线程,将其加入到同步队列
notFull.await();
// 进行入队操作
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}

enqueue方法

enqueue方法完成具体的入队操作,并且唤醒等待获取元素的线程。

private void enqueue(E x) {
final Object[] items = this.items;
// 元素入队
items[putIndex] = x;
// 入队之后队列已满
if (++putIndex == items.length)
putIndex = 0;
// 队列元素 + 1
count++;
// 唤醒等待获取元素的线程
// 从等待队列中移除, 移到AQS同步队列中, 等待获取锁
notEmpty.signal();
}

take方法

take方法用来从队首取元素,如果队列为空,则等待。

public E take() throws InterruptedException {
// 获取可重入锁, 支持中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 若阻塞队列为空, 则将线程加入到同步队列中
// 等待获取资源
while (count == 0)
notEmpty.await();
// 进行出队操作
return dequeue();
} finally {
lock.unlock();
}
}

dequeue方法

dequeue方法完成具体的出队操作,并且唤醒等待入队元素的线程。

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取队列中元素
E x = (E) items[takeIndex];
// 获取之后元素置为null
items[takeIndex] = null;
// 出队之后队列为空
if (++takeIndex == items.length)
t akeIndex = 0;
// 队列元素 - 1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒等待放置元素的线程
notFull.signal();
return x;
}

使用实例

使用Object::wait和Object::notify方法实现生产者-消费者模式。

public class ProducerAndConsumer {
// 缓存区
private LinkedList<String> buffer;
// 缓冲区大小
private int maxSize ;
 
ProducerAndConsumer(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList<String>();
}
  
public void set(String string) throws InterruptedException {
synchronized(buffer){
while (maxSize == buffer.size()){
// 释放锁, 并挂起当前线程
buffer.wait();
}
buffer.add(string);
buffer.notify();
}
}
 
public String get() throws InterruptedException {
String string;
synchronized(buffer){
while (maxSize == 0){
// 释放锁, 并挂起当前线程
buffer.wait();
}
string = buffer.poll();
buffer.notify();
}
return string;
}

}

使用阻塞队列实现生产者-消费者模式。

public class ProducerAndConsumer {
// 缓存区
private ArrayBlockingQueue<String> buffer;
// 缓冲区大小
private int maxSize ;

ProducerAndConsumer(int maxSize){
this.maxSize = maxSize;
buffer = new ArrayBlockingQueue<String>();
}

public void set(String string) throws InterruptedException {
// 阻塞队列已经提供了同步策略, 不需要再显示进行同步
try{
buffer.put(string);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String get() throws InterruptedException {
// 阻塞队列已经提供了同步策略, 不需要再显示进行同步
String str = null;
try{
str = buffer.put(string);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
}

参考资料

[1] https://www.cnblogs.com/dolphin0520/p/3932906.html

Author: HB
Link: http://www.huangbin.fun/J-U-C-之阻塞队列.html
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.