J.U.C之常用同步器(ReentrantLock, CountDownLatch, Semaphore, CyclicBarrier, Condition)源码分析

常用同步器

J.U.C中许多常用同步器都是基于AQS实现,主要有以下:CountDownLatch,Semaphore,CyclicBarrier,ReentrantLock,Condition。
下面对上述同步器进行源码分析, 源码分析下基于JDK 8。

ReentrantLock

概述

ReentrantLock是在JDK 1.5 引入的一个显示锁,可以实现多线程的互斥访问,可以实现和synchronized关键字同样的功能。ReentrantLock只支持独占方式的获取操作,是对lock接口的实现。
ReentrantLock将同步状态用于保存锁获取操作的次数(0 - 锁可被获取;整数i - 锁已被其他线程获取,因为ReentrantLock可重入,所以状态为整数i(i >= 1)),并且还维护了一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。并且在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁;在tryAcquire中使用这个域来区分获取操作是重入的还是竞争的。

源码分析

下面的源码分析都是基于非公平的ReentrantLock实现。

获取锁操作

  1. lock()
    ReentrantLock对外提供lock方法给用户调用,用户不需要知道ReentrantLock的具体实现细节就可完成锁的获取操作。

    final void lock() {
    acquire(1);
    }
  2. acquire(int arg)
    lock会调用acquire方法完成获取操作。若tryAcquire方法返回true,即成功获取到锁,就直接返回,线程继续往下运行;若获取失败则调用acquireQueud方法阻塞当前线程,等待锁的释放。
    tryAcquire方法交由ReentrantLock来实现,它自己来实现判断逻辑 - 何时才算获取锁成功。
    阻塞一个线程由acquireQueued来实现。关于如何阻塞一个线程和如何唤醒线程,参考《JU.C之AQS源码分析》

    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  3. tryAcquire(int arg)
    ReentrantLock 实现 tryAcquire方法来获取资源。

    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 得到当前资源状态
    int c = getState();
    // 当前同步状态 == 0, 表示锁未被获取
    if (c == 0) {
    // 获取锁, 原子性修改同步状态
    if (compareAndSetState(0, acquires)) {
    // 设置为独占模式的线程, 即owner状态
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 若是重复获取, 即重入获取, 则修改同步状态为c+acquires
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

释放锁操作

  1. unlock()
    ReentrantLock对外提供unlock方法给用户调用,用户不需要知道ReentrantLock的具体实现细节就可完成锁的释放操作。

    public void unlock() {
    sync.release(1);
    }
  2. release(int arg)
    unlock方法会调用tryRelease方法去释放锁。若释放锁成功,则代表下一个等待锁的线程可以来获取锁,此时调用用unparkSuccesor方法唤醒;若释放锁失败,则直接返回。
    tryRelease方法交由ReentrantLock来实现,它自己来实现判断逻辑 - 何时才算释放锁成功。
    关于如何唤醒下一个线程,参考《JU.C之AQS源码分析》

    public final boolean release(int arg) {
    // tryRelease(arg)返回true, 则表示释放锁成功,
    // 可以去唤醒下一个等待线程了
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    // 唤醒下一个等待线程
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
  3. tryRelease(int releases)
    ReentrantLock 实现 tryRelease方法来释放资源。

    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 当前线程不是获取锁的线程, 抛出错误
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    // 若c == 0, 当前锁没有进行重入
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    // 设置锁状态, 因为是可重入的, 可能需要多次释放操作
    setState(c);
    return free;
    }

应用实例

public class Test {
private ArrayList<Integer> arrayList = new ArrayList<Integer>();
Lock lock = new ReentrantLock();    

public static void main(String[] args)  {
final Test test = new Test();

new Thread(){
public void run() {
test.insert(Thread.currentThread());
};
}.start();

new Thread(){
public void run() {
test.insert(Thread.currentThread());
};
}.start();
}  

public void insert(Thread thread) {
// 获取锁
lock.lock();
try {
System.out.println(thread.getName()+"得到了锁");
for(int i=0;i<5;i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
// 释放锁
lock.unlock();
}
}
}

扩展

  1. 公平锁与非公平锁:
    公平锁尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该锁,这种就是公平锁;
    非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
  2. ReentrantLock 与 synchronized对比
    1. ReentrantLock 是对lock接口的实现,而synchronized是Java中的关键字,synchronized是内置的语言实现;
    2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而ReentrantLock 在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用ReentrantLock 时需要在finally块中释放锁;
    3. ReentrantLock 可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
    4. 通过ReentrantLock 可以知道有没有成功获取锁,而synchronized却无法办到; ReentrantLock 可以提高多个线程进行读操作的效率;
    5. 可以通过 ReentrantLock lock = new ReentrantLock(true) 来实现公平锁。

Condition

概述

JUC提供了Lock可以方便的进行锁操作,但是有时候我们也需要对线程进行条件性的阻塞和唤醒,以此来模拟线程间的相互协助,这时我们就需要condition条件变量,它就像是在线程上加了多个开关,可以方便的对持有锁的线程进行阻塞和唤醒。
官方解释如下:

条件(也称为条件队列或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其"等待")。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式释放相关的锁,并挂起当前线程,就像Object.wait 做的那样

Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。wait,notify等需要和synchronized结合适用,Condition则绑定在一个锁(Lock)上,一般作Lock的内部实现。
两者的主要区别见下图:

源码分析

Condition interface

Condition接口提供了如下方法以供实现:

/*============阻塞=============*/
// 当前线程在接到信号或被中断之前一直处于等待状态.=
void await() throws InterruptedException;
// 当前线程在接到信号之前一直处于等待状态 [注意:该方法对中断不敏感]
void awaitUninterruptibly();
// 当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
// 返回值表示剩余时间, 如果在'nanosTimeout'之前唤醒,那么返回值
='nanosTimeout - 消耗时间',
// 如果返回值'<= 0' ,则可以认定它已经超时了
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
// 如果没有到指定时间就被通知, 则返回 true ,否则表示到了指定时间,返回返回 false
boolean awaitUntil(Date deadline) throws InterruptedException;

/*===========唤醒==============*/
// 唤醒一个等待线程, 将线程从条件队列移到AQS同步队列中
// 该线程从等待方法返回前必须获得与Condition相关的锁
void signal();
// 唤醒所有等待线程, 将所有等待线程从条件队列中移到AQS步队列中
void signalAll();

Condition接口仅有一个实现类为ConditionObject,该类定义为AQS的内部类。

线程挂起操作

  1. await()
    线程在调用await方法后将执行挂起操作,即新建 Condition 节点加入到 Condition 的队列尾部,并阻塞当前线程在条件队列中,直到线程等待的某个条件为真时才会被唤醒。在当前线程持有锁的基础上释放锁资源,以便其他线程获取锁资源。

    public final void await() throws InterruptedException {
    // 线程被中断,则抛出异常
    if (Thread.interrupted())
    throw new InterruptedException();
    // 将节点加入到条件队列(Condition Queue)中
    Node node = addConditionWaiter();
    // 释放当前线程的锁,因为锁可重入, 所以是完全释放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 当前节点是否在AQS同步队列中
    // 如果不在同步队列中, 则park当前线程, 说明该线程还未满足竞争锁资源的
    // 条件;如果在或者被中断过, 则退出循环
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }

    // 当走到这步时, 代表已经调用了signal/signalAll方法, 重新竞争锁资源
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    // 清理取消等待的节点
    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }
  2. addConditionWaiter()
    addConditionWaiter会将新建一个状态为Condition状态的节点并加入到条件队列的末尾。

    private Node addConditionWaiter() {
    Node t = lastWaiter;
    // Node的节点状态如果不为CONDITION,
    // 则表示该节点不处于等待状态,需要清除节点
    if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
    }
    // 当前线程新建节点,状态 CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
    firstWaiter = node;
    else
    t.nextWaiter = node;
    lastWaiter = node;
    return node;
    }
  3. fullyRelease()
    fullyRelease方法负责完成释放该线程持有的锁,因为例如 ReentrantLock 是可以重入的。

    final int fullyRelease(Node node) {
    boolean failed = true;
    try {
    // 持有锁的数量
    int savedState = getState();
    // 完全释放锁, 因为ReentrantLock可重入
    if (release(savedState)) {
    failed = false;
    return savedState;
    } else {
    throw new IllegalMonitorStateException();
    }
    } finally {
    // 释放锁失败, 将节点状态置为CANCELLED
    if (failed)
    node.waitStatus = Node.CANCELLED;
    }
    }
  4. isOnSyncQueue()
    isOnSyncQueue方法判断一个节点是否在AQS的同步队列上。

    final boolean isOnSyncQueue(Node node) {
    // 状态为 Condition或者前驱节点为 null, 返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
    // 后继节点不为 null,肯定在同步队列中
    if (node.next != null)
    return true;
    // 从同步队列尾部开始向前查找, 判断是否在同步队列上
    return findNodeFromTail(node);
    }
  5. unlinkCancelledWaiters()
    unlinkCancelledWaiters方法负责将条件队列中状态不为Condition的节点删除。

    private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 记录上一个Node节点
    Node trail = null;
    while (t != null) {
    Node next = t.nextWaiter;
    // 节点状态不为CONDITION, 将其删除
    if (t.waitStatus != Node.CONDITION) {
    t.nextWaiter = null;
    if (trail == null)
    firstWaiter = next;
    else
    trail.nextWaiter = next;
    if (next == null)
    lastWaiter = trail;
    }
    else
    trail = t;
    t = next;
    }
    }

