1. 参数
1 | ThreadPoolExecutor(int corePoolSize, |
- corePoolSize
核心线程数, 如果运行的线程小于 corePoolSize, 且没有空闲线程, 则创建新线程来执行任务. - maximumPoolSize
可允许创建的线程数.
当队列满时, 如果 corePoolSize 小于 maximumPoolSize, 则开始创建小于 maximumPoolSize 的线程帮助处理任务. - keepAliveTime & unit
当线程池中的线程大于 corePoolSize时, 多余的空闲线程的最大存活时长和单位. - workQueue
保存任务的阻塞队列. - threadFactory
使用 ThreadFactory 创建新线程. - handler
拒绝任务的策略:- AbortPolicy: 抛出异常, 默认策略.
- CallerRunsPolicy: 在调用线程执行.
- DiscardPolicy: 抛弃新提交的任务.
- DiscardOldestPolicy: 抛弃最老的任务.
2. API
- submit & execute
submit 和 execute 都用于提交任务, 区别在于 execute 能打印出任务在执行过程中的异常.
3. Executor
在 Executors 中包含了常用的 ThreadPoolExecutor 创建方式.
- newFixedThreadPool(int nThreads)
返回一个固定线程数的线程池.
如果没有可用核心线程, 则放入队列中, 等待核心线程可用时处理.
使用 LinkedBlockingQueue 队列. - newCachedThreadPool()
返回根据任务情况自动调整线程数的线程池.
如果有空闲线程, 则优先复用空闲线程; 如果没有, 则创建新先处理.
使用 SynchronousQueue 队列. - newSingleThreadExecutor()
返回固定只有一个线程的线程池.
使用 LinkedBlockingQueue 队列. - newScheduledThreadPool()
返回计划任务的自动调整线程数的线程池.
使用 DelayedWorkQueue 队列. - newSingleThreadScheduledExecutor()
返回计划任务的单个线程的线程池.
使用 DelayedWorkQueue 队列.
3.1. ScheduledExecutorService
ScheduledExecutorService 是 newScheduledThreadPool 或 newSingleThreadScheduledExecutor 返回的对象.
它的 API:
- schedule
会在给定时间堆任务进行一次调度. - scheduleAtFixedRate
会以上一个任务的开始时间为起点, 周期性的调度, 调度频率是一致的. - scheduleWithFixedDelay
会以上一个任务的结束时间为起点, 周期性的调度
3.2. BlockingQueue 阻塞队列
- 直接提交队列
该功能由 SynchronousQueue 对象提供. SynchronousQueue 没有容量, 每一个插入操作都要等待一个相应的删除操作, 反之, 每一个删除操作都要等待一个相应的插入操作.
如果使用 SynchronousQueue, 提交的任务不会被真实的保存, 而总是将新任务提交给线程执行, 如果没有空闲线程, 则创建新的线程, 如果线程数量达到上限, 则执行拒绝策略. - 有界队列
有界的任务队列可以使用 ArrayBlockingQueue 实现.
当使用有界的任务队列时, 若有新的任务需要执行, 如果线程池的实际线程数小于 corePoolSize, 则会优先创建新的线程; 若大于 corePoolSize, 则会将新任务加入等待队列. 若等待队列已满, 无法加入, 则在总线程数不大于 maximumPoolSize 的前提下, 创建新的进程执行任务; 若大于maximumPoolSize, 则执行拒绝策略. 可见, 有界队列仅当在任务队列装满时, 才可能将线程数提升到 corePoolSize 以上, 换言之, 除非系统非常繁忙, 否则要确保核心线程数维持在 corePoolSize. - 无界队列
有界的任务队列可以使用 LinkedBlockingQueue 实现.
与有界队列相比, 除非系统资源耗尽, 否则无界的任务队列不存在任务入队失败的情况. 当有新的任务到来, 系统的线程数小于 corePoolSize时, 线程池会生成新的线程执行任务, 但当系统的线程数达到corePoolSize 时, 线程就不会继续增加了. 若后续任由新的任务加入, 而又没有空闲的线程资源, 则任务直接进入队列等待. 若任务创建和处理的速度差异很大, 无界队列会保持快速增长, 直到耗尽系统内存. - 优先任务队列
优先任务队列是带有执行优先级的任务队列. 它通过PriorityBlockingQueue 类实现, 可以控制任务的执行先后顺序. 他是一个特殊的无界队列. 无论是有界队列 ArrayBlockingQueue 类, 还是未指定大小的无界队列 LinkedBlockingQueue 类都是按照先进先出算法处理任务的. 而 PriorityBlockingQueue 类则可以根据任务自身的优先级顺序先后执行, 在确保系统性能的同时, 也能有很好的质量保证 (总是确保高优先级的任务先执行) .
3.3. 自定义线程池线程数量设计
假设:
Ncpu = CPU 的数量
Ucpu = CPU 的使用率
W/C = 等待时间与计算时间的比率 = 阻塞系数
为了保证达到期望的使用率, 最优的线程池大小为:
一般地, CPU 密集型的应用, 为了防止 CPU 上下文切换而增加额外的开销, 线程数量大致为:
而, IO 密集型的应用, 为了充分利用等待时间, 线程数量大致为:
而阻塞系数范围在 0.8 ~ 0.9 之间.
假设是一台 4 核 CPU 的服务器, CPU 密集型服务, 线程池的线程数约 3 ~ 5; IO 密集型的服务, 线程池的线程数约 20 ~ 40.
4. Future
通过 ThreadPoolExecutor.submit(Callable task) 方法,提交任务给线程池, 将返回 Future 对象, 能够异步获取处理结果.
submit 方法会将 Callable 类型的 task 封装成 FutureTask 类型, FutureTask 是 Runnable, Future 的实现类型的.
FutureTask 会持有以下属性:
- state: 处理状态
- callable: 需要运行的任务
- outcome: 结果
- runner: 处理的线程
- waiters: 等待线程链表
ThreadPoolExecutor.submit 方法提交任务后, 交给线程池里的线程执行, 之后调用 Future.get 方法获取结果.
Future.get 方法, 也就是调用 FutureTask.get 方法.
该方法会循环判断判断任务的处理状态.
如果任务正在执行, 会通过 Thread.yield 方法让出 CPU 资源, 等待下次循环再效验.
如果任务执行完成, 则将 outcome 返回.