JDK 中的并发集合

1. Collections 类提供的并发集合

java.util.Collections提供了一系列方法将一个普通的集合包装成线程安全的集合,如Collections.synchronizedCollection() / Collections.synchronizedSet() 等。

它们的实现很简单,Collections内部定义了一系列集合类,它们的作用就是包装用户传进来的集合并把操作都代理给后者,唯一不同的是,这些内部集合类的每个方法都是synchronized的,保证每个方法的互斥,虽然正确,但是效率不高,不推荐使用。

2. JUC 提供的并发集合

ConcurrentHashMap

拆分锁

一个动作只会影响结构的一部分,则把整体拆分成若干部分,每个部分一个锁,部分A被锁不会影响部分B,从而提高并发程度。

内部用若干 segment 保存 entry;每个segment是一个小hashmap,它继承 ReentrantLock ,内部的 update 动作均须先加锁。segment个数由参数 concurrencyLevel 决定。

put/remove首先找segmengt,后者先加锁,再操作。put 插入时是插在链表头;remove 先找元素,再执行一个普通的链表节点删除操作。

Segment 的 rehash 是不加锁的,它先创建一个新的空数组,接着将元素 rehash 到该数组,最后将新数组和旧数组切换。

get/contains/iterator 读取操作不加锁,这是因为put/remove动作对数据结构的改变最终是个原子动作(put是一个对数组元素/Entry 指针的赋值操作;remove是一个对 entry.next 的赋值操作,rehash是一个对数组引用的赋值操作),因此read不会看到一个更新动作的中间状态;但它可能和并发的put/remove方法调用重叠,它所看到的状态是其所在Segment在最后一个完成的update动作后的状态,正在进行但未完成的put/remove对read是不可见的,如果前者先于read完成,read是有可能看到脏数据的。

没有提供锁全部segment的方法,size的实现是先走几次fast-path,即不加锁统计所有segment的count和modcount两次,如果modcount发生改变,说明有并发操作,需要重新统计。如果重复该动作3次依然有问题,则依次对所有segment加锁,统计count。

hashcode 决定桶的位置,equals决定两个对象是否相同。

CopyOnWriteArrayList

* copyonwrite *

所有的update动作都加锁,且对当前结构创建一个snapshot,在snapshot上完成update动作后,再将其转正,丢弃原结构 **

内部是个数组;

add/remove/set 均使用同一把 reentrantlock 实现互斥,并复制一份当前的数组,在该数组上完成write动作,最后用一个原子的引用赋值动作将snapshot切换为当前数组;即内部数组永远不会改变结构(readonly),只会发生整个数组的切换。

get不加锁,和ConcurrentHashMap类似,由于write动作最终实质上是个原子的引用切换动作,因此get看到的要么是修改完成前的数组,要么是完成后的数组,它不会看到一个不稳定的中间状态,它也是不用加锁的。read看到的也是最后一个完成的write后的数组,但很可能read时依然有进行中的write动作,这对read而言是不可见的,但如果它先于read完成,read是有可能读到脏数据的。

iterator引用创建时的内部数组,不可对该数组write,因此它的remove/set/add都不可用;后续对CopyOnWriteArrayList的write动作对该iterator也是不可见的,这个道理很容易明白。

* write 每次创建snapshot,代价很大;read 不加锁,很快;适合读多写少的场景。*

CopyOnWriteArraySet

基于 CopyOnWriteArrayList 实现,add时创建数组副本,并用equals判重。

不是hashset那种实现,和hashcode没关系。

BlockingQueue

提供了几套api

Queue:      <--- 使用BlockingQueue时,不要用这些API
    // 抛异常
    add
    remove

    // 返回false/null
    offer 
    poll 

BlockingQueue:
    // timed 阻塞
    offer的超时版本
    poll的超时版本

    //一直阻塞
    put
    take

所有的子类都是基于lockcondition实现的,实现依据不同条件阻塞和唤醒线程

ArrayBlockingQueue

基于有界环形数组的阻塞队列,生产者消费者模型中 buffer 的典型实现。

使用一个ReentrantLock保证数组的互斥访问,使用它派生的两个condition让线程在full-put和empty-take时阻塞和互相唤醒 (有两个独立的场合需要协作, 因此需要两个 condition), 它的核心实现如下(有改动):

