Java深入研究ThreadPoolExecutor线程池

MannaYang / 97 /

ChatGPT 可用网址,仅供交流学习使用,如对您有所帮助,请收藏并推荐给需要的朋友。
https://ckai.xyz

在Java代码中我们常常会开启异步线程去执行一些网络请求,或是开启子线程去读写文件,这些线程的开启与执行在并发量较小的场景下可以正常运行,如果涉及并发量比较大、线程数量有限、响应速度要快的业务场景下,此时就不允许单独创建线程去执行任务,而是基于线程池管理、分发线程机制去执行线程任务,从而降低资源消耗、提高响应速度,统一管理线程资源

线程池的创建与分类

Exectors类是concurrent包下的用于快速创建线程的工具类,该类中定义了一系列创建不同线程类型的静态方法,实际还是调用ThreadPoolExecutor类的有参函数,下面看下对应的方法源码

--- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
--- 调用有参函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
}
  1. newFixedThreadPool : 固定数量的线程池,可用于限制特定线程启用数量的场景,调用ThreadPoolExecutor构造函数中的参数定义如下

    • corePoolSize : 核心线程数量
    • maximumPoolSize : 最大线程数量
    • keepAliveTime : 当线程的数量大于核心线程时,空闲线程在终止之前等待新任务的最大时间
    • unit : 参数keepAliveTime的时间单位
    • workQueue : 存放等待执行任务的阻塞队列,常用的组赛队列如下

      1. ArrayBlockingQueue : 基于数组的有界阻塞队列,遵循FIFO(先进先出)原则,构造函数提供设置队列大小参数,采用ReentrantLock(基于AQS实现)获取重入锁,如果向已满的队列插入则当前线程阻塞
      2. LinkedBlockingQueue : 基于链表的无界阻塞队列,默认大小为Integer.MAX_VALUE,向该队列插入数据时会封装到Node<>节点所对应的链表中,队列内部使用了putLock和takeLock标识添加、删除锁,二者可并发执行
      3. SynchronousQueue : 单向链表同步队列,具体需查看源码(知识盲区,未研究到该队列)
      4. PriorityBlockingQueue : 具有优先级排序的无界阻塞队列,默认以自然排序方式或者通过传入可比较的Comparator比较器进行排序
    • threadFactory : 默认线程创建工厂
    • defaultHandler : 拒绝策略,默认使用ThreadPoolExecutor.AbortPolicy,表示当队列满了并且工作线程大于线程池最大线程数量,此时直接抛出异常,

      1. CallerRunsPolicy : 用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
      2. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务
      3. DiscardPolicy : 默认丢弃被拒绝的任务

      一般场景下默认使用ThreadPoolExecutor.AbortPolicy拒绝策略

<br/>

  1. newSingleThreadExecutor : 单线程的线程池,只有一个核心线程在执行,可用于需要按照特定顺序执行的场景

    public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
             (new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>()));
    }

    通过入参可以看到只使用一个线程,采用LinkedBlockingQueue无界队列,keepAliveTime是0s,说明线程创建了不会超时终止,该线程顺序执行所有任务

  2. newCachedThreadPool : 核心线程为0,非核心线程数为int的最大值

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
    }

    keepAliveTime是60s,采用SynchronousQueue同步阻塞队列,当有新的任务进来此时如果有空闲的线程则重复使用,否则就重新创建一个新的线程,线程空闲60s后会被回收,关于同步阻塞队列可以看这篇文章SynchronousQueue

  3. newScheduledThreadPool : 核心线程为传入的固定数值,非核心线程数为int的最大值,可用于延时或定时任务的执行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
     return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE,
           DEFAULT_KEEPALIVE_MILLIS, 
           MILLISECONDS,
           new DelayedWorkQueue());
    }

    方法参数中的DelayedWorkQueue底层也是基于数组实现的最小堆的具有优先级的队列,队列中的任务按照执行时间升序排列,执行时间越靠近当前时间的任务排在最前面,此时也会最先执行该任务

<br/>

线程池内部执行机制

内部执行机制基本按照ThreadPoolExecutor构造函数传入的参数来处理提交进来的任务

线程池ThreadPoolExecutor中的execute执行方法

先看下execute方法源码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

方法中的ctl是ThreadPoolExecutor中申明的提供原子操作的Integer对象,用于获取当前线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  1. 判断当前线程数量是否小于核心线程数量corePoolSize,如果满足条件调用addWorker()方法创建一个新的核心线程
  2. 如果大于corePoolSize,接着判断当前线程池是否是运行状态并且通过workQueue.offer()写入阻塞队列
  3. 此时再次检查线程池状态是否正在运行,否则从队列中移除任务并执行拒绝策略;如果是运行状态,调用workerCountOf()判断当前线程池线程数,数量为0就新创建一个新的线程
  4. 如果首次判断线程池状态非运行状态,调用addWorker()创建心线程如果失败,执行拒绝策略

