Junhc

岂止于博客

死磕Java并发之线程池

线程池的处理流程

1) 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。
如果核心线程池里的线程都在执行任务,则进入下个流程。

2) 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。
如果工作队列满了,则进入下个流程。

3) 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。
如果已经满了,则交给饱和策略来处理这个任务。

public class ThreadPoolExecutor extends AbstractExecutorService {

  public void execute(Runnable command) {
      if (command == null)
          throw new NullPointerException();
      // 1. 如果线程数小于基本线程数,则创建线程并执行当前任务
      // 2. 如线程数大于等于基本线程或线程创建失败,则将当前任务放到工作队列中
      // 3. 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
      // 则创建一个线程执行任务  
      int c = ctl.get();
      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      if (isRunning(c) && workQueue.offer(command)) {
          int recheck = ctl.get();
          if (! isRunning(recheck) && remove(command))
              reject(command);
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      else if (!addWorker(command, false))
          reject(command);
  }
...
}
线程池的使用
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

1) corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,
即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

2) runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。

  • a. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列。
  • b. LinkedBlockingQueue:一个基于链表结构的阻塞队列。吞吐量通常要高于ArrayBlockingQueue。
  • c. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
  • d. PriorityBlockingQueue:一个具有优先级的无线阻塞队列。

3) maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,
并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果是使用了无界的任务队列,这个参数就没什么效果。

4) ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

5) RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。

  • a. AbortPolicy:直接抛出异常
  • b. CallerRunsPolicy:只用调用者所在线程来运行任务
  • c. DiscardOldestPolicy:丢弃队里最近的一个任务,并执行当前任务
  • e. DiscardPolicy:不处理,丢弃掉

6) keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。
所以如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提供线程的利用率。

7) TimeUnit(线程活动保持时间的单位)

使用原则
  • 一定要传 ThreadFactory 这个参数,定义有意义的线程名,以便于使用jstack排查问题
  • 尽量避免局部变量创建线程池
  • 线程池大小和队列设置原则
    • 当使用有界队列的时候,corePoolSize设置的应该尽可能和maximumPoolSize相等,并且针对队列应该设置监控
    • IO密集型线程池大小可以设置的大一些
    • CPU密集型设置小一点,可以简单设置为cpu ~ cpu *2
    • 对于核心接口以及没有突发流量情况下,我通过给出的建议是使用SynchronousQueue 这个队列,并且maxPoolSize尽量大一些
  • 最好能设计一个可监控的线程池
    • handler的监控。一旦任务进入handler说明此时线程池数目在max的时候都处理不过来了,服务肯定会收到影响
    • workQueue的大小。如果workQueue里面有挤压,说明线程数在core任务处理不过来,要注意这种情况对服务带来的影响
    • 监控activeCount的数目。这样可以了解设置的参数是否合理,比如core设置的太大,浪费资源
    • 监控通过线程池创建的线程总数。在创建线程时候+1,销毁的时候-1,这样可以监控是否有资源泄漏
如何监控线程池
private static ExecutorService executorService = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
public static void main(String[] args) throws Exception {
    for (int i = 0; i < 100000; i++) {
        executorService.execute(() -> {
            System.out.print(".");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        });
    }
    ThreadPoolExecutor threadPoolExecutor = ((ThreadPoolExecutor) executorService);
    while (true) {
        System.out.println();

        int queueSize = threadPoolExecutor.getQueue().size();
        System.out.println("当前排队线程数:" + queueSize);

        int activeCount = threadPoolExecutor.getActiveCount();
        System.out.println("当前活动线程数:" + activeCount);

        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
        System.out.println("当前线程池执行完成任务数:" + completedTaskCount);

        long taskCount = threadPoolExecutor.getTaskCount();
        System.out.println("当前线程池任务数量" + taskCount);

        Thread.sleep(3000);
    }
}
参考资料