跳至主要內容

Java 线程池实现原理

chenxi编程Java并发编程线程池大约 8 分钟

Java 线程池实现原理

1 Why 线程池

在 Java 中,为了提高处理任务的效率,往往会通过使用多线程来并发的执行任务。

比起线程的反复创建以及销毁,往往会通过引入线程池的方式来对多线程进行集中管理。

所以,线程池可以理解为是一种基于池化思想的工具,主要的作用有 2 个:

  1. 集中管理多线程资源,易于控制资源的分配。
  2. 减少线程反复创建与销毁所带来的开销。

2 线程池的实现原理

在 Java 中,实现线程池的方式主要有两种:

  1. 基于 Executors 工具类进行创建(会隐式定义阻塞队列大小,不推荐)
  2. 通过 ThreadPoolExecutor 进行创建。

下文会基于 ThreadPoolExecutor 进行讨论。

2.1 线程池参数

在使用 ThreadPoolExecutor 创建线程池时,往往会涉及到相关参数的设置。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数。
  • maximumPoolSize:最大线程数。
  • keepAliveTime:空闲线程的存活时间。
  • unit:存活时间的时间单位。
  • workQueue:任务队列,存放等待执行的任务。
  • threadFactory:线程工厂,用于定制线程的创建方式,例如线程名称,线程优先级。
  • handler:拒绝策略。

对于任务队列来说,常见的有:

  • ArrayBlockingQueue:基于数据实现的有界队列,必须在创建时指定队列大小。适用于任务量确定的场景。
  • LinkedBlockingQueue:基于链表实现,默认无界但也可指定队列大小。适用于任务量无法确定的场景,但可能因为任务堆积过多导致内存溢出。
  • SynchronousQueue:直接交接队列,不进行任务储存,而是直接将任务交给线程执行。适用于任务执行时间敏感的场景,不希望任务进行排队等待。
  • PriorityBlockingQueue:优先级队列,支持任务的优先级排序(任务实现 Comparable 接口,或自定义比较器),高优先级的任务会优先执行。适用于任务有着优先级的场景。
  • DelayQueue:基于时间的延迟队列,队列中的元素必须实现 Delayed 接口,指定延迟时间,任务只有延迟时间到了之后才会被执行(延迟时间最小的元素会优先出队)。适用于定时任务的场景。

对于拒绝策略,常见的有:

  • AbortPolicy:直接抛出 RejectedExecutionException,阻止任务提交。适用于需要在触发拒绝策略时通知到开发者的情景。
  • CallerRunsPolicy:由调用线程(提交任务的线程)直接执行被拒绝的任务。因为不会丢弃任务,适用于允许减缓任务提交的效率,但不允许任务丢失的场景。
  • DiscardPolicy:直接丢弃被拒绝的任务,不进行任何处理。适用于对任务丢失不敏感的场景,例如一些非关键任务。
  • DiscardOldestPolicy:丢弃最早进入队列、尚未执行的任务。适用于任务具有时效性的场景,新任务比旧任务更为重要,允许旧任务的丢失。

当然,除了内置的拒绝策略,也可通过实现 RejectedExecutionHandler 接口来自定义拒绝策略。

2.2 线程池生命周期

不仅线程有着生命周期,线程池也有着其生命周期,分为 5 个周期:

  1. RUNNING:线程池正常运行状态,能接收新任务,也能处理阻塞队列中的任务。
  2. SHUTDOWN:线程池不再接受新提交的任务,但会继续处理阻塞队列中的任务。
  3. STOP:线程池不接受新任务,也不会处理阻塞队列中的任务,会中断正在执行任务的线程。
  4. TIDYING:所有任务都已终止,线程池中的线程也已被回收,线程池将要转为终结状态。
  5. TERMINATED:线程池的终结状态,线程池完全终止。

线程池状态转换流程:

线程池状态转换流程
出自美团技术团队

在 ThreadPoolExecutor 中通过一个 AtomicInteger 类型的变量 ctl 去维护线程池的运行状态和有效线程数量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

2.3 线程池中的 Worker 线程

线程池中的线程并不单单是 Thread 这么简单,而是对 Thread 进行了封装,一般将其称为 Worker 线程。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{       
    final Thread thread;
    Runnable firstTask;  // Worker 线程执行的任务
}

Worker 是 ThreadPoolExecutor 中的工作线程,持有一个线程 thread 和一个任务 firstTaskthread 是通过 ThreadFactory 创建的,用于执行任务。firstTask 保存了线程启动时的第一个任务,可以为非空或 null

Worker 实现了 Runnable 接口,所以 Worker 线程的核心逻辑,可以来到其 run 方法一探究竟:

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

对上述代码的逻辑进行总结,也就弄懂了 Worker 线程的执行流程:

  1. 如果 firstTask 非空,线程启动后会优先执行这个任务。
  2. 基于 while 循环,不断从阻塞队列中拉取任务进行执行。
  3. 线程池处于 STOP 及以后的状态,中断线程。
  4. 通过继承 AbstractQueuedSynchronizer,使用其提供的独占锁能力,基于此来判断线程的任务是否执行完毕
    1. 任务开始执行时,w.lock() 获取锁。
    2. 任务执行结束后,w.unlock()释放锁。

2.4 线程池执行任务流程

当基于 ThreadPoolExecutor 创建好线程池后,要向线程池中提交任务,可以通过 executesubmitinvokeAllinvokeAny 这样的方法,但无论使用哪种方式都会,最终都会调用 execute 来进行任务的提交,这里可以看下 submit 方法的源码:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

所以 execute 方法是向线程池提交任务的入口,要弄清楚向线程池提交任务时,任务的执行流程,也可以从 execute 方法出发:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true))  
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

其中在 addWorker 方法中会在新增 Worker 线程前进行逻辑判断:

for (int c = ctl.get();;) {
    // Check if queue empty only if necessary.
    if (runStateAtLeast(c, SHUTDOWN)
        && (runStateAtLeast(c, STOP)
            || firstTask != null
            || workQueue.isEmpty()))
        return false;

    for (;;) {
        if (workerCountOf(c)
            >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateAtLeast(c, SHUTDOWN))
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

总结一下上述源码的逻辑,也就是线程池执行任务的流程:

  1. 线程池仅当处于 RUNNING 状态时,才能进行任务的提交。
  2. 线程池中的 Worker 线程数小于核心线程数时,尝试创建新的 Worker 线程来执行任务。
  3. Worker 线程数已达到核心线程数时,会尝试将任务加入到阻塞队列当中。
  4. 若阻塞队列已满,则尝试创建新的 Worker 线程来执行任务。
  5. 若 Worker 线程数已大于最大线程数,则会触发拒绝策略。

3 参考资料

  1. Java线程池实现原理及其在美团业务中的实践 - 美团技术团队open in new window
  2. JDK17 ThreadPoolExecutor 源码
上次编辑于: