Lock-Free 算法

算法介绍介绍

Lock-free 这种非阻塞同步算法的核心是

自旋 + 观察旧值 & 计算新值 + CAS(旧值,新值)  # 循环尝试

假设在用CAS更新时,变量没有被修改过,依然保存着旧值;如果CAS失败了,则说明有其他线程并发地在修改, 此时线程不阻塞,而是不停地重试直到成功。

很多时候也被称为 乐观锁,因为在访问资源时“乐观地”假设没有并发问题,不加锁就直接拿来用,在最后真正更新的时候再判断冲突。这和 hibernate 的乐观锁机制是一个思路,后者利用一个额外的版本号字段判断冲突。

JUC 的 AotomicInteger 等原子类正是用 CAS 保证线程安全的更新动作, 如自增:

public final int incrementAndGet() {
    for (;;) {                            // <-- 1. 自旋
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next)) // <-- 2. CAS, 失败则继续尝试
            return next;
    }
}

Spinlock 也是Lock-free算法的一个典型应用,它的思路是当锁被占有时让 CPU 空转等待, 锁被释放时再试图 原子地 加锁, 加锁失败则继续自旋. 假设一个二元标志位, 1代表锁被占有, 0代表锁空闲, 则以下是 spinlock 的一个例子:

volatile int lock = 0;

public void lockAndDoSth(){
    for(;;){                            // <-- 1. 循环
        if(lock == 1) continue;
        if(compareAndSet(lock,0,1)){    // <-- 2. CAS, 原子的 read-modify-write 指令
            // 已经获取锁, do sth        

            lock = 1;                 // 最后释放锁
        }else{
            Thread.yield();             // 获取锁失败, 主动出让 CPU
        }
    }
}

lock-free算法避免了加锁和阻塞, 性能更好,适用于 竞争较少, 且临界区较短 的场景, 否则会造成大量的 CPU 空转, 浪费 CPU 资源;此外,该算法没有解决ABA问题,即在CAS之前变量已被修改过但被还原成了旧值,当前线程无法感知到;最后,lock-free只能作用于单个变量,导致大部分基于CAS的实现都比较复杂。

Lock-free 的并发数据结构

JUC中,基于非阻塞算法实现的并发容器包括:ConcurrentLinkedQueue,SynchronousQueue,Exchanger 和 ConcurrentSkipListMap。ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,SynchronousQueue 是一个没有容量的阻塞队列,它使用双重数据结构 来实现非阻塞算法。Exchanger 是一个能对元素进行配对和交换的交换器。它使用 消除 技术来实现非阻塞算法 。ConcurrentSkipListMap 是一个可以根据 Key 进行排序的可伸缩的并发 Map。

lock-free的无界stack (Treiber算法)

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
    // push和pop都发生在head处
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) 
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead,newHead));
        return oldHead.item;
    }
    static class Node<E> {
        final E item;
        Node<E> next;
        public Node(E item) { this.item = item; }
    }
}

push 查看当前的head指针,并构建一个新的节点,如果head在这个过程中没有发生改变,再用原子地改变head;如果CAS失败了,说明有别的线程在“插队”,过程就会重新开始。pop类似。

选择在链表首部进行pushpop比较取巧,此时只需要CAS一个head指针即可;如果更新发生在链表尾部,则有两个指针需要更新,一个是tail,一个是tail.next,但无法对两个指针进行原子性的CAS,此时需要找到一种方式用两次CAS更新两个指针,但又能保证数据结构的一致性。

lock-free的无界Queue

Queue的put是上述问题的一个例子,如果线程A用 CAS 顺序更新tail.next以及tail,那么在二者之间肯定存在一个不一致的状态,即tail指向的不是最后一个节点。假设线程B在A完成第一步后调用put,此时B可以判断出A的更新完成了还是正在进行(tail.next == null ?),如果B选择自旋等待A完成,那么可能出现A第二个CAS失败,B会一直等待下去的情况。因此,B必须选择 帮助A完成剩下的更新动作,等A切换回来执行第二个CAS时,可以不管CAS的结果,因为如果失败了,它知道有另一个线程帮它做完了剩下的工作。更详细的步骤解释可以参考Java 理论与实践: 非阻塞算法简介

public class LinkedQueue <E> {
    private static class Node <E> {
        final E item;
        final AtomicReference<Node<E>> next;
        Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }
    private AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(new Node<E>(null, null));
    private AtomicReference<Node<E>> tail = head;

    public boolean put(E item) {
        Node<E> newNode = new Node<E>(item, null);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> tailNext = curTail.next.get();
            if (curTail == tail.get()) {
                if (tailNext == null) { // 看到的是静止状态
                    if (curTail.next.compareAndSet(null, newNode)) {    // A 的第一个CAS
                        tail.compareAndSet(curTail, newNode) ;  // A 的第二个CAS,不用判断结果,因为如果失败了,说明B(或其他线程)帮它完成了这个动作
                        return true;
                    }
                } else {            // 看到中间状态
                    tail.compareAndSet(curTail, tailNext);  // B 帮 A 完成第二个更新动作
                }
            }
        }
    }
}

JUC的ConcurrentLinkedQueue就是基于上述实现思路的一个无锁并发容器。

lock-free的有界Queue

Disruptor 的 RingBuffer 是无锁环形队列,是ArrayBlockingQueue的无锁版本。

Loading Disqus comments...
目录