线程唤醒操作

  1. signal()
    signal方法会将Condition 的头节点移动到AQS同步队列尾部,让其等待再次获取锁资源。会首先唤醒条件队列中第一个节点,即等待时间最长的节点。

    public final void signal() {
    //检测当前线程是否拥有锁
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    //头节点,唤醒条件队列中的第一个节点
    Node first = firstWaiter;
    if (first != null)
    doSignal(first);
    }
  2. doSignal()
    doSignal方法会依次移除条件队列的节点,并调用transferForSignal方法将节点移动到AQS同步队列中。

    private void doSignal(Node first) {
    do {
    //修改头结点,完成旧头结点的移出工作
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    // 将条件队列中的节点移动到同步队列中
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    }
  3. transferForSignal()
    transferForSignal会将条件队列的节点移动到AQS同步队列中。并修改当前节点的前一个结点状态为SIGNAL。

    final boolean transferForSignal(Node node) {
    //将该节点从状态CONDITION改变为初始状态0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

    //将节点加入到同步队列中去,返回的是同步队列中node节点前面的一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }

使用实例

使用Condition实现生产者消费者模型:

public class ConditionExample {
// 缓冲区
private LinkedList<String> buffer;
// 缓冲区大小
private int maxSize ;
private Lock lock;
// 定义两个Condition, 即两个条件队列
private Condition fullCondition;
private Condition notFullCondition;

ConditionExample(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList<String>();
lock = new ReentrantLock();
fullCondition = lock.newCondition();
notFullCondition = lock.newCondition();
}

public void set(String string) throws InterruptedException {
//获取锁
lock.lock();
try {
while (maxSize == buffer.size()){
//缓冲区已经满了,不能再进行添加,
// 调用await方法将添加的线程设置为等待状态,并且释放锁
notFullCondition.await();
}
// 添加数据到缓存区
buffer.add(string);
// 告知可以从缓冲区获取数据了 ,将fullCondition条件队列的线程
// 移动到AQS同步队列中, 等待唤醒并获取锁
fullCondition.signal();
} finally {
// 记得释放锁
lock.unlock();
}
}

public String get() throws InterruptedException {
String string;
lock.lock();
try {
while (buffer.size() == 0){
// 缓冲区为空,不能进行获取,
// 调用await方法将获取的线程设置为等待状态,并且释放锁
fullCondition.await();
}
// 从缓存区取数据
string = buffer.poll();
// 告知可以添加数据到缓存区了,将notFullCondition条件队列的线程
// 移动到AQS同步队列中, 等待唤醒并获取锁
notFullCondition.signal();
} finally {
// 释放锁
lock.unlock();
}
return string;
}
}

使用Object::wait,notify实现生产者消费者模型:

public class ProducerAndConsumer {
// 缓存区
private LinkedList<String> buffer;
// 缓冲区大小
private int maxSize ;

ConditionTest(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList<String>();
}

public void set(String string) throws InterruptedException {
synchronized(buffer){
while (maxSize == buffer.size()){
// 释放锁, 并挂起当前线程
buffer.wait();
}
buffer.add(string);
buffer.notify();
}
}

public String get() throws InterruptedException {
String string;
synchronized(buffer){
while (maxSize == 0){
// 释放锁, 并挂起当前线程
buffer.wait();
}
string = buffer.poll();
buffer.notify();
}
return string;
}
}

参考资料

[1] http://www.iocoder.cn/JUC/sike/Condition/
[2] https://www.cnblogs.com/dolphin0520/p/3920385.html

Semaphore

概述

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源,或者说是同时执行某个特定操作的数量。
Semaphore将AQS同步状态用来保存当前可用许可的数量,许可的数量可以通过构造函数的参数指定。

  1. 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可;
  2. 访问资源后,使用release释放许可。
    Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

源码分析

获取许可操作

  1. acquire()
    Semaphore对外提供acquire方法,用户不需要了解其他详细的获取细节,线程通过调用该方法来获取许可,以完成对特定资源的访问。

    public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
  2. acquireSharedInterruptibly()
    acquireSharedInterruptibly方法会调用tryAcquireShared方法来获取许可。若返回正整数(大于等于0),表示获取许可成功,线程继续向前执行;否则将调用doAcquireSharedInterruptibly方法将线程加入到同步队列,等待许可的释放。
    tryAcquireShared方法交由Semaphore来实现,它自己来实现判断逻辑 - 怎么才算获取许可成功。
    xxxxShared表示该获取操作是非独占操作(Shared)。
    关于如何阻塞一个线程,将线程加入到同步队列中,参考《JU.C之AQS源码分析》

    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 线程中断
    if (Thread.interrupted())
    throw new InterruptedException();
    // 若返回整数, 则表示获取需求成功, 线程继续向后执行
    // 否则, 获取失败, 将线程加入到同步队列, 等待许可的释放
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
  3. tryAcquireShared()
    Semaphore实现tryAcquireShared方法来完成具体的许可获取逻辑。

    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    // 得到可用许可
    int available = getState();
    // 计算剩余许可
    int remaining = available - acquires;
    // 原子更新可用许可, 并返回剩余许可数
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }

释放许可操作

  1. release()
    Semaphore对外提供release方法,用户不需要了解其他详细的许可释放细节就可以完成许可释放操作。

    public void release() {
    sync.releaseShared(1);
    }
  2. releaseShared()
    release方法会调用releaseShared方法去释放许可。若释放许可成功,则调用doReleaseShared方法来唤醒同步队列中的等待线程来尝试重新获取需求;若释放许可失败,则直接返回。
    tryReleaseShared方法交由Semaphore来实现,它自己来实现释放许可逻辑。
    关于如何释放同步队列中的一个等待线程,参考《JU.C之AQS源码分析》

    public final boolean releaseShared(int arg) {
    // 许可释放成功
    if (tryReleaseShared(arg)) {
    // 唤醒等待获取许可的线程
    doReleaseShared();
    return true;
    }
    return false;
    }
  3. tryReleaseShared()
    Semaphore通过tryReleaseShared方法来实现许可的释放逻辑。

    protected final boolean tryReleaseShared(int releases) {
    // 自旋操作
    for (;;) {
    // 得到当前许可数量
    int current = getState();
    // 修改许可数量
    int next = current + releases;
    if (next < current) // overflow
    throw new Error("Maximum permit count exceeded");
    // 原子更新许可数量
    if (compareAndSetState(current, next))
    return true;
    }
    }

应用实例

Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。
读取数据到数据库中,可以启动几十个线程并发读取来进行读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

public class SemaphoreTest{
private static final int COUNT = 40;
private static Executor executor = Executors.newFixedThreadPool(COUNT);
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[]args){
for(inti=0;i<COUNT;i++){
executor.execute(newThreadTest.Task());
}
}

static class Task implements Runnable{
@Override
public void run(){
try{
// 获取许可, 因为许可数量只有十个, 所有最多只能有10个线程进行操作
semaphore.acquire();
//todo 存数据过程
semaphore.release();
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
}
}
}
}

参考资料

[1] https://www.jianshu.com/p/0090341c6b80

CountDownLatch

概述

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

实现原理

CountDownLatch是通过一个计数器来实现的,通过同步状态保存当前计数值,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

源码分析

获取闭锁操作

  1. await()
    CountDownLatch对外部提供await方法,用户不需要了解闭锁获取的细节,线程通过调用该方法来获取闭锁。
    在await() 方法中会调用与Semaphore相同的acquireShared方法来获取锁,并且获取失败之后的处理也一致,都是AQS提供了公共方法。主要不同体现在tryAcquireShared 方法的实现上。

    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
  2. tryAcquireShared(int acquires)
    当同步状态等于0时,表示其他线程已经全部执行完毕,获取闭锁成功,线程继续往下执行;若失败,则将线程放入到同步队列中,并阻塞线程。

    protected int tryAcquireShared(int acquires) {
    // 同步状态等于0时,代表其他线程已经执行完毕, 获取闭锁成功
    return (getState() == 0) ? 1 : -1;
    }

释放闭锁操作

  1. countDown()
    CountDownLatch对外部提供countDown方法,其他线程调用该方法来释放资源。
    在countDown() 方法中会调用与Semaphore相同的releaseShared方法来释放资源,并且释放成功之后的处理也一致,都会尝试唤醒同步队列上的等待线程,都是AQS提供了公共方法。主要不同体现在tryReleaseShared 方法的实现上。

    public void countDown() {
    sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
    // 释放资源成功
    if (tryReleaseShared(arg)) {
    // 尝试唤醒同步队列的等待线程
    doReleaseShared();
    return true;
    }
    return false;
    }
  2. tryReleaseShared(int args)
    tryReleaseShared方法将会减少同步状态,当同步状态等于0时,返回true,会调用doReleaseShared 方法唤醒同步队列的等待线程。

    protected boolean tryReleaseShared(int releases) {
    // 减少同步状态直到为0
    for (;;) {
    int c = getState();
    if (c == 0)
    return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }

应用实例

public class CountDownLatchExample {

private final static int threadCount = 200;

public static void main(String[] args) throws InterruptedException {
// 创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
executorService.execute(() ->{
try {
test(threadNum);
} catch (Exception e){
log.info("{}", e);
}finally {
countDownLatch.countDown();
}
});
}

// 只有当threadCount 为0时, 即200个线程执行完毕之后, 才会执行后面的语句
countDownLatch.await();
// 等待10ms, 若200个线程没有执行完毕, 则当前线程会继续执行
// countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
executorService.shutdown();
}

public static void test (int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}

使用场景

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数器为1的CountDownLatch,并让其他所有线程都在这个锁上等待,只需要调用一次countDown()方法就可以让其他所有等待的线程同时恢复执行。
  2. 开始执行前等待N个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统都已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是你用N个线程去访问共享资源,在每个测试阶段线程数量不同,并尝试产生死锁。

参考文档

[1] https://www.jianshu.com/p/4b6fbdf5a08f

CountDownLatch和Semaphore结合实例

public class ConcurrencyTest{
//请求数
public static intc lientTotal=1000;
//并发请求数
public static int threadTotal=50;
//共享计数量
public static int count=0;

public static void main(String[] args)throws InterruptedException{
//定义线程池
ExecutorService executorService=Executors.new CachedThreadPool();
//定义信号量,并设置运行并发数
final Semaphore semaphore=new Semaphore(threadTotal);
//定义CountDownLatch,在所有的请求完成之后输出结果
final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);

//线程执行
for(inti=0;i<clientTotal;i++){
executorService.execute(()->{
try{
//获取信号量-最大并发数为50
semaphore.acquire();
count();
//释放信号量
semaphore.release();
}catch(InterruptedException e){
log.info("InterruptedException",e);
}
//进行count--
  countDownLatch.countDown();
});
}
//只有countDown到0的时候,才会唤起线程继续执行,不然会挂起继续等待
countDownLatch.await();
//关闭线程池
executorService.shutdown();
//在所有子线程执行完之后,打印count值
log.info("count:{}",count);
}

private static void count(){
count++;
}
}

