死磕Java并发之线程池
线程池的处理流程
1) 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。
如果核心线程池里的线程都在执行任务,则进入下个流程。
2) 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。
如果工作队列满了,则进入下个流程。
3) 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。
如果已经满了,则交给饱和策略来处理这个任务。
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1. 如果线程数小于基本线程数,则创建线程并执行当前任务
// 2. 如线程数大于等于基本线程或线程创建失败,则将当前任务放到工作队列中
// 3. 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
// 则创建一个线程执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
线程池的使用
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1)
corePoolSize(线程池的基本大小)
:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,
即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2)
runnableTaskQueue(任务队列)
:用于保存等待执行的任务的阻塞队列。
- a. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列。
- b. LinkedBlockingQueue:一个基于链表结构的阻塞队列。吞吐量通常要高于ArrayBlockingQueue。
- c. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
- d. PriorityBlockingQueue:一个具有优先级的无线阻塞队列。
3)
maximumPoolSize(线程池最大数量)
:线程池允许创建的最大线程数。如果队列满了,
并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果是使用了无界的任务队列,这个参数就没什么效果。
4)
ThreadFactory
:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
5)
RejectedExecutionHandler(饱和策略)
:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。
- a. AbortPolicy:直接抛出异常
- b. CallerRunsPolicy:只用调用者所在线程来运行任务
- c. DiscardOldestPolicy:丢弃队里最近的一个任务,并执行当前任务
- e. DiscardPolicy:不处理,丢弃掉
6)
keepAliveTime(线程活动保持时间)
:线程池的工作线程空闲后,保持存活的时间。
所以如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提供线程的利用率。
7)
TimeUnit(线程活动保持时间的单位)
使用原则
- 一定要传 ThreadFactory 这个参数,定义有意义的线程名,以便于使用
jstack
排查问题 - 尽量避免局部变量创建线程池
- 线程池大小和队列设置原则
- 当使用有界队列的时候,corePoolSize设置的应该尽可能和maximumPoolSize相等,并且针对队列应该设置监控
- IO密集型线程池大小可以设置的大一些
- CPU密集型设置小一点,可以简单设置为
cpu ~ cpu *2
- 对于核心接口以及没有突发流量情况下,我通过给出的建议是使用SynchronousQueue 这个队列,并且maxPoolSize尽量大一些
- 最好能设计一个可监控的线程池
- handler的监控。一旦任务进入handler说明此时线程池数目在max的时候都处理不过来了,服务肯定会收到影响
- workQueue的大小。如果workQueue里面有挤压,说明线程数在core任务处理不过来,要注意这种情况对服务带来的影响
- 监控activeCount的数目。这样可以了解设置的参数是否合理,比如core设置的太大,浪费资源
- 监控通过线程池创建的线程总数。在创建线程时候+1,销毁的时候-1,这样可以监控是否有资源泄漏
如何监控线程池
private static ExecutorService executorService = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
public static void main(String[] args) throws Exception {
for (int i = 0; i < 100000; i++) {
executorService.execute(() -> {
System.out.print(".");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
});
}
ThreadPoolExecutor threadPoolExecutor = ((ThreadPoolExecutor) executorService);
while (true) {
System.out.println();
int queueSize = threadPoolExecutor.getQueue().size();
System.out.println("当前排队线程数:" + queueSize);
int activeCount = threadPoolExecutor.getActiveCount();
System.out.println("当前活动线程数:" + activeCount);
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
System.out.println("当前线程池执行完成任务数:" + completedTaskCount);
long taskCount = threadPoolExecutor.getTaskCount();
System.out.println("当前线程池任务数量" + taskCount);
Thread.sleep(3000);
}
}