AQS 和 高级同步器

JUC 包提供了一系列支持中等规模并发的同步器,它们都是基于AbstractQueuedSynchronizer这个通用同步器框架实现的。

一. AQS 的实现

同步器是这样这样一种抽象数据结构:同步器内部维护一个状态, 同时它支持两类操作,一种是acquire,另一种是release。acquire操作在状态不满足条件时阻塞调用线程,直到同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。对这两种操作, 同步器应额外提供可超时的 + 可中断的版本. 此外, AQS 还同时支持工作在 exclusive 模式(如 lock) / shared 模式(如 信号量) / 混合模式 下.

AQS 需要3个组件来完成以上任务:

  1. 状态的原子性管理
  2. 线程的阻塞和唤醒
  3. 队列

1. 状态的原子性管理

AQS 用一个 volatile int 表示同步状态, 并用 CAS 操作 (unsafe.compareAndSwapInt) 保证其 条件判断与更新动作 的原子性.

同步状态具体的含义 / acquire release 的语义由子类决定, 因此 AQS 预留了如下方法给子类实现,这是模板方法模式的典型应用。通常在这些方法中对同步状态进行改变,如果条件不允许则立即返回false或负数:

// exclusive
protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg);
// shared
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);

并提供了以下方法操作 state:

protected final int getState();
protected final boolean compareAndSetState(int expect, int update);
protected final void setState(int newState);

真正的 acquire / release 动作则是以下方法, 一般子类会选择包裹这些方法, 为其提供更明确的方法名(如 lock / unlock)

// exclusive
public final void acquire(int arg);
public final boolean release(int arg) ;
// shared
public final void acquireShared(int arg) ;
public final boolean releaseShared(int arg);

// ... 超时版本/可中断版本的 acquire/release 方法

2. 线程的阻塞和唤醒

JUC 提供了 LockSupport 类支持线程的阻塞和唤醒:

// 阻塞当前线程
public static void park() {
    unsafe.park(false, 0L);
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

// 唤醒某个线程
public static void unpark(Thread thread) {
    if (thread != null)
        unsafe.unpark(thread);
}

Thread.suspend()和Thread.resume()存在的问题是一来可能造成死锁, 二来如果在 suspend() 调用之前 resume 了, 线程依然会被阻塞. LockSupport 的 unpark 会给予当前线程一个标记但不会累计, park 消费这个标记, 这样可以解决这个问题.

3. 队列

AQS 用一个 FIFO 队列管理被阻塞的线程, 它的 head (大部分情况下)保存着 最后 一个 acquire 成功的线程, acquire 失败的线程挂接在后面并处在一个循环中,等待被唤醒后继续尝试acquire。被阻塞的节点支持两种模式: exclusive 和 shared,不同节点的工作模式可以不一样

exclusive 模式

此时acquire()的工作流程如下所示:

Alt text

首次 tryAcquire 即成功, 则立即退出, 注意, 这时线程是不会进入队列的; 失败则生成一个节点并入队.

入队相关的方法如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果队列为空, 用一个 dummy node 初始化队列
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

当队列为空, 但之前已经有线程 acquire 成功导致当前线程需要入队时, 用一个 dummy node 初始化队列, 并将当前线程的节点插入最末. 注意, 除了这种情况, 其他时刻 head 始终是最后一个 acquire 成功的线程.

入队后线程进入 acquire 循环, 不停地阻塞 / 尝试获取锁:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;    // 返回acquire阻塞时是否被中断过
        for (;;) {
            final Node p = node.predecessor();
            // 如果自己是第二个节点且 tryAcquire 成功
            if (p == head && tryAcquire(arg)) {
                // head 出队, 自己成为 head
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 否则 park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())    // 检查park时是否被中断过
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }

因为第一个节点表示的是当前锁的持有者(exclusive 模式), 按照 FIFO 的规则, head 的后继是锁被释放后第一个获取锁的线程. 因此在 acquire 循环中, 每当线程被唤醒(无论是因为锁被释放了还是线程中断), 都要检查一下自己是不是第二个节点, 如果不是则继续 park.

如果自己是第二个节点, 且 tryAccquire 成功了, 这意味着 head 已经释放了锁, 并且自己成功获取了锁, 于是当前节点将head 踢出队列, 自己成为新的 head, 整个 acquire 过程结束.

队列中的节点有一个字段 waitStatus 记录该节点当前所处的状态:

  • CANCELLED = 1: 节点因为超时或interrupt而被取消, 一旦达到此状态节点将被踢出。
  • SIGNAL = -1: 后继节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此当前节点在 release / 取消后需要唤醒(LockSupport.unpack())它的后继。
  • CONDITION = -2:表明节点正处在 condition 队列中;
  • PROPAGATE = -3: 只有在shared模式下的head才会处在这个状态,表示同步状态可能允许其他节点acquire成功,唤醒动作需要propagate到后续节点;
  • NORMAL = 0 : 新生节点都处在这个状态

shouldParkAfterFailedAcquire()方法简单来说, 就是把当前节点之前已被 cancel 的节点都删除, 找到前驱并设置其标志位为 SIGNAL, 让其 release 时把自己唤醒.

release() 的工作过程:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release 的逻辑非常简单, 它仅仅唤醒 head 的后继, 不会改变队列结构,剩下的事情交给后者的 acquire 循环了;如果tryRelease失败则什么也不做。

用一副简单的图对 exclusive 的工作模式做个总结:
Alt text

shared 模式

acquireShared

Alt text

整个流程和exclusive模式下的acquire几乎完全一样,不同的是,shared模式下,当某个线程acquire成功后,后续阻塞线程依然有可能acquire成功,因此在自己成为head后,需要将其后继节点(如果是工作在shared模式下)唤醒。后者醒来后如果acquire成功,则会继续唤醒后继,如此将有多个节点被唤醒(acquire成功并依次出队),直到遇到一个exclusive节点,或者资源用尽,同步状态不允许acqurie成功。

releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

逻辑和exclusive模式下的release是一样的:在tryReleaseShared成功后调用doReleaseShared方法,该方法和acquireShared流程中 “唤醒后继“ 时调用的方法是同一个,它的作用可以简单地理解为将head的状态标记为PROPAGATE,并唤醒后继节点。

tryReleaseShared失败,则什么也不做。

可以看到,两个模式下的release动作都是只唤醒head的后继节点,并没有做其他事情,也不会修改队列结构。

一个shared模式的例子(假设每次 acquire 都是成功的):
Alt text

4. condition

AQS 定义了一个内部类ConditionObject 作为 condition 的实现, 必须与 exclusive 模式的同步器搭配使用,它可以提供典型的管程风格的await、signal和signalAll操作。 当用户请求 condition 时new ConditionObject()返回即可. 和 AQS 一样, 每个 condition 在内部也是用一个 FIFO 队列维护所有等待线程的.

在Condition中,waitnotifynotifyAll方法分别对应了awaitsignalsignalAll方法. 简单来说, 它们的实现逻辑如下,注意, 只有锁的持有者,即 AQS queue head节点对应的线程才能调用以下操作:

//await
释放锁
将当前线程放入 condition 的等待队列
阻塞线程直到自己重新回到了锁的等待队列
进入 acquire 循环, 尝试获取锁                                            

// signalcondition 等待队列的第一个线程移除并放入锁的等待队列

// signalAllcondition 等待队列的所有线程移除并放入锁的等待队列

// 基本上就是把节点在 锁的等待队列 和 condition的等待队列 之间来回移动

await:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    // 1) 新增一个新的Node到 condition queue 中;
    Node node = addConditionWaiter();
    // 2) release锁,这调用的是 AQS 的 release() 方法,结果是 AQS 的 head 节点(当前线程)被移除;
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    // 其他线程的Signal会将该Node从Condition queue移回AQS queue,并唤醒该Node, 因此:
    // 3) 循环检测自己是否被移回 AQS queue, 没有则继续阻塞; 
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // 4) 此时该线程已接收到其他线程的 signal 信号, 醒来并重新进入了 AQS queue,需重新进入竞争锁的循环(acquireQueued)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
// 把 condition queue 的第一个节点移到 aqs queue 的最末
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

signal就干了一件事:把 condition queue 的第一个节点移到 aqs queue 的最末。

isHeldExclusively() 方法用来判断当前线程是不是 exclusively acquire 同步器的线程,它留给子类实现;可以看到,condition 强制只能由 exclusive 模式下的线程进行 signal。另一方面,AQS 因为要支持混合模式,所以它的 acquire 和 release 是通用的,没有这个限定;子类需要这个约束时(比如二元锁)应当在tryRelease的实现中加上这个判断,具体可参考ReentrantLock

signalAll:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
// 把 condition queue 的所有节点移到 aqs queue
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter  = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

综上所述,condition 的整个工作流程可以简单地理解为:

* 将线程在 AQS 队列和 Condition 队列中来回移动 *

用图表示如下:

Alt text

5. 公平性

如果 acquire 总是按照线程入队的顺序进行,则称为公平的。AQS可能出现这样的情况,当 head release,后继准备acquire时,刚好有一个新的外来线程抢先acquire成功。这被称为 barging现象,对队列中排队的其他节点显然是不公平的。

如果子类要保证公平性,可以在tryAcquire的实现中调用 AQS 提供的 hasQueuedPredecessors() 方法判断当队列是否为空,非空则 acquire 失败。

二. 高级同步器

AQS的典型使用方式:

根据需要定义一个类(Sync)继承AQS,覆盖tryAcquired/tryShared/...等方法提供自己的逻辑;再用一个更加具体的类包装该类,对外提供意义更加明确的方法,这些方法的实现都是代理给Sync的。

1. FutureTask

作用

FutureTask实现了FutureRunnable接口,它包装Runnable/Callable并提供取消方法(cancel)和可阻塞调用线程的获取执行结果的方法(get)。FutureTask可以保证即使调用了多次run,都只会执行一次RunnableCallable任务。

典型的使用场景是线程池。向线程池提交一个Runnable时,线程池内部会将其包装成一个FutureTask并返回;客户线程可调用其get()等待任务完成或被取消。

也可单独使用FutureTask,这时手动调用其run()方法即可。

实现

FutureTask内部定义了一个AQS的子类Sync,所有的操作都是由它完成的。get在任务未完成或未取消时阻塞,run/cancel在任务完成或取消后需要唤醒阻塞在get的所有线程,很显然前者对应AQS的acquire,后者对应release,且Sync应该工作在 shared 模式下。

Sync的 state 表示任务的执行情况:READY/RUNNING/RAN/CANCELED

Acquire
FutureTask#get()最终会调用AQS的acquireSharedInterruptibly方法。Sync#tryAcquireShared()的逻辑是:如果任务已完成或被取消(state == RAN || CENCELED)返回1,即acquire成功,AQS会继续唤醒队列中下一个阻塞线程;否则返回-1,acquire失败。

Release
FutureTask#run()首先将Sync#state设为RUNNING,接着调用内部Callablerun方法,执行完成后调用AQS的releaseShared(0)FutureTask#cancel()也会调用该方法进行 release 动作。

releaseShared(0)始终返回true表示动作成功,AQS(最终)会唤醒所有阻塞在FutureTask#get()上的线程。

2. Semaphore

作用

信号量。

实现
内部类Sync继承AQS,工作在shared模式。Semaphore#acquire() / Semaphore#release()分别对应它的 acquire / release 动作。

Sync#state代表当前资源数量,覆盖tryReleaseShared/tryAcquireShared方法对资源数进行增减,这一步使用了lock-free算法保证操作的原子性。前者在操作成功后返回当前剩余资源数,失败则返回-1;后者成功后返回true,失败则说明遇到了不合法的调用,抛出Error

Sync根据acquire时是否公平派生出两个子类:NonfairSyncFairSyncSemaphore在初始化时根据fair参数选择对应的子类。它们的唯一不同在于,后者在tryAcquireShared中会先通过hasQueuedPredecessors()判断队列是否为空,如果不为空则直接返回-1,保证公平性。

3. CountDownLatch

作用

倒计时器:

Alt text

实现

CountDownLatch的实现比较简单。内部Sync

  1. shared工作模式;
  2. state表示 当前未完成的任务数
  3. CountDownLatch#await() –> acquire,Sync#tryAcquireShared()state==0时成功,返回1,否则失败;
  4. CountDownLatch#countDown() –> release,Sync#tryReleaseShared()用 CAS 将state减1,如果state==0,则release成功,阻塞在await上的线程将被唤醒;否则失败。

4. CyclicBarrier

作用

让一组线程阻塞直到它们都到达了某处,再继续执行该组线程。

Alt text

即调用await的线程数到达了设定的数量后,这些线程才继续往下执行。

CyclicBarrier在构造时也可传入一个Runnable作为 barrierAction。当barrier开放时会首先执行该任务。

实现

CyclicBarrier的实现和其他同步器不同,它是基于ReentrantLock / Condition实现的,内部持有一个锁lock和对应的 condition trip

内部属性count表示 未到达barrier的线程数

await()
大致实现如:lock加锁,count减1,如果count为0则执行 barrierAction 并调用trip#signalAll;否则调用trip#await阻塞。

其实还用到了一个内部类Generation,没搞明白什么用。

5. ReentrantLock

作用

可重入锁,synchronized的增强版,支持多个condition,支持设置超时时间。JDK 1.6 后二者性能相差不大了。

实现
内部类Sync,工作在exclusive模式,根据acquire是否公平派生出两个子类:NonfairSyncFairSyncstate表示重入的次数。

tryAcquire
如果state == 0,说明未加锁,CAS将其设置为1,并调用setExclusiveOwnerThread将当前线程设置为锁的持有线程,返回true。

否则判断当前线程是否锁的持有线程,如果是则说明是锁的重入,将state加1即可;否则 acquire 失败。

公平版本的FairSync会在真正的 acquire 动作之前先通过AQS提供的hasQueuedPredecessors方法判断等待队列是否为空。

tryRelease
先检查,如果不是锁持有线程调用该方法则抛出异常。

state自减1,如果state==0,说明锁被释放,此时release成功,返回true;否则只是退出了一次重入,release失败。

6. ReentrantReadWriteLock

作用

可重入读写锁:

  • 读锁的lock:当前没有线程持有写锁则成功,即并发读是不冲突的,但一旦有写动作则阻塞读。
  • 写锁的lock:当前没有线程持有读锁或写锁则成功,即写动作要等待所有其他读/写动作完成后才可进行

读写锁分离的好处是在读多写少的场景中大幅提升读的性能。

锁的升/降级

在同一线程中:

  • 持有读锁后,再调用写锁的lock会失败,进而造成死锁。读锁不可升级。
  • 持有写锁后,再调用读锁的lock可以成功,之后再unlock写锁,则写锁将降级为读锁。

实现

基于AQS实现。

比较复杂,写不动了。

Loading Disqus comments...
目录