Executor 之 线程池及定时器

1. Executor系列接口

Alt text

Executor用于解耦任务(Runnable)提交者和执行者,它只有一个方法void execute(Runnable command),通过调用它向执行者提交任务,但无法知道执行的结果/进度,也无法拿到任务返回值。

ExecutorService 继承Executor,是一个更具体的接口。它额外提供了以下方法:

  1. 关闭执行者

    shutdown(); // 拒绝接受任务,但继续执行旧任务
    shutdownNow(); // 拒绝接受任务,且返回所有未执行的旧任务
    awaitTermination(long,TimeUnit); // 等待执行者完全关闭
  2. 查询执行者当前状态

    isShutdown();
    isTerminated();
  3. submitinvokeall系列方法
    接受Runnable/Callable类型的参数,返回Future,让提交者可以了解任务执行的进度/拿到执行结果。

    Future是提交者和执行者之间的通信手段,它代表任务的执行情况:
    Alt text
    其中,get方法会阻塞直到任务被完成,常用于异步转同步的场景。

ScheduledExecutorService是一个定时器,它在ExecutorService的基础上增加了若干定时/延迟/循环调度任务的方法:

// 延迟执行
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);
// 固定频率执行(不管单个任务执行时间,间隔到了就执行)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 固定延迟执行(任务1执行完和任务2开始执行之间的间隔是固定的)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

2. 线程池ThreadPoolExecutor

ThreadPoolExecutor线程池任务队列(BlockingQueue) 实现的ExecutorService,和所有的一样,它的主要功能有两个:

  1. 降低频繁创建/销毁线程的开销;
  2. 防止滥用线程资源

作为一个对外提供服务的容器,它很多方面的实现都值得参考。

2.1 服务能力的逐步降级 & 占用资源的自动伸缩

提交任务时调用的是BlockingQueue#offer()方法,如果不成功则立即返回false,这意味着任务提交者不会因为队列满而被阻塞

submit(...)方法对传入的Runnable/Callable包装进一个FutureTask,再调用execute(...)处理,最后返回这个FutureTask对象。

FutureTask是一个实现了RunnableFuture的类,它本质上是一个任务,同时又是一个同步器,提供了具有阻塞/唤醒语义的查询和执行方法。

execute(...)处理策略如下:

  1. 轻负载 (当前线程数 < corePoolSize):创建一个线程(内部类Worker),该任务作为其初始任务。
  2. 中负载 (当前线程数 = corePoolSize):任务入队列(offer()方法,客户不会阻塞)。
  3. 重负载 (当前线程数 ∈ [corePoolSize, maximumPoolSize],且任务队列已满):继续创建线程。
  4. 超负载 (当前线程数 = maximumPoolSize,且任务队列已满): 拒绝新任务。

随着负载增加,线程池的处理速度 逐渐下降;超负荷时拒绝新请求,保证容器不挂:

快 –> 部分快,部分慢 –> 慢 –> 慢,且拒绝新任务

Worker线程的逻辑

Worker不停地从任务队列中拿任务执行,如果拿到 null,则退出。

WorkerBlockingQueue中取元素有两种选择:poll(timeout)take(),如队列空二者均会阻塞,但前者有超时时间。选择策略如下:

  1. 当前线程数 <= corePoolSize(轻负载):使用take(),当队列为空时线程一直阻塞而不退出,原因是线程池在大部分时间都会处在轻/中负载的状态,避免此时线程的频繁创建和销毁。

    也可以更改这个默认行为。如果手动设置allowCoreThreadTimeOut,将用poll(timeout),超过指定时间还没任务线程就退出。

  2. 当前线程数 > corePoolSize(重负载):使用poll(timeout),负载回落后多余线程退出,避免不必要的资源占用。

不管哪种情况,调用poll的超时时间由参数keepAliveTime指定。

Worker通过以上行为保证了容器占用的资源随负载程度而自动弹性伸缩。

