AQS(AbstractQueuedSynchronizer)
概述
AQS 提供了一种实现阻塞锁和一系列依赖FIFO同步队列的同步器的框架,ReentrantLock,CountDownLatch等等都是基于AQS的基础上实现的。
使用AQS去实现自定义的同步器时,我们只需要实现对共享资源的获取和释放即可,对于阻塞线程的维护和唤醒都有AQS进行了实现,进一步简便了自定义同步器的实现。
在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作,还有定义状态信息。
- 获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取"操作的含义就是获取锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。例如在使用CountDownLatch时,“获取"操作意味着"等待并直到闭锁到达结束状态”,使用FutureTask时,则意味着"等待并直到任务已经完成”。
根据同步器的不同,获取操作可以是独占操作(ReentrantLock),也可以是非独占操作(Semaphore,CountDownLatch)。
如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquire,tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireShared,tryReleaseShared等。AQS中的acquire,acquiredShared,release,releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。
在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState,setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类"获取"或"释放"同步器的操作是否成功。 - "释放"操作并不是一个可阻塞的操作,当执行"释放"时,所有在请求时被阻塞的线程都会开始执行。
- 一个类想要成为状态依赖的类,它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息(volatile关键字修饰),可通过getState,setState和compareAndSetState等方法来操作。这个整数可以表示为任意状态。例如ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始,正在运行,已完成或已取消)。
同步器还可以自行管理一些额外的状态变量,例如ReentrantLock保存了锁的所有者的信息,以此来区分获取操作是重入的还是竞争的。
下图说明了获取操作和释放操作的伪代码:
对于获取操作:同步器判断当前状态是否允许获得操作,如果允许,则允许线程执行,并更新同步器的状态,否则获取操作将阻塞或失败。具体的判断实现,由同步器的语义决定。
对于释放操作:更新同步器的状态,并且如果新的状态允许某个被阻塞的线程获取成功,则解除等待队列中一个或多个线程的等待状态。
示例
OneShotLatch是一个使用AQS来实现的二元闭锁。包含两个方法:await,signal,分别对应获取操作和释放操作。
AQS状态用来表示闭锁的状态 - 关闭(0) 和 打开(1)。起初,闭锁是关闭的,任何调用await方法的线程都将阻塞并直到闭锁被打开。当通过调用signal打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。
@ThreadSafe |
AQS 的acquireSharedInterruptibly 和 releaseShared 方法如下:
public final void acquireSharedInterruptibly(int arg) |
以上代码的执行流程如下:
- 当某个线程调用await方法时,该方法会调用AQS的acquireSharedInterruptibly方法,然后接着调用oneShotLatch的tryAcquireShared方法(在这个方法中实现同步器的获取语义),在tryAcquireShared的实现中返回一个值来表示获取操作能否执行。
若闭锁是关闭的(获取失败),那么AQS会调用doAcquireSharedInterruptibly方法将线程放入等待线程队列中。 - 当某个线程调用signal方法时,该方法会调用AQS的releaseShared方法,然后接着调用oneSlotLatch的tryReleaseShared方法(在这个方法中实现同步器的释放语义),oneShotLatch的tryReleaseShared方法实现将无条件把闭锁设置为打开,通过返回true表示该同步器处于被释放的状态。
若释放成功,AQS 会调用 doReleaseShared 方法会让所有等待的线程都尝试重新请求同步器,并且由于tryAcquireShared将返回成功,因此获取操作将成功。
源码分析
下述的源码分析基于JDK 8。
前面所提到的获取操作可能会阻塞线程,释放操作可能会对唤醒线程,并且也提到了阻塞线程是记录在一个同步队列上,那么AQS是如何来完成线程的阻塞和唤醒的呢?
AQS通过一个FIFO队列(双向链表,该队列也被称为同步队列)来保存阻塞线程,队列中每个Node节点就是对每一个等待获取资源的线程的封装,Node 作为一个静态内部类保存了线程本身(Thread),线程的等待状态(WaitStatus),双向链表指针(pre,next)以及独占模式还是共享模式等等。
其中在队列中的一个线程具有以下五种状态:
- CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化;
- SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL;
- CONDITION(-2):表示结点等待在Condition上。当调用Condition的await方法时,会将线程置为CONDITION状态,并将线程从同步队列(Sync queue)移动到条件队列中(Condition queue),当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从条件队列转移到同步队列中,将线程状态设置为0,等待获取同步锁;
- PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点;
- 0:新结点入队时的默认状态。
其中head结点所指的标杆结点,就是当前获取到资源的那个结点或null。
独占模式下资源获取
-
acquire(int arg)
acquire方法是独占模式下线程获取资源的顶层入口。如果获取到了资源(tryAcquire方法返回true,该方法实现具体由自定义同步器的语义决定),则线程直接返回;否则将当前线程加入等待队列,等待其他线程释放资源。public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
} -
tryAcquire(int arg)
tryAcquire方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}AQS作为一个框架,它提供了tryAcquire方法交由自定义同步器来实现具体的获取资源的逻辑。
-
addWaiter(Node mode)
addWriter方法根据传递过来的mode(SHARED - 共享模式,EXCLUSIVE - 独占模式)创建一个新的node节点,队列的尾结点不为空,则将新结点插入到尾部,并返回Node结点。private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
// 将新node放置结尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 原子更新tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 上一步设置失败, 则通过enq入队
enq(node);
return node;
} -
endq(final Node node)
当tail结点为空时,addWaiter方法插入结点失败,此时会调用enq方法来进行新结点入队。private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
// 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
// 若此时tail结点不为空, 则将node结点放入尾部
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
} -
acquireQueued(final Node node,int arg)
当执行完addWaiter方法时,关于当前线程的Node结点已经放入同步队列,当前线程进入到等待状态,等待其他线程释放资源。final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程是否被中断
boolean interrupted = false;
// 自旋等待获取资源
for (;;) {
// 拿到前驱结点
final Node p = node.predecessor();
// 如果前驱结点是head, 则表示当前结点是"老二", 便有资格去获取资源
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点
// 就是当前获取到资源的那个结点或null
setHead(node);
// setHead中node.prev已置为null,此处再将head.next置为null,
// 就是为了方便GC回收以前的head结点
p.next = null;
// 成功获取资源
failed = false;
// 返回等待过程中是否被中断过
return interrupted;
}
// 利用shouldParkAfterFailedAcquire方法将线程设置了watting状态
// 如果等待过程中被中断过,哪怕只有那么一次,就interrupted标记为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源
// 那么取消结点在队列中的等待
if (failed)
cancelAcquire(node);
}
} -
shouldParkAfterFailedAcquire (Node pre,Node node)
shouldParkAfterFailedAcquire 主要用来检查状态,防止前面的结点已经放弃了等待,但是没有被移除。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前一个结点的状态
int ws = pred.waitStatus;
// 前序结点为SIGNAL时, 表示后继结点在等待唤醒
if (ws == Node.SIGNAL)
return true;
// 当前驱结点 > 0时, 即为CANCELLED状态时, 代表前驱节点中的线程已经放弃了等待
if (ws > 0) {
// 一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱状态正常,那就把前驱的状态设置成SIGNAL, 表示后继节点在等待唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
} -
parkAndCheckInterrupt()
parkAndCheckInterrupt方法就是让线程去休息,真正进入等待状态。
park方法会让当前线程进入waiting状态,可以有两种途径唤醒它:unpark方法/interrupt方法。private final boolean parkAndCheckInterrupt() {
//调用park()使线程进入waiting状态
LockSupport.park(this);
//如果被唤醒,查看自己是不是被中断的
return Thread.interrupted();
} -
总结
- 调用AQS提供的acquire方法来获取资源,在acquire方法中会调用自定义同步器所实现的tryAcquire方法判断是否可以成功获取资源,若获取成功,则直接返回,线程继续向前执行;
- 若第一步中获取资源失败,则调用addWaiter方法将当前线程封装成一个Node结点加入到同步队列的末尾;
- 入队成功后,调用acquireQueued方法将设置线程为waitting状态(具体是调用park方法)。需要注意是,在首次调用acquireQueued方法时就会进行一次判断,判断是否可以获取到资源,因为此时可能会其他线程进行了释放操作;
- 如果线程调用了unpark方法和interrupt方法,那么将唤醒当前线程,判断线程是否可以
获取到资源,若可以进行获取操作,则将线程的Node结点设置为head结点,并返回。
独占模式下资源释放
-
release(int arg)
release方法是独占模式下线程获取资源的顶层入口。该方法会释放指定量的资源,与此同时,它会唤醒同步队列中的等待线程来获取资源。具体释放资源操作由tryAcquire方法实现,而该方法的具体实现由自定义同步器的语义决定。public final boolean release(int arg) {
if (tryRelease(arg)) {
// 找到头结点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒等待队列的下一个线程
unparkSuccessor(h);
return true;
}
return false;
} -
tryRelease (int arg)
tryRelease方法尝试去释放指定量的资源。如果释放成功,则直接返回true,否则直接返回false。protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}AQS作为一个框架,它提供了tryRelease方法交由自定义同步器来实现具体的释放资源的逻辑。
-
unparkSuccessor(Node node)
unparkSuccessor方法用于唤醒同步队列中当前Node结点的下一个结点,唤醒其中的线程。private void unparkSuccessor(Node node) {
// 获取当前结点的状态
int ws = node.waitStatus;
// 若结点状态为 SIGNAL(-1), CONDITION(-2), PROPAGATE(-3)
// 将状态更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个需要唤醒的结点
Node s = node.next;
// 若下一个需要唤醒的结点为null 或状态为已取消(CANCELLED - 1)
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前查找, 找到一个可以被唤醒的结点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 调用unpark方法, 唤醒s结点中的线程, 那么被唤醒的线程会继续执行acquireQueued中的获取操作
LockSupport.unpark(s.thread);
}
共享模式下资源获取
-
acquireShared(int arg)
acquireShared方法是共享模式下线程获取资源的顶层入口。它会获取指定量的资源,若获取成功则直接返回,否则失败进行同步队列等待资源的释放。public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
} -
tryAcquireShared(int arg)
AQS作为一个框架,它提供了tryRelease方法交由自定义同步器来实现具体获取资源的逻辑。相比于tryAcquire返回一个boolean值,tryAcquiredShared方法将返回一个整数:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
} -
doAcquireShared(int arg)
doAcquireShared方法会调用addWaiter方法将当前线程封装为一个Node结点并插入到同步队列的末尾。之后进入到一个自旋操作,若满足条件(前序结点为head)会尝试获取一次资源,若获取失败,则调用parkAndCheckInterrupt方法中的park方法使得线程进行watting状态。
doAcquireShared的获取流程和acquireQueued方法的获取流程大致一致,只是将中断之后的调用方法放置在了方法内部。private void doAcquireShared(int arg) {
// 调用addWaiter方法将当前线程的结点插入到同步队列末尾
final Node node = addWaiter(Node.SHARED);
// 是否成功获取资源
boolean failed = true;
try {
// 等待过程中是否被中断的标记
boolean interrupted = false;
for (;;) {
// 获得前驱结点
final Node p = node.predecessor();
// 如果前驱结点是head, 则表示当前结点是"老二", 便有资格去获取资源
if (p == head) {
// 尝试获取资源
int r = tryAcquireShared(arg);
// 若返回整数则代表获取成功
if (r >= 0) {
// 将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null;
//如果等待过程中被打断过, 此时将中断补上
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断状态,调用park方法使进入waiting状态, 等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
} -
setHeadAndPropagate(Node node,int propagate)
setHeadAndPropagate方法 首先会将当前Node结点设置为head结点,若还有剩余资源,则尝唤醒下一个邻居线程。private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录旧头部以便下面的检查操作
// 将head指向自己
setHead(node);
// 如果还有剩余量, 继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 唤醒下一个邻居线程
}
}
共享模式下资源释放
-
releaseShared(int arg)
releaseShared方法是共享模式下线程获取资源的顶层入口。该方法会释放指定量的资源,与此同时,它会唤醒同步队列中的等待线程来获取资源。具体释放资源操作由tryAcquireShared方法实现,而该方法的具体实现由自定义同步器的语义决定)public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继结点
doReleaseShared();
return true;
}
return false;
} -
tryReleaseShared (int arg)
tryReleaseShared方法尝试去释放指定量的资源。如果释放成功,则直接返回true,否则直接返回false。protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}AQS作为一个框架,它提供了tryReleaseShared方法交由自定义同步器来实现具体的释放资源的逻辑。
-
doReleaseShared ()
doReleaseShared方法主要用来唤醒同步队列上的线程。private void doReleaseShared() {
// 自旋操作
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 若head结点的状态为SIGNAL
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继结点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
参考链接
[1] 《并发编程实战》
[2] https://www.cnblogs.com/waterystone/p/4920797.html