Lock-Free 算法
算法介绍介绍
Lock-free
这种非阻塞同步算法的核心是
自旋 + 观察旧值 & 计算新值 + CAS(旧值,新值) # 循环尝试
假设在用CAS更新时,变量没有被修改过,依然保存着旧值;如果CAS失败了,则说明有其他线程并发地在修改, 此时线程不阻塞,而是不停地重试直到成功。
很多时候也被称为 乐观锁,因为在访问资源时“乐观地”假设没有并发问题,不加锁就直接拿来用,在最后真正更新的时候再判断冲突。这和 hibernate 的乐观锁机制是一个思路,后者利用一个额外的版本号字段判断冲突。
JUC 的 AotomicInteger
等原子类正是用 CAS 保证线程安全的更新动作, 如自增:
public final int incrementAndGet() {
for (;;) { // <-- 1. 自旋
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) // <-- 2. CAS, 失败则继续尝试
return next;
}
}
Spinlock 也是Lock-free
算法的一个典型应用,它的思路是当锁被占有时让 CPU 空转等待, 锁被释放时再试图 原子地 加锁, 加锁失败则继续自旋. 假设一个二元标志位, 1代表锁被占有, 0代表锁空闲, 则以下是 spinlock 的一个例子:
volatile int lock = 0;
public void lockAndDoSth(){
for(;;){ // <-- 1. 循环
if(lock == 1) continue;
if(compareAndSet(lock,0,1)){ // <-- 2. CAS, 原子的 read-modify-write 指令
// 已经获取锁, do sth
lock = 1; // 最后释放锁
}else{
Thread.yield(); // 获取锁失败, 主动出让 CPU
}
}
}
lock-free
算法避免了加锁和阻塞, 性能更好,适用于 竞争较少, 且临界区较短 的场景, 否则会造成大量的 CPU 空转, 浪费 CPU 资源;此外,该算法没有解决ABA问题,即在CAS之前变量已被修改过但被还原成了旧值,当前线程无法感知到;最后,lock-free
只能作用于单个变量,导致大部分基于CAS的实现都比较复杂。
Lock-free
的并发数据结构
JUC中,基于非阻塞算法实现的并发容器包括:ConcurrentLinkedQueue,SynchronousQueue,Exchanger 和 ConcurrentSkipListMap。ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,SynchronousQueue 是一个没有容量的阻塞队列,它使用双重数据结构 来实现非阻塞算法。Exchanger 是一个能对元素进行配对和交换的交换器。它使用 消除 技术来实现非阻塞算法 。ConcurrentSkipListMap 是一个可以根据 Key 进行排序的可伸缩的并发 Map。
lock-free
的无界stack
(Treiber算法)
public class ConcurrentStack<E> {
AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
// push和pop都发生在head处
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = head.get();
newHead.next = oldHead;
} while (!head.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = head.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!head.compareAndSet(oldHead,newHead));
return oldHead.item;
}
static class Node<E> {
final E item;
Node<E> next;
public Node(E item) { this.item = item; }
}
}
push
查看当前的head指针,并构建一个新的节点,如果head在这个过程中没有发生改变,再用原子地改变head;如果CAS失败了,说明有别的线程在“插队”,过程就会重新开始。pop
类似。
选择在链表首部进行push
和pop
比较取巧,此时只需要CAS一个head
指针即可;如果更新发生在链表尾部,则有两个指针需要更新,一个是tail
,一个是tail.next
,但无法对两个指针进行原子性的CAS,此时需要找到一种方式用两次CAS更新两个指针,但又能保证数据结构的一致性。
lock-free
的无界Queue
Queue的put
是上述问题的一个例子,如果线程A用 CAS 顺序更新tail.next
以及tail
,那么在二者之间肯定存在一个不一致的状态,即tail
指向的不是最后一个节点。假设线程B在A完成第一步后调用put,此时B可以判断出A的更新完成了还是正在进行(tail.next == null ?
),如果B选择自旋等待A完成,那么可能出现A第二个CAS失败,B会一直等待下去的情况。因此,B必须选择 帮助A完成剩下的更新动作,等A切换回来执行第二个CAS时,可以不管CAS的结果,因为如果失败了,它知道有另一个线程帮它做完了剩下的工作。更详细的步骤解释可以参考Java 理论与实践: 非阻塞算法简介。
public class LinkedQueue <E> {
private static class Node <E> {
final E item;
final AtomicReference<Node<E>> next;
Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<Node<E>>(next);
}
}
private AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(new Node<E>(null, null));
private AtomicReference<Node<E>> tail = head;
public boolean put(E item) {
Node<E> newNode = new Node<E>(item, null);
while (true) {
Node<E> curTail = tail.get();
Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext == null) { // 看到的是静止状态
if (curTail.next.compareAndSet(null, newNode)) { // A 的第一个CAS
tail.compareAndSet(curTail, newNode) ; // A 的第二个CAS,不用判断结果,因为如果失败了,说明B(或其他线程)帮它完成了这个动作
return true;
}
} else { // 看到中间状态
tail.compareAndSet(curTail, tailNext); // B 帮 A 完成第二个更新动作
}
}
}
}
}
JUC的ConcurrentLinkedQueue
就是基于上述实现思路的一个无锁并发容器。
lock-free
的有界Queue
Disruptor 的 RingBuffer 是无锁环形队列,是ArrayBlockingQueue
的无锁版本。