2.2 资源的懒加载和预加载

默认线程是懒加载的,即只有当新任务提交时才会创建Worker。可以调用prestartAllCoreThreads()prestartCoreThread()方法预启动 corePoolSize 个或1个线程,一来避免任务队列初始就有元素时没有线程执行;二来避免接受任务时在线程创建上消耗时间。

2.3 拒绝服务的方式

当线程数和队列容量都达到上限时,容器将拒绝新来的服务请求,这是RejectedExecutionHandler接口负责的。

ThreadPoolExecutor内部提供了以下 4 个实现:

  1. CallerRunsPolicy:由客户线程自己执行任务;
  2. AbortPolicy(默认):抛出RejectedExecutionException
  3. DiscardPolicy:忽略,什么也不做;
  4. DiscardOldestPolicy:移除任务队列里呆的最久的任务(poll()),重新提交。

2.4 状态管理

线程池的状态图如下:
Alt text

初始状态为 running;

进入 shutDown 和 stop 区别是:

  • shutDown: 不接受新任务,但会继续执行已有任务
  • stop: 不接受新任务,也不执行已有任务;队列中的剩余任务被返回给客户

当队列为空且线程数为0时,进入 tidying 状态。该状态下会调用一个预留给子类实现的terminated方法进行额外的资源清理工作。结束后进入 terminated 状态。

2.4 任务队列的选择

ThreadPoolExecutor 的任务队列是可配置的。

如果要求任务立即得到处理,可以使用SynchronousQueue

SynchronousQueue是一个容量为0的阻塞队列,当 offer 元素时,如果当前没有线程阻塞在take / poll(timeout)上就立即失败;否则将元素交给其中的一个线程。因此它更像是线程之间传递对象的工具,而不是一个队列。

使用SynchronousQueue时,线程池将无法缓存任务,每个任务都会立即创建一个线程来执行它,任务可以得到快速处理。这种策略下线程池最多只能处理maximumPoolSize个任务。

如果可以接受任务执行的延迟,但需要尽可能多的处理任务,可以使用无界的LinkedBlockingQueue或有界的ArrayBlockingQueue。若是前者,线程数将不超过corePoolSize,参数maximumPoolSize无效;若是后者,则需要在线程数和队列长度间权衡:短队列需要更多的线程,但这会带来更多的资源占用和 context switch;少线程则需要更长的队列,但此时吞吐量较低。

2.5 定时器ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutorScheduledExecutorService定时器接口的实现,它继承自ThreadPoolExecutor,因此底层的工作原理和线程池是一样的,但其任务队列默认是DelayBlockingQueue的变种(不可配置)。Worker依次从中获取最快到期的任务执行。对于需要重复执行的任务,第一次执行完毕后计算其下次执行时间,再重新入队。

这个不太了解,以后看。

3. 工厂类 Executors

提供了创建线程池、定时器的工厂方法。

线程池:

  1. newCachedThreadPool()

    corePoolSize = 0, maximumPoolSize = int 最大值,keepAliveTime = 1分钟,缓存队列为SynchronousQueue

    每个任务都会重用已有线程或创建一个新线程执行,直到线程数达到上限;没有任务时所有线程都会自然退出。

  2. newFixedThreadPool

    corePoolSize = n, maximumPoolSize = n,keepAliveTime = 0,缓存队列为无界的LinkedBlockingQueue

  3. newSingleThreadExecutor

    corePoolSize = 1, maximumPoolSize = 1,keepAliveTime = 0,缓存队列为无界的LinkedBlockingQueue

    效果和newFixedThreadPool(1)一样。

  4. newScheduledThreadPool 定时器
    创建一个定时器ScheduledThreadPoolExecutor,corePoolSize = n, maximumPoolSize = int 最大值,keepAliveTime = 0,缓存队列为DelayedWorkQueue

Loading Disqus comments...
目录