【java笔记】线程池ThreadPoolExecutor

2022/6/24 14:19:30

本文主要是介绍【java笔记】线程池ThreadPoolExecutor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

===============================================

 2022/6/23_第2次修改                       ccb_warlock

 

更新说明:

2022/6/23:

1.补充了“为什么不允许通过Executors创建线程”的内容;

===============================================

在java中使用线程池,是在一个项目接口的业务后新加一个接口转发,但是该转发不能因为转发业务的响应慢而阻塞该项目接口。

之前看《阿里巴巴Java开发手册》在“并发控制”中写明了“线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的工程师更加明确线程池的运行规则,规避资源耗尽的风险”,于是在项目中开始使用ThreadPoolExecutor创建线程。

应用了一段时间后,发现ThreadPoolExecutor在不同的应用场景需要做不同的配置,于是我针对这块做了试验和整理。

 


一、为什么不允许通过Executors创建线程 

根据《阿里巴巴Java开发手册》提到的Executors,学习后发现其实java的工具类提供了2套线程池的方案,我理解是Executors对ThreadPoolExecutor进行一层封装,可以减少学习上手就用。

但是由于JDK 1.8之前的Executors创建出来的线程池队列长度都是Integer的最大值,大量创建任务时会导致队列中塞入过多的任务,从而造成内存溢出情况,这获取就是手册标记“强制”不允许使用Executors创建线程池的原因吧。

不过JDK 1.8开始有了一种新的线程池newWorkStealingPool是基于ForkJoinPool创建的,以后有应用后再来补充。

 

Executors有newFixedThreadPool、newWorkStealingPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool5种方式创建线程池。

 

1.1 newFixedThreadPool

基于ThreadPoolExecutor。

创建一个多并发的线程池,任务量达到最大并发数时,超出的线程进入队列等待执行。

弊端:队列长度为Integer.MAX_VALUE,当大量任务堆积在队列中,从而导致OOM。

 

1.2 newSingleThreadExecutor

基于ThreadPoolExecutor。

创建一个单线程的线程池,其中只有一个工作线程执行任务。

弊端:队列长度为Integer.MAX_VALUE,当大量任务堆积在队列中,从而导致OOM。

 

1.3 newCachedThreadPool

基于ThreadPoolExecutor。

创建一个无工作线程的线程池,来一个任务就执行一个任务。

弊端:最大线程数是Integer.MAX_VALUE,当大量任务时会创建大量线程,从而导致OOM。

 

1.4 newScheduledThreadPool

基于ThreadPoolExecutor。

创建一个可定时执行的线程池。

弊端:最大线程数是Integer.MAX_VALUE,当大量任务时会创建大量线程,从而导致OOM。

 

1.5 newWorkStealingPool

JDK1.8引入,基于ForkJoinPool。

创建一个抢占式操作的线程池,每个并行线程都有各自的队列,每个线程执行各自队列的任务。

 


二、ThreadPoolExecutor构造参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor的构造函数由corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler7个参数组成。

 

2.1 corePoolSize

核心线程数。

线程池中初始的线程数,即使执行任务数小于核心线程数,线程池也不会回收这些线程。

 

2.2 maximumPoolSize

最大线程数。

该线程池最多创建多少个线程同时执行任务。

当任务数超过核心线程数(核心线程数+队列最大长度)时,线程池开始创建新线程来执行任务。

 

2.3 keepAliveTime

空闲线程保留时间。

工作线程数超过核心线程数时,多余的空闲线程会在保留时间后回收线程进行销毁。

 

2.4 unit

空闲线程保留时间的单位。

 

2.5 workQueue

任务队列。包括SynchronousQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

1)ArrayBlockingQueue

有界队列。该队列是基于数组的有界队列,所以在初始化的时候需要指定队列的长度。

当任务添加到线程池后,线程池会分配线程执行。当任务数超过corePoolSize后,多出来的任务添加到队列中。当多出来的任务已经塞满了队列后,将根据线程池的拒绝策略进行处理(拒绝策略后面会写)。

 

2)SynchronousQueue

同步队列。该队列我理解是长度为1的队列,put(生产与take(消费都是阻塞操作,当该任务生产后没有被消费将发生阻塞,当队列中没有可消费的任务时阻塞,这种机制也叫作配对通信机制。

同步队列使用CAS实现线程管理,而其他BlockingQueue使用AQS实现线程管理。

当任务添加到使用同步队列的线程池后,任务通过阻塞的方式进入队列,然后该任务将选择空闲线程执行,当然如果没有空闲线程则创建新线程执行。

 

3)LinkedBlockingQueue

无界队列。该队列是基于链表的无界队列。

所以使用这种队列的时候maximumPoolSize实际是无效的,因为当线程池中的工作线程达到corePoolSize后,所有新任务将会持续增加到队列中。

 

4)PriorityBlockingQueue

优先队列。该队列是基于数组的无界队列。

