Java多线程:线程池ThreadPoolExecutor详解

线程池是 Java 并发编程的基础设施。直接 new Thread 创建线程开销大且难以管理,线程池通过复用线程来解决这个问题。本文详解 ThreadPoolExecutor 的核心参数和使用注意事项。

核心构造参数

public ThreadPoolExecutor(
    int corePoolSize,       // 核心线程数
    int maximumPoolSize,    // 最大线程数
    long keepAliveTime,     // 非核心线程空闲存活时间
    TimeUnit unit,          // 存活时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,        // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)

任务提交后的执行流程:

  1. 当前线程数 < corePoolSize → 创建核心线程执行任务
  2. 核心线程满 → 任务放入 workQueue
  3. workQueue 满 → 创建非核心线程(直到 maximumPoolSize)
  4. 线程数达到 maximumPoolSize 且队列满 → 执行拒绝策略

注意顺序:先进队列,队列满了才创建新线程,不是先创建线程到 maxPoolSize 再进队列。

参数选择

corePoolSize

  • CPU 密集型:CPU 核数 + 1
  • IO 密集型:CPU 核数 * 2(或根据 IO 等待时间调整)
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU密集型
int coreSize = cpuCores + 1;
// IO密集型
int coreSize = cpuCores * 2;

workQueue 常见选择

  • LinkedBlockingQueue:无界队列(默认 Integer.MAX_VALUE),maximumPoolSize 实际不生效
  • ArrayBlockingQueue(n):有界队列,推荐使用
  • SynchronousQueue:不存储元素,每个 put 必须等待一个 take

四种拒绝策略

// 1. AbortPolicy(默认):直接抛出 RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy()

// 2. CallerRunsPolicy:由提交任务的线程直接执行
new ThreadPoolExecutor.CallerRunsPolicy()

// 3. DiscardPolicy:静默丢弃,不抛异常
new ThreadPoolExecutor.DiscardPolicy()

// 4. DiscardOldestPolicy:丢弃队列中最老的任务,重新提交当前任务
new ThreadPoolExecutor.DiscardOldestPolicy()

生产环境推荐 CallerRunsPolicy:不丢弃任务,且能通过让调用者线程执行任务来实现反压(back pressure),降低提交速度。

完整示例

import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2,                                    // 核心线程 2
            4,                                    // 最大线程 4
            60, TimeUnit.SECONDS,                 // 空闲60秒回收
            new ArrayBlockingQueue<>(10),          // 队列容量10
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            pool.execute(() -> {
                System.out.printf("[%s] 执行任务 %d%n",
                    Thread.currentThread().getName(), taskId);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
        }
    }
}

Executors 工厂方法的坑

JDK 提供了几个便捷方法,但 阿里巴巴 Java 开发规范明确禁止使用

// 1. newFixedThreadPool — 使用 LinkedBlockingQueue(无界),可能 OOM
Executors.newFixedThreadPool(10);
// 等价于 new ThreadPoolExecutor(10, 10, 0, new LinkedBlockingQueue<>())

// 2. newCachedThreadPool — maximumPoolSize 为 Integer.MAX_VALUE,可能创建大量线程
Executors.newCachedThreadPool();
// 等价于 new ThreadPoolExecutor(0, MAX_VALUE, 60s, new SynchronousQueue<>())

// 3. newSingleThreadExecutor — 同样使用无界队列
Executors.newSingleThreadExecutor();

正确做法是直接用 ThreadPoolExecutor 构造函数,显式指定队列容量和拒绝策略。

监控线程池

// 运行时获取线程池状态
System.out.printf("活跃线程: %d, 队列大小: %d, 已完成: %d%n",
    pool.getActiveCount(),
    pool.getQueue().size(),
    pool.getCompletedTaskCount());

生产环境建议定时采集这些指标接入监控系统,及时发现队列堆积。

小结

使用线程池记住三点:1)用 ThreadPoolExecutor 而不是 Executors;2)根据任务类型选择核心线程数;3)使用有界队列 + 合理的拒绝策略。线程池配置没有银弹,需要根据实际负载做压测调优。