J.U.C之AQS源码学习

AQS(AbstractQueuedSynchronizer)

概述

AQS 提供了一种实现阻塞锁和一系列依赖FIFO同步队列的同步器的框架,ReentrantLock,CountDownLatch等等都是基于AQS的基础上实现的。
使用AQS去实现自定义的同步器时,我们只需要实现对共享资源的获取和释放即可,对于阻塞线程的维护和唤醒都有AQS进行了实现,进一步简便了自定义同步器的实现。
在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作,还有定义状态信息。

  1. 获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取"操作的含义就是获取锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。例如在使用CountDownLatch时,“获取"操作意味着"等待并直到闭锁到达结束状态”,使用FutureTask时,则意味着"等待并直到任务已经完成”。
    根据同步器的不同,获取操作可以是独占操作(ReentrantLock),也可以是非独占操作(Semaphore,CountDownLatch)。
    如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquire,tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireShared,tryReleaseShared等。AQS中的acquire,acquiredShared,release,releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。
    在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState,setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类"获取"或"释放"同步器的操作是否成功。
  2. "释放"操作并不是一个可阻塞的操作,当执行"释放"时,所有在请求时被阻塞的线程都会开始执行。
  3. 一个类想要成为状态依赖的类,它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息(volatile关键字修饰),可通过getState,setState和compareAndSetState等方法来操作。这个整数可以表示为任意状态。例如ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始,正在运行,已完成或已取消)。
    同步器还可以自行管理一些额外的状态变量,例如ReentrantLock保存了锁的所有者的信息,以此来区分获取操作是重入的还是竞争的。

下图说明了获取操作和释放操作的伪代码:

对于获取操作:同步器判断当前状态是否允许获得操作,如果允许,则允许线程执行,并更新同步器的状态,否则获取操作将阻塞或失败。具体的判断实现,由同步器的语义决定。
对于释放操作:更新同步器的状态,并且如果新的状态允许某个被阻塞的线程获取成功,则解除等待队列中一个或多个线程的等待状态。

示例

OneShotLatch是一个使用AQS来实现的二元闭锁。包含两个方法:await,signal,分别对应获取操作和释放操作。
AQS状态用来表示闭锁的状态 - 关闭(0) 和 打开(1)。起初,闭锁是关闭的,任何调用await方法的线程都将阻塞并直到闭锁被打开。当通过调用signal打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。

@ThreadSafe
public class oneShotLatch {
private final Sync sync = new Sync();

public void signal(){
sync.releaseShared(0);
}

public void await(){
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueueSynchronizer{
protected int tryAcquireShared(int ignored){
// 如果闭锁打开状态(1), 则获取操作成功, 否则失败
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored){
// 打开闭锁
setState(1);
// 返回true, 代表其他线程可以获取该闭锁
return true;
}
}
}

AQS 的acquireSharedInterruptibly 和 releaseShared 方法如下:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

以上代码的执行流程如下:

  1. 当某个线程调用await方法时,该方法会调用AQS的acquireSharedInterruptibly方法,然后接着调用oneShotLatch的tryAcquireShared方法(在这个方法中实现同步器的获取语义),在tryAcquireShared的实现中返回一个值来表示获取操作能否执行。
    若闭锁是关闭的(获取失败),那么AQS会调用doAcquireSharedInterruptibly方法将线程放入等待线程队列中。
  2. 当某个线程调用signal方法时,该方法会调用AQS的releaseShared方法,然后接着调用oneSlotLatch的tryReleaseShared方法(在这个方法中实现同步器的释放语义),oneShotLatch的tryReleaseShared方法实现将无条件把闭锁设置为打开,通过返回true表示该同步器处于被释放的状态。
    若释放成功,AQS 会调用 doReleaseShared 方法会让所有等待的线程都尝试重新请求同步器,并且由于tryAcquireShared将返回成功,因此获取操作将成功。

源码分析

下述的源码分析基于JDK 8。
前面所提到的获取操作可能会阻塞线程,释放操作可能会对唤醒线程,并且也提到了阻塞线程是记录在一个同步队列上,那么AQS是如何来完成线程的阻塞和唤醒的呢?
AQS通过一个FIFO队列(双向链表,该队列也被称为同步队列)来保存阻塞线程,队列中每个Node节点就是对每一个等待获取资源的线程的封装,Node 作为一个静态内部类保存了线程本身(Thread),线程的等待状态(WaitStatus),双向链表指针(pre,next)以及独占模式还是共享模式等等。
其中在队列中的一个线程具有以下五种状态:

  1. CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化;
  2. SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL;
  3. CONDITION(-2):表示结点等待在Condition上。当调用Condition的await方法时,会将线程置为CONDITION状态,并将线程从同步队列(Sync queue)移动到条件队列中(Condition queue),当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从条件队列转移到同步队列中,将线程状态设置为0,等待获取同步锁;
  4. PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点;
  5. 0:新结点入队时的默认状态。

其中head结点所指的标杆结点,就是当前获取到资源的那个结点或null。

独占模式下资源获取