当任务添加到线程池后,线程池会分配线程执行。当任务数超过corePoolSize后,多出来的任务在添加到队列后会根据任务的优先级重新排列,优先级高的排在队列前面。换句话说由于优先级不同,会出现“后进先出”的情况。

由于优先队列也属于无界队列,所以使用这种队列的时候maximumPoolSize实际是无效的,道理和LinkedBlockingQueue是一样的。

 

5)LinkedBlockingDeque

双端无界队列。该队列是基于双向链表的无界队列,由于基于双向队列所以可以从队列的两端插入和移除元素,支持FIFO和FILO。

 

6)DelayQueue

延迟队列。该队列是基于数组的无界队列,由于该队列在PriorityQueue基础上实现的,先按延迟优先级排序。 

当任务添加到线程池后,线程池会分配线程执行。当任务数超过corePoolSize后,多出来的任务在添加到队列后会根据任务的延迟优先级重新排列。延迟时间短的排在队列前面。换句话说由于优先级不同,会出现“后进先出”的情况。

 

7)LinkedTransferQueue

//todo: 待学习补充

 

2.6 threadFactory

线程工厂。

用于创建线程,一般使用Executors.defaultThreadFactory()创建。

 

2.7 handler

拒绝策略。包括AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

当任务数超过(最大线程数+队列最大长度)时,如何处理多出来的任务。

1)AbortPolicy

该策略拒绝多出来的任务添加到线程池中,直接抛异常(RejectedExecutionException)。

 

2)CallerRunsPolicy

该策略将多出来的任务放到调用添加线程池的线程中执行。

 

3)DiscardPolicy

该策略将丢弃多出来的任务。

 

4)DiscardOldestPolicy

该策略将丢弃线程池中最早的任务(处于队头的任务),然后将多出来的任务添加到线程池中

 


三、创建ThreadPoolExecutor线程池

线程池的各项参数要根据实际的业务进行配置,下面是一个例子。

public class Test{
    private ThreadPoolExecutor executor;

    @PostConstruct
    private void initExecutor(){
        if(null == executor){
            executor = new ThreadPoolExecutor(
                    1,
                    10,
                    15,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(15),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }
}

 


四、通过ThreadPoolExecutor创建线程

4.1 创建单个不等待执行完成的异步线程

由于示例的线程池使用了AbortPolicy策略,所以runTask方法不会被阻塞。

public class Test{
    private ThreadPoolExecutor executor;

    @PostConstruct
    private void initExecutor(){
        if(null == executor){
            executor = new ThreadPoolExecutor(
                    1,
                    10,
                    15,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(15),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }

    public void runTask(){
        CompletableFuture.runAsync(() ->{
            // todo: 业务代码
        }, executor);
    }
}

 

4.2 创建单个等待执行完成的异步线程

在CompletableFuture.runAsync后加上join,只要没有触发AbortPolicy策略,runTask方法在执行后会被线程阻塞,直到线程结束。

public class Test{
    private ThreadPoolExecutor executor;

    @PostConstruct
    private void initExecutor(){
        if(null == executor){
            executor = new ThreadPoolExecutor(
                    1,
                    10,
                    15,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(15),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }

    public void runTask(){
        CompletableFuture.runAsync(() ->{
            // todo: 业务代码
        }, executor).join();
    }
}

 

4.3 创建多个不等待执行完成的异步线程

在项目中,经常需要创建多个线程业务,且希望这些业务不会阻塞外部方法,通过CompletableFuture.allOf可以创建多个线程。

例如针对每个order,我都需要执行各自的业务,但是希望线程任务不阻塞runTask方法。

public class Test{
    private ThreadPoolExecutor executor;

    @PostConstruct
    private void initExecutor(){
        if(null == executor){
            executor = new ThreadPoolExecutor(
                    1,
                    10,
                    15,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(15),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }

    public void runTask(){
        List<Order> orders = new ArrayList<>();

        CompletableFuture.allOf(orders.stream().map(t ->
                CompletableFuture.runAsync(() ->{
                    // todo:业务代码
                }, executor)
        ).toArray(CompletableFuture[]::new));
    }
}

 

4.4 创建多个等待执行完成的异步线程

在项目中,经常需要先完成多个异步线程后,再进行后续的操作,只需要在CompletableFuture.allOf后执行join就可以实现。

例如针对每个order,我都需要执行各自的业务,但是希望线程任务都完成后再继续执行runTask方法。

public class Test{
    private ThreadPoolExecutor executor;

    @PostConstruct
    private void initExecutor(){
        if(null == executor){
            executor = new ThreadPoolExecutor(
                    1,
                    10,
                    15,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(15),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }

    public void runTask(){
        List<Order> orders = new ArrayList<>();

        CompletableFuture.allOf(orders.stream().map(t ->
                CompletableFuture.runAsync(() ->{
                    // todo:业务代码
                }, executor)
        ).toArray(CompletableFuture[]::new)).join();
    }
}

 



这篇关于【java笔记】线程池ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程