Executor 之 线程池及定时器
1. Executor系列接口
Executor
用于解耦任务(Runnable
)提交者和执行者,它只有一个方法void execute(Runnable command)
,通过调用它向执行者提交任务,但无法知道执行的结果/进度,也无法拿到任务返回值。
ExecutorService
继承Executor
,是一个更具体的接口。它额外提供了以下方法:
关闭执行者
shutdown(); // 拒绝接受任务,但继续执行旧任务shutdownNow(); // 拒绝接受任务,且返回所有未执行的旧任务awaitTermination(long,TimeUnit); // 等待执行者完全关闭查询执行者当前状态
isShutdown();isTerminated();submit
和invokeall
系列方法
接受Runnable/Callable
类型的参数,返回Future
,让提交者可以了解任务执行的进度/拿到执行结果。Future
是提交者和执行者之间的通信手段,它代表任务的执行情况:
其中,get
方法会阻塞直到任务被完成,常用于异步转同步的场景。
ScheduledExecutorService
是一个定时器,它在ExecutorService
的基础上增加了若干定时/延迟/循环调度任务的方法:
// 延迟执行public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);// 固定频率执行(不管单个任务执行时间,间隔到了就执行)public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);// 固定延迟执行(任务1执行完和任务2开始执行之间的间隔是固定的)public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
2. 线程池ThreadPoolExecutor
ThreadPoolExecutor
是 线程池 和 任务队列(BlockingQueue) 实现的ExecutorService
,和所有的池一样,它的主要功能有两个:
- 降低频繁创建/销毁线程的开销;
- 防止滥用线程资源
作为一个对外提供服务的容器,它很多方面的实现都值得参考。
2.1 服务能力的逐步降级 & 占用资源的自动伸缩
提交任务时调用的是BlockingQueue#offer()
方法,如果不成功则立即返回false
,这意味着任务提交者不会因为队列满而被阻塞。
submit(...)
方法对传入的Runnable
/Callable
包装进一个FutureTask
,再调用execute(...)
处理,最后返回这个FutureTask
对象。
FutureTask
是一个实现了Runnable
和Future
的类,它本质上是一个任务,同时又是一个同步器,提供了具有阻塞/唤醒语义的查询和执行方法。
execute(...)
处理策略如下:
- 轻负载 (当前线程数 <
corePoolSize
):创建一个线程(内部类Worker
),该任务作为其初始任务。 - 中负载 (当前线程数 =
corePoolSize
):任务入队列(offer()
方法,客户不会阻塞)。 - 重负载 (当前线程数 ∈ [
corePoolSize
,maximumPoolSize
],且任务队列已满):继续创建线程。 - 超负载 (当前线程数 =
maximumPoolSize
,且任务队列已满): 拒绝新任务。
随着负载增加,线程池的处理速度 逐渐下降;超负荷时拒绝新请求,保证容器不挂:
快 –> 部分快,部分慢 –> 慢 –> 慢,且拒绝新任务
Worker
线程的逻辑
Worker
不停地从任务队列中拿任务执行,如果拿到 null,则退出。
Worker
从BlockingQueue
中取元素有两种选择:poll(timeout)
或 take()
,如队列空二者均会阻塞,但前者有超时时间。选择策略如下:
当前线程数 <=
corePoolSize
(轻负载):使用take()
,当队列为空时线程一直阻塞而不退出,原因是线程池在大部分时间都会处在轻/中负载的状态,避免此时线程的频繁创建和销毁。也可以更改这个默认行为。如果手动设置
allowCoreThreadTimeOut
,将用poll(timeout)
,超过指定时间还没任务线程就退出。当前线程数 >
corePoolSize
(重负载):使用poll(timeout)
,负载回落后多余线程退出,避免不必要的资源占用。
不管哪种情况,调用poll
的超时时间由参数keepAliveTime
指定。
Worker
通过以上行为保证了容器占用的资源随负载程度而自动弹性伸缩。
2.2 资源的懒加载和预加载
默认线程是懒加载的,即只有当新任务提交时才会创建Worker
。可以调用prestartAllCoreThreads()
或prestartCoreThread()
方法预启动 corePoolSize 个或1个线程,一来避免任务队列初始就有元素时没有线程执行;二来避免接受任务时在线程创建上消耗时间。
2.3 拒绝服务的方式
当线程数和队列容量都达到上限时,容器将拒绝新来的服务请求,这是RejectedExecutionHandler
接口负责的。
ThreadPoolExecutor
内部提供了以下 4 个实现:
CallerRunsPolicy
:由客户线程自己执行任务;AbortPolicy
(默认):抛出RejectedExecutionException
;DiscardPolicy
:忽略,什么也不做;DiscardOldestPolicy
:移除任务队列里呆的最久的任务(poll()
),重新提交。
2.4 状态管理
线程池的状态图如下:
初始状态为 running;
进入 shutDown 和 stop 区别是:
- shutDown: 不接受新任务,但会继续执行已有任务
- stop: 不接受新任务,也不执行已有任务;队列中的剩余任务被返回给客户
当队列为空且线程数为0时,进入 tidying 状态。该状态下会调用一个预留给子类实现的terminated
方法进行额外的资源清理工作。结束后进入 terminated 状态。
2.4 任务队列的选择
ThreadPoolExecutor
的任务队列是可配置的。
如果要求任务立即得到处理,可以使用SynchronousQueue
。
SynchronousQueue
是一个容量为0的阻塞队列,当 offer 元素时,如果当前没有线程阻塞在take / poll(timeout)
上就立即失败;否则将元素交给其中的一个线程。因此它更像是线程之间传递对象的工具,而不是一个队列。
使用SynchronousQueue
时,线程池将无法缓存任务,每个任务都会立即创建一个线程来执行它,任务可以得到快速处理。这种策略下线程池最多只能处理maximumPoolSize
个任务。
如果可以接受任务执行的延迟,但需要尽可能多的处理任务,可以使用无界的LinkedBlockingQueue
或有界的ArrayBlockingQueue
。若是前者,线程数将不超过corePoolSize
,参数maximumPoolSize
无效;若是后者,则需要在线程数和队列长度间权衡:短队列需要更多的线程,但这会带来更多的资源占用和 context switch;少线程则需要更长的队列,但此时吞吐量较低。
2.5 定时器ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
是ScheduledExecutorService
定时器接口的实现,它继承自ThreadPoolExecutor
,因此底层的工作原理和线程池是一样的,但其任务队列默认是DelayBlockingQueue
的变种(不可配置)。Worker
依次从中获取最快到期的任务执行。对于需要重复执行的任务,第一次执行完毕后计算其下次执行时间,再重新入队。
这个不太了解,以后看。
3. 工厂类 Executors
提供了创建线程池、定时器的工厂方法。
线程池:
newCachedThreadPool()
corePoolSize = 0, maximumPoolSize = int 最大值,keepAliveTime = 1分钟,缓存队列为
SynchronousQueue
。每个任务都会重用已有线程或创建一个新线程执行,直到线程数达到上限;没有任务时所有线程都会自然退出。
newFixedThreadPool
corePoolSize = n, maximumPoolSize = n,keepAliveTime = 0,缓存队列为无界的
LinkedBlockingQueue
。newSingleThreadExecutor
corePoolSize = 1, maximumPoolSize = 1,keepAliveTime = 0,缓存队列为无界的
LinkedBlockingQueue
。效果和
newFixedThreadPool(1)
一样。newScheduledThreadPool 定时器
创建一个定时器ScheduledThreadPoolExecutor
,corePoolSize = n, maximumPoolSize = int 最大值,keepAliveTime = 0,缓存队列为DelayedWorkQueue
。