  1. acquire(int arg)
    acquire方法是独占模式下线程获取资源的顶层入口。如果获取到了资源(tryAcquire方法返回true,该方法实现具体由自定义同步器的语义决定),则线程直接返回;否则将当前线程加入等待队列,等待其他线程释放资源。

    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  2. tryAcquire(int arg)
    tryAcquire方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。

    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }

    AQS作为一个框架,它提供了tryAcquire方法交由自定义同步器来实现具体的获取资源的逻辑。

  3. addWaiter(Node mode)
    addWriter方法根据传递过来的mode(SHARED - 共享模式,EXCLUSIVE - 独占模式)创建一个新的node节点,队列的尾结点不为空,则将新结点插入到尾部,并返回Node结点。

    private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 将新node放置结尾
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    // 原子更新tail
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    // 上一步设置失败, 则通过enq入队
    enq(node);
    return node;
    }
  4. endq(final Node node)
    当tail结点为空时,addWaiter方法插入结点失败,此时会调用enq方法来进行新结点入队。

    private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
    Node t = tail;
    // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
    if (t == null) {
    if (compareAndSetHead(new Node()))
    tail = head;
    // 若此时tail结点不为空, 则将node结点放入尾部
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
  5. acquireQueued(final Node node,int arg)
    当执行完addWaiter方法时,关于当前线程的Node结点已经放入同步队列,当前线程进入到等待状态,等待其他线程释放资源。

    final boolean acquireQueued(final Node node, int arg) {
    // 标记是否成功拿到资源
    boolean failed = true;
    try {
    // 标记等待过程是否被中断
    boolean interrupted = false;
    // 自旋等待获取资源
    for (;;) {
    // 拿到前驱结点
    final Node p = node.predecessor();
    // 如果前驱结点是head, 则表示当前结点是"老二", 便有资格去获取资源
    if (p == head && tryAcquire(arg)) {
    //拿到资源后,将head指向该结点
    // 就是当前获取到资源的那个结点或null
    setHead(node);
    // setHead中node.prev已置为null,此处再将head.next置为null,
    // 就是为了方便GC回收以前的head结点
    p.next = null;
    // 成功获取资源
    failed = false;
    // 返回等待过程中是否被中断过
    return interrupted;
    }
    // 利用shouldParkAfterFailedAcquire方法将线程设置了watting状态
    // 如果等待过程中被中断过,哪怕只有那么一次,就interrupted标记为true
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    // 如果等待过程中没有成功获取资源
    // 那么取消结点在队列中的等待
    if (failed)
    cancelAcquire(node);
    }
    }
  6. shouldParkAfterFailedAcquire (Node pre,Node node)
    shouldParkAfterFailedAcquire 主要用来检查状态,防止前面的结点已经放弃了等待,但是没有被移除。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前一个结点的状态
    int ws = pred.waitStatus;
    // 前序结点为SIGNAL时, 表示后继结点在等待唤醒
    if (ws == Node.SIGNAL)
    return true;
    // 当前驱结点 > 0时, 即为CANCELLED状态时, 代表前驱节点中的线程已经放弃了等待
    if (ws > 0) {
    // 一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    // 如果前驱状态正常,那就把前驱的状态设置成SIGNAL, 表示后继节点在等待唤醒
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }
  7. parkAndCheckInterrupt()
    parkAndCheckInterrupt方法就是让线程去休息,真正进入等待状态。
    park方法会让当前线程进入waiting状态,可以有两种途径唤醒它:unpark方法/interrupt方法。

    private final boolean parkAndCheckInterrupt() {
    //调用park()使线程进入waiting状态
    LockSupport.park(this);
    //如果被唤醒,查看自己是不是被中断的
    return Thread.interrupted();
    }
  8. 总结

    1. 调用AQS提供的acquire方法来获取资源,在acquire方法中会调用自定义同步器所实现的tryAcquire方法判断是否可以成功获取资源,若获取成功,则直接返回,线程继续向前执行;
    2. 若第一步中获取资源失败,则调用addWaiter方法将当前线程封装成一个Node结点加入到同步队列的末尾;
    3. 入队成功后,调用acquireQueued方法将设置线程为waitting状态(具体是调用park方法)。需要注意是,在首次调用acquireQueued方法时就会进行一次判断,判断是否可以获取到资源,因为此时可能会其他线程进行了释放操作;
    4. 如果线程调用了unpark方法和interrupt方法,那么将唤醒当前线程,判断线程是否可以
      获取到资源,若可以进行获取操作,则将线程的Node结点设置为head结点,并返回。

独占模式下资源释放

  1. release(int arg)
    release方法是独占模式下线程获取资源的顶层入口。该方法会释放指定量的资源,与此同时,它会唤醒同步队列中的等待线程来获取资源。具体释放资源操作由tryAcquire方法实现,而该方法的具体实现由自定义同步器的语义决定。

    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    // 找到头结点
    Node h = head;
    if (h != null && h.waitStatus != 0)
    // 唤醒等待队列的下一个线程
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
  2. tryRelease (int arg)
    tryRelease方法尝试去释放指定量的资源。如果释放成功,则直接返回true,否则直接返回false。

    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }

    AQS作为一个框架,它提供了tryRelease方法交由自定义同步器来实现具体的释放资源的逻辑。

  3. unparkSuccessor(Node node)
    unparkSuccessor方法用于唤醒同步队列中当前Node结点的下一个结点,唤醒其中的线程。

    private void unparkSuccessor(Node node) {
    // 获取当前结点的状态
    int ws = node.waitStatus;
    // 若结点状态为 SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
    // 将状态更新为0
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    // 找到下一个需要唤醒的结点
    Node s = node.next;
    // 若下一个需要唤醒的结点为null 或状态为已取消(CANCELLED - 1)
    if (s == null || s.waitStatus > 0) {
    s = null;
    // 从后往前查找, 找到一个可以被唤醒的结点
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    // 调用unpark方法, 唤醒s结点中的线程, 那么被唤醒的线程会继续执行acquireQueued中的获取操作
    LockSupport.unpark(s.thread);
    }

共享模式下资源获取

  1. acquireShared(int arg)
    acquireShared方法是共享模式下线程获取资源的顶层入口。它会获取指定量的资源,若获取成功则直接返回,否则失败进行同步队列等待资源的释放。

    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }
  2. tryAcquireShared(int arg)
    AQS作为一个框架,它提供了tryRelease方法交由自定义同步器来实现具体获取资源的逻辑。相比于tryAcquire返回一个boolean值,tryAcquiredShared方法将返回一个整数:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。

    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }
  3. doAcquireShared(int arg)
    doAcquireShared方法会调用addWaiter方法将当前线程封装为一个Node结点并插入到同步队列的末尾。之后进入到一个自旋操作,若满足条件(前序结点为head)会尝试获取一次资源,若获取失败,则调用parkAndCheckInterrupt方法中的park方法使得线程进行watting状态。
    doAcquireShared的获取流程和acquireQueued方法的获取流程大致一致,只是将中断之后的调用方法放置在了方法内部。

    private void doAcquireShared(int arg) {
    // 调用addWaiter方法将当前线程的结点插入到同步队列末尾
    final Node node = addWaiter(Node.SHARED);
    // 是否成功获取资源
    boolean failed = true;
    try {
    // 等待过程中是否被中断的标记
    boolean interrupted = false;
    for (;;) {
    // 获得前驱结点
    final Node p = node.predecessor();
    // 如果前驱结点是head, 则表示当前结点是"老二", 便有资格去获取资源
    if (p == head) {
    // 尝试获取资源
    int r = tryAcquireShared(arg);
    // 若返回整数则代表获取成功
    if (r >= 0) {
    // 将head指向自己,还有剩余资源可以再唤醒之后的线程
    setHeadAndPropagate(node, r);
    p.next = null;
    //如果等待过程中被打断过, 此时将中断补上
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
     
    // 判断状态,调用park方法使进入waiting状态, 等着被unpark()或interrupt()
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  4. setHeadAndPropagate(Node node,int propagate)
    setHeadAndPropagate方法 首先会将当前Node结点设置为head结点,若还有剩余资源,则尝唤醒下一个邻居线程。

    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 记录旧头部以便下面的检查操作
    // 将head指向自己
    setHead(node);
    // 如果还有剩余量, 继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared(); // 唤醒下一个邻居线程
    }
    }

共享模式下资源释放

  1. releaseShared(int arg)
    releaseShared方法是共享模式下线程获取资源的顶层入口。该方法会释放指定量的资源,与此同时,它会唤醒同步队列中的等待线程来获取资源。具体释放资源操作由tryAcquireShared方法实现,而该方法的具体实现由自定义同步器的语义决定)

    public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
    // 唤醒后继结点
    doReleaseShared();
    return true;
    }
    return false;
    }
  2. tryReleaseShared (int arg)
    tryReleaseShared方法尝试去释放指定量的资源。如果释放成功,则直接返回true,否则直接返回false。

    protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
    }

    AQS作为一个框架,它提供了tryReleaseShared方法交由自定义同步器来实现具体的释放资源的逻辑。

  3. doReleaseShared ()
    doReleaseShared方法主要用来唤醒同步队列上的线程。

    private void doReleaseShared() {
    // 自旋操作
    for (;;) {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    // 若head结点的状态为SIGNAL
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue;
    // 唤醒后继结点
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;
    }
    if (h == head)
    break;
    }
    }

参考链接

[1] 《并发编程实战》
[2] https://www.cnblogs.com/waterystone/p/4920797.html

Author: HB
Link: http://www.huangbin.fun/J-U-C之AQS源码学习.html
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.