概述
在编程中接触的队列更多的为非阻塞队列,例如PriorityQueue,LinkedList。这些队列不会对当前线程进行阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略(例如使用Object::wait,Object::notify来实现线程阻塞)。阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。
常用阻塞队列
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列;
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE;
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列;
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
阻塞队列方法
非阻塞队列方法
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
peek():获取队首元素,若成功,则返回队首元素;否则返回null。
对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。
上述5个方法都没有采用同步策略。
阻塞队列方法
阻塞队列中也提供了add(E e),remove(),offer(E e),poll(),peek()方法,不同的是,阻塞队列对上述方法都采用了同步策略。
put(E e):put方法用来向队尾存入元素,如果队列满,则等待;
take():take方法用来从队首取元素,如果队列为空,则等待;
offer(E e,long timeout, TimeUnit unit):offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll(long timeout, TimeUnit unit):poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素。
阻塞队列源码分析
以ArrayBlockingQueue为例分析具体实现原理,基于JDK 8。
主要变量
// 实现了BlockingQueue接口, 继承了AbstractQueue类 public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; // 使用一个数组来存储队列中的元素 private final E[] items; // 队列头指针, take, poll, remove方法使用 private int takeIndex; // 队列尾指针, put, offer, add方法使用 private int putIndex; // 队列中元素个数 private int count; // 可重入锁来控制所有的访问 private final ReentrantLock lock; // 等待获取元素的条件变量 private final Condition notEmpty; // 等待放置元素的条件变量 private final Condition notFull; }
构造函数
// capacity参数指定容量 public ArrayBlockingQueue(int capacity) { } // capacity参数指定容量以及公平性 public ArrayBlockingQueue(int capacity, boolean fair) { } // capacity参数指定容量, 公平性以及初始化集合 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { }
put方法
put方法用来向队尾存入元素,如果队列满,则等待。
public void put(E e) throws InterruptedException { // 判断传入元素是否为空 checkNotNull(e); // 获得可一个重入锁, 保证线程安全性 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 若阻塞队列已满, 则将线程加入到同步队列中 while (count == items.length) // 阻塞当前线程,将其加入到同步队列 notFull.await(); // 进行入队操作 enqueue(e); } finally { // 释放锁 lock.unlock(); } }
enqueue方法
enqueue方法完成具体的入队操作,并且唤醒等待获取元素的线程。
private void enqueue(E x) { final Object[] items = this.items; // 元素入队 items[putIndex] = x; // 入队之后队列已满 if (++putIndex == items.length) putIndex = 0; // 队列元素 + 1 count++; // 唤醒等待获取元素的线程 // 从等待队列中移除, 移到AQS同步队列中, 等待获取锁 notEmpty.signal(); }
take方法
take方法用来从队首取元素,如果队列为空,则等待。
public E take() throws InterruptedException { // 获取可重入锁, 支持中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 若阻塞队列为空, 则将线程加入到同步队列中 // 等待获取资源 while (count == 0) notEmpty.await(); // 进行出队操作 return dequeue(); } finally { lock.unlock(); } }
dequeue方法
dequeue方法完成具体的出队操作,并且唤醒等待入队元素的线程。
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取队列中元素 E x = (E) items[takeIndex]; // 获取之后元素置为null items[takeIndex] = null; // 出队之后队列为空 if (++takeIndex == items.length) t akeIndex = 0; // 队列元素 - 1 count--; if (itrs != null) itrs.elementDequeued(); // 唤醒等待放置元素的线程 notFull.signal(); return x; }
使用实例
使用Object::wait和Object::notify方法实现生产者-消费者模式。
public class ProducerAndConsumer { // 缓存区 private LinkedList<String> buffer; // 缓冲区大小 private int maxSize ; ProducerAndConsumer(int maxSize){ this.maxSize = maxSize; buffer = new LinkedList<String>(); } public void set(String string) throws InterruptedException { synchronized(buffer){ while (maxSize == buffer.size()){ // 释放锁, 并挂起当前线程 buffer.wait(); } buffer.add(string); buffer.notify(); } } public String get() throws InterruptedException { String string; synchronized(buffer){ while (maxSize == 0){ // 释放锁, 并挂起当前线程 buffer.wait(); } string = buffer.poll(); buffer.notify(); } return string; } }
使用阻塞队列实现生产者-消费者模式。
public class ProducerAndConsumer { // 缓存区 private ArrayBlockingQueue<String> buffer; // 缓冲区大小 private int maxSize ; ProducerAndConsumer(int maxSize){ this.maxSize = maxSize; buffer = new ArrayBlockingQueue<String>(); } public void set(String string) throws InterruptedException { // 阻塞队列已经提供了同步策略, 不需要再显示进行同步 try{ buffer.put(string); } catch (InterruptedException e) { e.printStackTrace(); } } public String get() throws InterruptedException { // 阻塞队列已经提供了同步策略, 不需要再显示进行同步 String str = null; try{ str = buffer.put(string); } catch (InterruptedException e) { e.printStackTrace(); } return str; } }
参考资料
[1] https://www.cnblogs.com/dolphin0520/p/3932906.html