CyclicBarrier

概述

CycliBarrier也称为循环屏障,它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,循环是指因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

实现原理

CyclicBarrier将同步状态设置为到达屏障的线程数量。当某一个线程到达屏障之后,将当前线程加入到同步队列中并阻塞,并且数量减一,当数量为0时,唤醒被阻塞的所有线程,让它们继续执行。

源码分析

  1. CyclicBarrier(int parties) / CyclicBarrier(int parties, Runnable barrierAction)
    CyclicBarrier提供了两种构造方法,提供了参数有:需要到达屏障的线程数,到达指定屏障后优先执行的动作。

    public CyclicBarrier(int parties) {
    this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 屏障数量
    this.parties = parties;
    // 当前还有多少线程没有到达屏障
    this.count = parties;
    this.barrierCommand = barrierAction;
    }
  2. await()
    CyclicBarrier对外提供await方法。用户不需要了解循环屏障的具体实现细节就可使用其提供的功能。

    public int await() throws InterruptedException, BrokenBarrierException {
    try {
    // dowait(timed, nanos)
    // 当timed = true时, 表示设置等待超时时间, nanos为时才
    return dowait(false, 0L);
    } catch (TimeoutException toe) {
    throw new Error(toe);
    }
    }
  3. dowait(boolean timed, long nanos)
    await通过调用dowait方法来实现线程的阻塞和唤醒。若还有剩余线程未到达屏障,线程通过调用该方法来完成阻塞操作,将线程阻塞在屏障前;若所有线程已到达屏障前,则唤醒所有被阻塞的线程,让它们继续向前执行。

    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    // 定义一个可重入锁, 保证多线程安全性
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 构建一个"代"(Generation)对象
    // 同一个Generation表示同一代
    final Generation g = generation;
    // 如果broken=true表示当前屏障被损坏了,抛出异常
    if (g.broken)
    throw new BrokenBarrierException();
    // 如果线程被中断过, 设置屏障为损坏状态    
    if (Thread.interrupted()) {
    breakBarrier();
    throw new InterruptedException();
    }
    // 需要到达屏障的线程数减 - 1
    int index = --count;
    // 若数量为0, 表示所有线程都到达了屏障
    if (index == 0) {
    boolean ranAction = false;
    try {
    final Runnable command = barrierCommand;
    // 表示到达屏障之后,如果我们有设置barrierCommand, 则优先执行
    if (command != null)
    command.run();
     //执行到这里的时候,说明所有线程都执行await()方法(到达了屏障)
    // 且设置的barrierCommand也已经执行完了
    //接下来要做的事情就是换代(所以CyclicBarrier是通过换代的方式
    // 实现重新计数的)
    //换代之后相当于进入一个新的周期,所有线程在后续中又可以通过
    // await()阻塞一次
    ranAction = true;
    nextGeneration();
    return 0;
    } finally {
    // 如果ranAction = false说明当前屏障还有流程没执行完,
    // 所以需要屏障设置会损坏状态
    if (!ranAction)
    breakBarrier();
    }
    }

    // 进行自旋操作直到count=0,调用breakBarrier方法(表示屏障有问题的场景),
    // 中断或者超时
    for (;;) {
    try {
    // 没有设置超时时间,
    // 调用Condition的await()阻塞,相当于把线程加入
    // 到Condition队列中阻塞, 等到调用signalAll方法唤醒
    if (!timed)
    trip.await();
    // 设置阻塞超时时间
    else if (nanos > 0L)
    nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
    // 如果当前屏障没有换代,也没有损坏,
    // 那么就设置为损坏状态之后再抛出中断异常
    if (g == generation && ! g.broken) {
    breakBarrier();
    throw ie;
    } else {
    // 中断线程等待
    Thread.currentThread().interrupt();
    }
    }
    // 如果屏障已经被损坏了
    if (g.broken)
    throw new BrokenBarrierException();
                                
    // 如果发现已经换代了, 就不继续循环了,
    // 直接返回剩余屏障数
    if (g != generation)
    return index;
    // 表示超时时间到了
    if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
    }
    }
    } finally {
    lock.unlock();
    }
    }
  4. nextGeneration()
    若所有线程都到达了屏障,将条件队列中的线程切换到AQS同步队列中,并唤醒所有等待线程。

    private void nextGeneration() {
    // 条件队列中的等待线程切换AQS同步队列中, AQS同步队列中等待线程依次被唤醒获取锁
    trip.signalAll();
    // 开始一个循环屏障
    count = parties;
    generation = new Generation();
    }

使用实例

public class CyclicBarrierExample {

// 设置屏障数量为5
// private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// 到达指定屏障之后, 优先执行barrierAction中的动作
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
log.info("Priority Action!!");
});

public static void main(String[] args) throws InterruptedException {

ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
  doSomething(threadNum);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}

private static void doSomething(int threadNum) throws InterruptedException, BrokenBarrierException {
Thread.sleep(1000);
log.info("{}: I am ready!", threadNum);
// 某个线程初始化完毕, 则阻塞等待, 若阻塞数量达到5个, 则一起执行每个线程后面的方法
cyclicBarrier.await();
log.info("{}: I am done!", threadNum);
}
}

适用场景

CyclicBarrier适用多线程计算数据,最后合并计算结果的场景。
相比于CountDownLatch,CyclicBarrier能适用于更多场景,例如,如果计算发生错误,可以重置计数 器,并让线程重新执行一次。因为CountDownLatch计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。

参考资料

[1] http://blog.sikacode.com/article/40#CyclicBarrier_68

Author: HB
Link: http://www.huangbin.fun/J-U-C之常用同步器-ReentrantLock-CountDownLatch-Semaphore-CyclicBarrier-Condition-源码分析.html
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.