AQS 和 高级同步器
JUC 包提供了一系列支持中等规模并发的同步器,它们都是基于AbstractQueuedSynchronizer
这个通用同步器框架实现的。
一. AQS 的实现
同步器是这样这样一种抽象数据结构:同步器内部维护一个状态, 同时它支持两类操作,一种是acquire,另一种是release。acquire操作在状态不满足条件时阻塞调用线程,直到同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。对这两种操作, 同步器应额外提供可超时的 + 可中断的版本. 此外, AQS 还同时支持工作在 exclusive 模式(如 lock) / shared 模式(如 信号量) / 混合模式 下.
AQS 需要3个组件来完成以上任务:
- 状态的原子性管理
- 线程的阻塞和唤醒
- 队列
1. 状态的原子性管理
AQS 用一个 volatile int
表示同步状态, 并用 CAS 操作 (unsafe.compareAndSwapInt
) 保证其 条件判断与更新动作 的原子性.
同步状态具体的含义 / acquire release 的语义由子类决定, 因此 AQS 预留了如下方法给子类实现,这是模板方法
模式的典型应用。通常在这些方法中对同步状态进行改变,如果条件不允许则立即返回false或负数:
// exclusive
protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg);
// shared
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);
并提供了以下方法操作 state:
protected final int getState();
protected final boolean compareAndSetState(int expect, int update);
protected final void setState(int newState);
真正的 acquire / release 动作则是以下方法, 一般子类会选择包裹这些方法, 为其提供更明确的方法名(如 lock / unlock)
// exclusive
public final void acquire(int arg);
public final boolean release(int arg) ;
// shared
public final void acquireShared(int arg) ;
public final boolean releaseShared(int arg);
// ... 超时版本/可中断版本的 acquire/release 方法
2. 线程的阻塞和唤醒
JUC 提供了 LockSupport
类支持线程的阻塞和唤醒:
// 阻塞当前线程
public static void park() {
unsafe.park(false, 0L);
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
// 唤醒某个线程
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
Thread.suspend()和Thread.resume()存在的问题是一来可能造成死锁, 二来如果在 suspend() 调用之前 resume 了, 线程依然会被阻塞. LockSupport 的 unpark 会给予当前线程一个标记但不会累计, park 消费这个标记, 这样可以解决这个问题.
3. 队列
AQS 用一个 FIFO 队列管理被阻塞的线程, 它的 head (大部分情况下)保存着 最后 一个 acquire 成功的线程, acquire 失败的线程挂接在后面并处在一个循环中,等待被唤醒后继续尝试acquire。被阻塞的节点支持两种模式: exclusive 和 shared,不同节点的工作模式可以不一样
exclusive 模式
此时acquire()
的工作流程如下所示:
首次 tryAcquire 即成功, 则立即退出, 注意, 这时线程是不会进入队列的; 失败则生成一个节点并入队.
入队相关的方法如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果队列为空, 用一个 dummy node 初始化队列
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
当队列为空, 但之前已经有线程 acquire 成功导致当前线程需要入队时, 用一个 dummy node 初始化队列, 并将当前线程的节点插入最末. 注意, 除了这种情况, 其他时刻 head 始终是最后一个 acquire 成功的线程.
入队后线程进入 acquire 循环, 不停地阻塞 / 尝试获取锁:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 返回acquire阻塞时是否被中断过
for (;;) {
final Node p = node.predecessor();
// 如果自己是第二个节点且 tryAcquire 成功
if (p == head && tryAcquire(arg)) {
// head 出队, 自己成为 head
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 否则 park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 检查park时是否被中断过
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
因为第一个节点表示的是当前锁的持有者(exclusive 模式), 按照 FIFO 的规则, head 的后继是锁被释放后第一个获取锁的线程. 因此在 acquire 循环中, 每当线程被唤醒(无论是因为锁被释放了还是线程中断), 都要检查一下自己是不是第二个节点, 如果不是则继续 park.
如果自己是第二个节点, 且 tryAccquire 成功了, 这意味着 head 已经释放了锁, 并且自己成功获取了锁, 于是当前节点将head 踢出队列, 自己成为新的 head, 整个 acquire 过程结束.
队列中的节点有一个字段 waitStatus 记录该节点当前所处的状态:
- CANCELLED = 1: 节点因为超时或interrupt而被取消, 一旦达到此状态节点将被踢出。
- SIGNAL = -1: 后继节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此当前节点在 release / 取消后需要唤醒(LockSupport.unpack())它的后继。
- CONDITION = -2:表明节点正处在 condition 队列中;
- PROPAGATE = -3: 只有在shared模式下的head才会处在这个状态,表示同步状态可能允许其他节点acquire成功,唤醒动作需要propagate到后续节点;
- NORMAL = 0 : 新生节点都处在这个状态
shouldParkAfterFailedAcquire()
方法简单来说, 就是把当前节点之前已被 cancel 的节点都删除, 找到前驱并设置其标志位为 SIGNAL, 让其 release 时把自己唤醒.
release()
的工作过程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release 的逻辑非常简单, 它仅仅唤醒 head 的后继, 不会改变队列结构,剩下的事情交给后者的 acquire 循环了;如果tryRelease
失败则什么也不做。
用一副简单的图对 exclusive 的工作模式做个总结:
shared 模式
acquireShared
整个流程和exclusive模式下的acquire几乎完全一样,不同的是,shared模式下,当某个线程acquire成功后,后续阻塞线程依然有可能acquire成功,因此在自己成为head后,需要将其后继节点(如果是工作在shared模式下)唤醒。后者醒来后如果acquire成功,则会继续唤醒后继,如此将有多个节点被唤醒(acquire成功并依次出队),直到遇到一个exclusive节点,或者资源用尽,同步状态不允许acqurie成功。
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
逻辑和exclusive模式下的release是一样的:在tryReleaseShared
成功后调用doReleaseShared
方法,该方法和acquireShared
流程中 “唤醒后继“ 时调用的方法是同一个,它的作用可以简单地理解为将head的状态标记为PROPAGATE,并唤醒后继节点。
若tryReleaseShared
失败,则什么也不做。
可以看到,两个模式下的release动作都是只唤醒head的后继节点,并没有做其他事情,也不会修改队列结构。
一个shared模式的例子(假设每次 acquire 都是成功的):
4. condition
AQS 定义了一个内部类ConditionObject
作为 condition 的实现, 必须与 exclusive 模式的同步器搭配使用,它可以提供典型的管程风格的await、signal和signalAll操作。 当用户请求 condition 时new ConditionObject()
返回即可. 和 AQS 一样, 每个 condition 在内部也是用一个 FIFO 队列维护所有等待线程的.
在Condition中,wait
,notify
和notifyAll
方法分别对应了await
,signal
和signalAll
方法. 简单来说, 它们的实现逻辑如下,注意, 只有锁的持有者,即 AQS queue head节点对应的线程才能调用以下操作:
//await
释放锁
将当前线程放入 condition 的等待队列
阻塞线程直到自己重新回到了锁的等待队列
进入 acquire 循环, 尝试获取锁
// signal
把 condition 等待队列的第一个线程移除并放入锁的等待队列
// signalAll
把 condition 等待队列的所有线程移除并放入锁的等待队列
// 基本上就是把节点在 锁的等待队列 和 condition的等待队列 之间来回移动
await:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1) 新增一个新的Node到 condition queue 中;
Node node = addConditionWaiter();
// 2) release锁,这调用的是 AQS 的 release() 方法,结果是 AQS 的 head 节点(当前线程)被移除;
int savedState = fullyRelease(node);
int interruptMode = 0;
// 其他线程的Signal会将该Node从Condition queue移回AQS queue,并唤醒该Node, 因此:
// 3) 循环检测自己是否被移回 AQS queue, 没有则继续阻塞;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4) 此时该线程已接收到其他线程的 signal 信号, 醒来并重新进入了 AQS queue,需重新进入竞争锁的循环(acquireQueued)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 把 condition queue 的第一个节点移到 aqs queue 的最末
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
signal
就干了一件事:把 condition queue 的第一个节点移到 aqs queue 的最末。
isHeldExclusively()
方法用来判断当前线程是不是 exclusively acquire 同步器的线程,它留给子类实现;可以看到,condition 强制只能由 exclusive 模式下的线程进行 signal。另一方面,AQS 因为要支持混合模式,所以它的 acquire 和 release 是通用的,没有这个限定;子类需要这个约束时(比如二元锁)应当在tryRelease
的实现中加上这个判断,具体可参考ReentrantLock
。
signalAll:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 把 condition queue 的所有节点移到 aqs queue
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
综上所述,condition 的整个工作流程可以简单地理解为:
* 将线程在 AQS 队列和 Condition 队列中来回移动 *
用图表示如下:
5. 公平性
如果 acquire 总是按照线程入队的顺序进行,则称为公平的。AQS可能出现这样的情况,当 head release,后继准备acquire时,刚好有一个新的外来线程抢先acquire成功。这被称为 barging
现象,对队列中排队的其他节点显然是不公平的。
如果子类要保证公平性,可以在tryAcquire
的实现中调用 AQS 提供的 hasQueuedPredecessors()
方法判断当队列是否为空,非空则 acquire 失败。
二. 高级同步器
AQS的典型使用方式:
根据需要定义一个类(Sync
)继承AQS,覆盖tryAcquired/tryShared/...
等方法提供自己的逻辑;再用一个更加具体的类包装该类,对外提供意义更加明确的方法,这些方法的实现都是代理给Sync
的。
1. FutureTask
作用
FutureTask
实现了Future
和Runnable
接口,它包装Runnable
/Callable
并提供取消方法(cancel
)和可阻塞调用线程的获取执行结果的方法(get
)。FutureTask
可以保证即使调用了多次run
,都只会执行一次Runnable
或Callable
任务。
典型的使用场景是线程池。向线程池提交一个Runnable
时,线程池内部会将其包装成一个FutureTask
并返回;客户线程可调用其get()
等待任务完成或被取消。
也可单独使用FutureTask
,这时手动调用其run()
方法即可。
实现
FutureTask
内部定义了一个AQS的子类Sync
,所有的操作都是由它完成的。get
在任务未完成或未取消时阻塞,run
/cancel
在任务完成或取消后需要唤醒阻塞在get
的所有线程,很显然前者对应AQS的acquire
,后者对应release
,且Sync
应该工作在 shared 模式下。
Sync
的 state 表示任务的执行情况:READY/RUNNING/RAN/CANCELED
。
Acquire
FutureTask#get()
最终会调用AQS的acquireSharedInterruptibly
方法。Sync#tryAcquireShared()
的逻辑是:如果任务已完成或被取消(state == RAN || CENCELED
)返回1,即acquire成功,AQS会继续唤醒队列中下一个阻塞线程;否则返回-1,acquire失败。
Release
FutureTask#run()
首先将Sync#state
设为RUNNING
,接着调用内部Callable
的run
方法,执行完成后调用AQS的releaseShared(0)
;FutureTask#cancel()
也会调用该方法进行 release 动作。
releaseShared(0)
始终返回true表示动作成功,AQS(最终)会唤醒所有阻塞在FutureTask#get()
上的线程。
2. Semaphore
作用
信号量。
实现
内部类Sync
继承AQS,工作在shared模式。Semaphore#acquire()
/ Semaphore#release()
分别对应它的 acquire / release 动作。
Sync#state
代表当前资源数量,覆盖tryReleaseShared/tryAcquireShared
方法对资源数进行增减,这一步使用了lock-free
算法保证操作的原子性。前者在操作成功后返回当前剩余资源数,失败则返回-1;后者成功后返回true,失败则说明遇到了不合法的调用,抛出Error
。
Sync
根据acquire
时是否公平派生出两个子类:NonfairSync
和FairSync
,Semaphore
在初始化时根据fair
参数选择对应的子类。它们的唯一不同在于,后者在tryAcquireShared
中会先通过hasQueuedPredecessors()
判断队列是否为空,如果不为空则直接返回-1,保证公平性。
3. CountDownLatch
作用
倒计时器:
实现
CountDownLatch
的实现比较简单。内部Sync
:
- shared工作模式;
state
表示 当前未完成的任务数;CountDownLatch#await()
–> acquire,Sync#tryAcquireShared()
在state==0
时成功,返回1,否则失败;CountDownLatch#countDown()
–> release,Sync#tryReleaseShared()
用 CAS 将state
减1,如果state==0
,则release成功,阻塞在await
上的线程将被唤醒;否则失败。
4. CyclicBarrier
作用
让一组线程阻塞直到它们都到达了某处,再继续执行该组线程。
即调用await
的线程数到达了设定的数量后,这些线程才继续往下执行。
CyclicBarrier
在构造时也可传入一个Runnable
作为 barrierAction。当barrier开放时会首先执行该任务。
实现
CyclicBarrier
的实现和其他同步器不同,它是基于ReentrantLock / Condition
实现的,内部持有一个锁lock
和对应的 condition trip
。
内部属性count
表示 未到达barrier的线程数。
await()
大致实现如:lock
加锁,count减1,如果count为0则执行 barrierAction 并调用trip#signalAll
;否则调用trip#await
阻塞。
其实还用到了一个内部类Generation
,没搞明白什么用。
5. ReentrantLock
作用
可重入锁,synchronized
的增强版,支持多个condition
,支持设置超时时间。JDK 1.6 后二者性能相差不大了。
实现
内部类Sync
,工作在exclusive模式,根据acquire
是否公平派生出两个子类:NonfairSync
和FairSync
;state
表示重入的次数。
tryAcquire
如果state == 0
,说明未加锁,CAS将其设置为1,并调用setExclusiveOwnerThread
将当前线程设置为锁的持有线程,返回true。
否则判断当前线程是否锁的持有线程,如果是则说明是锁的重入,将state
加1即可;否则 acquire 失败。
公平版本的FairSync
会在真正的 acquire 动作之前先通过AQS提供的hasQueuedPredecessors
方法判断等待队列是否为空。
tryRelease
先检查,如果不是锁持有线程调用该方法则抛出异常。
state
自减1,如果state==0
,说明锁被释放,此时release成功,返回true;否则只是退出了一次重入,release失败。
6. ReentrantReadWriteLock
作用
可重入读写锁:
- 读锁的
lock
:当前没有线程持有写锁则成功,即并发读是不冲突的,但一旦有写动作则阻塞读。 - 写锁的
lock
:当前没有线程持有读锁或写锁则成功,即写动作要等待所有其他读/写动作完成后才可进行
读写锁分离的好处是在读多写少的场景中大幅提升读的性能。
锁的升/降级
在同一线程中:
- 持有读锁后,再调用写锁的
lock
会失败,进而造成死锁。读锁不可升级。 - 持有写锁后,再调用读锁的
lock
可以成功,之后再unlock
写锁,则写锁将降级为读锁。
实现
基于AQS实现。
比较复杂,写不动了。