/** The queued items */
final Object[] items;

// 首尾指针
/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

// put
public void put(E e) throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == items.length)   // 在 notFull 上 wait
            notFull.await();
        insert(e);                      // 在 notEmpty 上 signal
    } finally {
        lock.unlock();
    }
}
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

// take
public E take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
        while (count == 0)  // 在 notEmpty 上 wait
            notEmpty.await();
        return extract();   // 在 notFull 上 signal
    } finally {
        lock.unlock();
    }
}
private E extract() {
    E x = this.items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

// helper
/**
 * Circularly increment i.
 */
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

/**
 * Circularly decrement i.
 */
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

LinkedBlockingQueue

基于链表的 BlockingQueue, 可以指定最大长度. 实现原理和ArrayBlockingQueue类似, 但采用了two lock queue算法实现, 内部为 take 和 put 各用了一把锁(及一个 condition), 更精细也更复杂.

PriorityBlockingQueue

内部维护一个堆, 在空间不够时自动扩容, 实现原理大部分和ArrayBlockingQueue一致, 但没有用 notFull condition, 因为它没有 Full 的概念.

在扩容时有一个小优化, 扩容分为两个步骤, 分配更大数组 + 复制原数组, 类的实现者认为不太可能会在扩容这个地方出现激烈竞争, 因此对第一步数组的分配没有用锁, 而是用一个额外的 spinlock , 允许在扩容时并发地 take. 分配成功后再加锁, 进行第二步的复制.

spinlock 是Lock-free算法的一个典型应用,它的思路是当锁被占有时让 CPU 空转等待, 锁被释放时再试图 原子地 加锁, 加锁失败则继续自旋. 假设一个二元标志位, 1代表锁被占有, 0代表锁空闲, 则以下是 spinlock 的一个例子:

volatile int lock = 0;

public void lockAndDoSth(){
    for(;;){                            // <-- 1. 循环
        if(lock == 1) continue;
        if(compareAndSet(lock,0,1)){    // <-- 2. CAS, 原子的 read-modify-write 指令
            // 已经获取锁, do sth        

            lock = 1;                 // 最后释放锁
        }else{
            Thread.yield();             // 获取锁失败, 主动出让 CPU
        }
    }
}

关于lock-free的更多,可以参考笔记《Lock-Free 算法》。

DelayQueue

基于PriorityQueue, 无界(自动扩容), 存放实现了Delayed接口的对象. Delayed#getDelay()表明这个对象的过期时间离当前时间有多久, 最近一个过期的对象放在堆顶. take 的调用方被阻塞, 直到堆顶的元素到期出队. 新对象的加入会调整堆, 并唤醒所有阻塞在take的线程, 让它们根据新的堆顶元素调整自己的休眠时间.

DelayQueue 的典型使用场景是超时管理、定时器的实现,更多可以参考 笔记《定时器》

SynchronousQueue

LinkedTransferQueue

From TransferQueue JavaDocs:

A BlockingQueue in which producers may wait for consumers to receive elements. A TransferQueue may be useful for example in message passing applications in which producers sometimes (using method transfer(E)) await receipt of elements by consumers invoking take or poll, while at other times enqueue elements (via method put) without waiting for receipt.
In other words, when you use BlockingQueue, you can only put element into queue (and block if queue is full). With TransferQueue, you can also block until other thread receives your element (you must use new transfer method for that). This is the difference. With BlockingQueue, you cannot wait until other thread removes your element (only when you use SynchronousQueue, but that isn’t really a queue).

Other than this, TransferQueue is also a BlockingQueue. Check out new available methods in TransferQueue: http://download.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue.html (transfer, tryTransfer, hasWaitingConsumer, getWaitingConsumerCount).

ConcurrentLinkedQueue

基于lock-free算法实现的无界队列,详细介绍参见笔记《Lock-Free 算法》

这不是个BlockingQueue,只实现了Queue接口,和LinkedBlockingQueue的区别在于:

  1. 一个有界插入的时候可能会阻塞,一个无界插入永远不会阻塞。当然LinkedBlockingQueue也可以是无界的;
  2. 并发机制不同,一个lock-free,一个基于锁;
Loading Disqus comments...
目录