Executor 之 线程池及定时器
1. Executor系列接口

Executor用于解耦任务(Runnable)提交者和执行者,它只有一个方法void execute(Runnable command),通过调用它向执行者提交任务,但无法知道执行的结果/进度,也无法拿到任务返回值。
ExecutorService 继承Executor,是一个更具体的接口。它额外提供了以下方法:
关闭执行者
shutdown(); // 拒绝接受任务,但继续执行旧任务shutdownNow(); // 拒绝接受任务,且返回所有未执行的旧任务awaitTermination(long,TimeUnit); // 等待执行者完全关闭查询执行者当前状态
isShutdown();isTerminated();submit和invokeall系列方法
接受Runnable/Callable类型的参数,返回Future,让提交者可以了解任务执行的进度/拿到执行结果。Future是提交者和执行者之间的通信手段,它代表任务的执行情况:
其中,get方法会阻塞直到任务被完成,常用于异步转同步的场景。
ScheduledExecutorService是一个定时器,它在ExecutorService的基础上增加了若干定时/延迟/循环调度任务的方法:
// 延迟执行public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);// 固定频率执行(不管单个任务执行时间,间隔到了就执行)public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);// 固定延迟执行(任务1执行完和任务2开始执行之间的间隔是固定的)public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
2. 线程池ThreadPoolExecutor
ThreadPoolExecutor是 线程池 和 任务队列(BlockingQueue) 实现的ExecutorService,和所有的池一样,它的主要功能有两个:
- 降低频繁创建/销毁线程的开销;
- 防止滥用线程资源
作为一个对外提供服务的容器,它很多方面的实现都值得参考。
2.1 服务能力的逐步降级 & 占用资源的自动伸缩
提交任务时调用的是BlockingQueue#offer()方法,如果不成功则立即返回false,这意味着任务提交者不会因为队列满而被阻塞。
submit(...)方法对传入的Runnable/Callable包装进一个FutureTask,再调用execute(...)处理,最后返回这个FutureTask对象。
FutureTask是一个实现了Runnable和Future的类,它本质上是一个任务,同时又是一个同步器,提供了具有阻塞/唤醒语义的查询和执行方法。
execute(...)处理策略如下:
- 轻负载 (当前线程数 <
corePoolSize):创建一个线程(内部类Worker),该任务作为其初始任务。 - 中负载 (当前线程数 =
corePoolSize):任务入队列(offer()方法,客户不会阻塞)。 - 重负载 (当前线程数 ∈ [
corePoolSize,maximumPoolSize],且任务队列已满):继续创建线程。 - 超负载 (当前线程数 =
maximumPoolSize,且任务队列已满): 拒绝新任务。
随着负载增加,线程池的处理速度 逐渐下降;超负荷时拒绝新请求,保证容器不挂:
快 –> 部分快,部分慢 –> 慢 –> 慢,且拒绝新任务
Worker线程的逻辑
Worker不停地从任务队列中拿任务执行,如果拿到 null,则退出。
Worker从BlockingQueue中取元素有两种选择:poll(timeout) 或 take(),如队列空二者均会阻塞,但前者有超时时间。选择策略如下:
当前线程数 <=
corePoolSize(轻负载):使用take(),当队列为空时线程一直阻塞而不退出,原因是线程池在大部分时间都会处在轻/中负载的状态,避免此时线程的频繁创建和销毁。也可以更改这个默认行为。如果手动设置
allowCoreThreadTimeOut,将用poll(timeout),超过指定时间还没任务线程就退出。当前线程数 >
corePoolSize(重负载):使用poll(timeout),负载回落后多余线程退出,避免不必要的资源占用。
不管哪种情况,调用poll的超时时间由参数keepAliveTime指定。
Worker通过以上行为保证了容器占用的资源随负载程度而自动弹性伸缩。
2.2 资源的懒加载和预加载
默认线程是懒加载的,即只有当新任务提交时才会创建Worker。可以调用prestartAllCoreThreads()或prestartCoreThread()方法预启动 corePoolSize 个或1个线程,一来避免任务队列初始就有元素时没有线程执行;二来避免接受任务时在线程创建上消耗时间。
2.3 拒绝服务的方式
当线程数和队列容量都达到上限时,容器将拒绝新来的服务请求,这是RejectedExecutionHandler接口负责的。
ThreadPoolExecutor内部提供了以下 4 个实现:
CallerRunsPolicy:由客户线程自己执行任务;AbortPolicy(默认):抛出RejectedExecutionException;DiscardPolicy:忽略,什么也不做;DiscardOldestPolicy:移除任务队列里呆的最久的任务(poll()),重新提交。
2.4 状态管理
线程池的状态图如下:

初始状态为 running;
进入 shutDown 和 stop 区别是:
- shutDown: 不接受新任务,但会继续执行已有任务
- stop: 不接受新任务,也不执行已有任务;队列中的剩余任务被返回给客户
当队列为空且线程数为0时,进入 tidying 状态。该状态下会调用一个预留给子类实现的terminated方法进行额外的资源清理工作。结束后进入 terminated 状态。
2.4 任务队列的选择
ThreadPoolExecutor 的任务队列是可配置的。
如果要求任务立即得到处理,可以使用SynchronousQueue。
SynchronousQueue是一个容量为0的阻塞队列,当 offer 元素时,如果当前没有线程阻塞在take / poll(timeout)上就立即失败;否则将元素交给其中的一个线程。因此它更像是线程之间传递对象的工具,而不是一个队列。
使用SynchronousQueue时,线程池将无法缓存任务,每个任务都会立即创建一个线程来执行它,任务可以得到快速处理。这种策略下线程池最多只能处理maximumPoolSize个任务。
如果可以接受任务执行的延迟,但需要尽可能多的处理任务,可以使用无界的LinkedBlockingQueue或有界的ArrayBlockingQueue。若是前者,线程数将不超过corePoolSize,参数maximumPoolSize无效;若是后者,则需要在线程数和队列长度间权衡:短队列需要更多的线程,但这会带来更多的资源占用和 context switch;少线程则需要更长的队列,但此时吞吐量较低。
2.5 定时器ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是ScheduledExecutorService定时器接口的实现,它继承自ThreadPoolExecutor,因此底层的工作原理和线程池是一样的,但其任务队列默认是DelayBlockingQueue的变种(不可配置)。Worker依次从中获取最快到期的任务执行。对于需要重复执行的任务,第一次执行完毕后计算其下次执行时间,再重新入队。
这个不太了解,以后看。
3. 工厂类 Executors
提供了创建线程池、定时器的工厂方法。
线程池:
newCachedThreadPool()corePoolSize = 0, maximumPoolSize = int 最大值,keepAliveTime = 1分钟,缓存队列为
SynchronousQueue。每个任务都会重用已有线程或创建一个新线程执行,直到线程数达到上限;没有任务时所有线程都会自然退出。
newFixedThreadPoolcorePoolSize = n, maximumPoolSize = n,keepAliveTime = 0,缓存队列为无界的
LinkedBlockingQueue。newSingleThreadExecutorcorePoolSize = 1, maximumPoolSize = 1,keepAliveTime = 0,缓存队列为无界的
LinkedBlockingQueue。效果和
newFixedThreadPool(1)一样。newScheduledThreadPool 定时器
创建一个定时器ScheduledThreadPoolExecutor,corePoolSize = n, maximumPoolSize = int 最大值,keepAliveTime = 0,缓存队列为DelayedWorkQueue。