<br/>
ThreadPoolExecutor中提供了shutdown()、shutdownNow()方法用于关闭线程池,调用shutdown时不再接受新的任务,之前提交的任务等执行结束再关闭线程池,调用shutdownNow时会尝试停止线程池中的任务然后再关闭,并且返回未处理完的List<> tasks任务列表

ForkJoinPool初探

ThreadPoolExecutor阻塞队列中的任务都是单个线程去执行,如果此时需要进行密集型计算任务(比如大数组排序、遍历系统文件夹并计算文件数量),就可能出现线程池中一个线程繁忙而其他线程空闲,导致CPU负载不均衡系统资源浪费,ForkJoinPool就是用于将单个密集型计算任务拆分成多个小任务,通过fork让线程池其它线程来执行小任务,通过join合并线程执行任务结果,采用并行执行机制,提高CPU的使用率

ForkJoinPool构造函数

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
        defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
    
public ForkJoinPool(int parallelism,
    ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {
        this(checkParallelism(parallelism),checkFactory(factory),handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

构造函数参数定义如下

  • parallelism : 并行线程数,默认为Runtime.getRuntime().availableProcessors(),最小为1
  • factory : 线程创建工厂,对象类型为ForkJoinWorkerThread
  • handler : 线程执行时异常处理
  • asyncMode : 为true表示处理任务的线程以队列形式按照先进先出(FIFO)顺序,此时不支持任务合并,false则是按照栈的形式后进先出(LIFO)顺序,默认是false支持任务合并(join)

获取ForkJoinPool对象可以直接使用commonPool()方法,

val pool = ForkJoinPool.commonPool()

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}

而common对应的初始化放在静态代码块中,且最终调用了ForkJoinPool的构造函数

static{
    //...
    common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
    //...
}

private static ForkJoinPool makeCommonPool() {
    //...
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

ForkJoinTask任务

ForkJoinTask抽象类实现了Future接口,同时提供了RecursiveAction和RecursiveTask两个实现类,该实现类提供泛型类型申明,源码如下

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    V result;                                //返回结果
    protected abstract V compute();         //执行任务

    public final V getRawResult() {         //获取result
        return result;
    }

    protected final boolean exec() {        //ForkJoinTask中的抽象方法
        result = compute();
        return true;
    }
}

RecursiveAction相对应RecursiveTask返回值为void,下面看下ForkJoinTask的三个核心方法

  • fork : 在任务执行过程中将大任务拆分为多个小的子任务
  • join : 调用子任务的join()方法等待任务返回结果,如果子任务执行异常,join()会抛出异常,quietlyJoin()方法不会抛出异常,需要调用getException()或getRawResult()手动处理异常和结果
  • invoke : 在当前线程同步执行该任务,同join一样,如果子任务执行异常,invoke()会抛出异常,quietlyInvoke()方法不会抛出异常,需要调用getException()或getRawResult()手动处理异常和结果

执行ForkJoinTask任务

使用ForkJoinPool时,可以通过以下三个方法执行ForkJoinTask任务,

public <T> T invoke(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
  • invoke : 执行有返回值的任务,同步阻塞方法直到任务执行完毕
  • submit : 执行没有返回值的任务
  • execute : 执行带有ForkJoinTask对象返回的任务,非阻塞方法,调用后ForkJoinPool会立即执行并返回当前执行的task对象

invoke()、submit()是对ExecutorService接口的方法实现,同时ForkJoinPool 也定义了用来执行ForkJoinTask的execute方法

work-stealing模式

关于work-stealing模式描述可参见下面这篇博文,核心就是每个工作线程都有自己的任务队列,当某个线程完成任务后会去"拿"别的队列里的任务去执行,work-stealing模式

ForkJoinPool 与 ThreadPoolExecutor 比较

ThreadPoolExecutor 与 ForkJoinPool 都实现了ExecutorService,不同之处在于前者只能执行 Runnable 和 Callable 任务,执行顺序是按照其在阻塞队列中的顺序来执行;后者除了能执行前者的任务类型外还扩展处ForkJoinTask类型任务,从而满足work-stealing这种算法模式,ForkJoinPool涉及的技术点还有很多,需要继续深入探索,例如ForkJoinPool中的线程状态、fork()、compute()、join()的调用顺序、子任务拆分粒度等细节内容...


Java深入研究ThreadPoolExecutor线程池
作者
MannaYang
许可协议
CC BY 4.0
发布于
2023-09-25
修改于
2024-07-27
Bonnie image
尚未登录