Java:线程池的使用

2021/8/5 11:36:06

本文主要是介绍Java:线程池的使用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、线程池

1、什么是线程池

其实就是重复利用资源的过程,把宝贵的线程资源放入一个池子中,每次使用都从里边获取,用完之后再放回池子中供其它任务使用。

2、为什么要用线程池

因为创建线程和销毁线程的花销很大,大过了线程空转的花销,所以还不如把线程放进池子里重复利用。
另外线程池有一些参数可以调整,能最大限度利用系统资源解决高并发任务,并且还能防止资源过度消耗。

二、Executor源码

1、newFixedThreadPool

创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多N个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关机之前的执行过程中由于故障而终止,那么如果需要执行后续任务,将使用一个新线程代替它。池中的线程将一直存在,直到显式关闭。

//nThreads 池中的线程数
//ExecutorService 新创建的线程池
public static ExecutorService newFixedThreadPool(int nThreads)
//nThreads 池中的线程数
//threadFactory 创建新线程时要使用的工厂
//ExecutorService 新创建的线程池
public static ExecutorService newFixedThreadPool(int nThreads,
                                                 ThreadFactory threadFactory)

2、newWorkStealingPool

创建一个线程池,该线程池维护足够的线程以支持给定的并行级别,并且可以使用多个队列来减少争用。并行级别对应于主动执行或可用于执行任务处理的线程的最大数量。线程的实际数量可能会动态地增长和收缩。工作窃取池不能保证提交的任务的执行顺序。

//parallelism 目标并行度级别
//ExecutorService 新创建的线程池
public static ExecutorService newWorkStealingPool(int parallelism)

使用所有可用的处理器作为其目标并行级别,创建窃取工作的线程池。

public static ExecutorService newWorkStealingPool()

3、newSingleThreadExecutor

创建一个执行器,该执行器使用在无界队列上运行的单个工作线程(但是请注意,如果此单个线程在关机之前的执行过程中由于故障而终止,则在需要执行后续任务时,将替换一个新线程。)任务保证按顺序执行,并且在任何给定时间都不会有多个任务处于活动状态。与其他等效的newFixedThreadPool(1)不同,返回的执行器保证不可重新配置以使用其他线程。

//ExecutorService 新创建的单线程执行器
public static ExecutorService newSingleThreadExecutor()
//threadFactory - 创建新线程时使用的工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

4、newCachedThreadPool

创建一个线程池,该线程池根据需要创建新线程,但在以前构建的线程可用时将重用它们。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,执行调用将重用以前构造的线程。如果没有现有线程可用,则将创建一个新线程并将其添加到池中。未使用60秒的线程将被终止并从缓存中移除。因此,保持足够长时间空闲的池不会消耗任何资源。

public static ExecutorService newCachedThreadPool()
//threadFactory - 创建新线程时使用的工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

5、newSingleThreadScheduledExecutor

创建一个单线程执行器,它可以安排命令在给定延迟后运行,或者定期执行。(但是请注意,如果这个线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,将会有一个新的线程代替它。)任务保证按顺序执行,并且在任何给定的时间内都不会有超过一个任务处于活动状态。与等价的newScheduledThreadPool(1)不同,返回的执行器保证不会重新配置以使用其他线程。

public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//threadFactory - 创建新线程时使用的工厂
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

6、newScheduledThreadPool

创建一个线程池,该线程池可以安排命令在给定延迟后运行,或定期执行。

//corePoolSize ——池中保留的线程数,即使它们处于空闲状态
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
//corePoolSize - 要保留在池中的线​​程数,即使它们处于空闲状态
//threadFactory - 执行程序创建新线程时使用的工厂
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
                                                              ThreadFactory threadFactory)

7、unconfigurableExecutorService

返回一个对象,该对象将所有定义的 ExecutorService 方法委托给给定的执行程序,但不委托其他任何可以使用强制类型转换访问的方法。这为安全地“冻结”配置和不允许调优给定的具体实现提供了一种方法。

public static ExecutorService unconfigurableExecutorService(ExecutorService executor)

8、unconfigurableScheduledExecutorService

返回一个对象,该对象将所有定义的ScheduledExecutorService方法委托给给定的执行器,但不委托任何其他可能通过强制类型转换访问的方法。这提供了一种安全“冻结”配置的方法,并禁止对给定的具体实现进行调优。

public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)

9、defaultThreadFactory

返回用于创建新线程的默认线程工厂。这个工厂在同一个线程组中创建Executor使用的所有新线程。如果有一个SecurityManager,它使用System.getSecurityManager()的组,否则使用调用这个defaultThreadFactory方法的线程的组。每个新线程都被创建为一个非守护线程,优先级设置为较小的thread。NORM_PRIORITY和线程组中允许的最大优先级。新线程的名称可以通过pool-N-thread-M的thread . getname()来访问,其中N是这个工厂的序列号,M是这个工厂创建的线程的序列号。

public static ThreadFactory defaultThreadFactory()

10、privilegedThreadFactory

返回一个线程工厂,用于创建与当前线程具有相同权限的新线程。这个工厂使用与defaultThreadFactory()相同的设置创建线程,另外将新线程的AccessControlContext和contextClassLoader设置为与调用这个privilegedThreadFactory方法的线程相同。一个新的privilegedThreadFactory可以在AccessController中创建。doPrivileged操作设置当前线程的访问控制上下文,以创建具有该操作中所选权限设置的线程。

public static ThreadFactory privilegedThreadFactory()

11、callable

返回一个Callable对象,该对象在被调用时运行给定的任务并返回给定的结果。这在将需要Callable的方法应用到无结果的操作时非常有用。

//T - 结果的类型
//task - 要运行的任务
public static <T> Callable<T> callable(Runnable task,
                                       T result)

返回一个Callable对象,该对象在被调用时运行给定的任务并返回null。

public static Callable<Object> callable(Runnable task)

三、ExecutorService源码

1、shutdown

启动一个有序的关闭,先前提交的任务将被执行,但不接受新的任务。如果已经关闭,则调用没有额外效果。
此方法不等待先前提交的任务完成执行。使用awaitterminate来做这件事。

void shutdown()

2、shutdownNow

尝试停止所有正在执行的任务,停止正在等待的任务的处理,并返回正在等待执行的任务的列表。
此方法不等待主动执行的任务终止。使用awaitterminate来做这件事。
除了尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt()取消,因此任何未能响应中断的任务可能永远不会终止。

//List<Runnable> 从未开始执行的任务列表
List<Runnable> shutdownNow()

3、isShutdown

如果这个执行程序已经关闭,则返回true。

boolean isShutdown()

4、isTerminated

如果关闭后所有任务都已完成,则返回true。注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真。

boolean isTerminated()

5、awaitTermination

阻塞,直到在关闭请求后所有任务完成执行,或发生超时,或当前线程被中断,以最先发生的为准。

//timeout - 等待的最长时间
//unit - 超时参数的时间单位
boolean awaitTermination(long timeout,
                         TimeUnit unit)
                  throws InterruptedException

6、submit

提交一个返回值的任务以供执行,并返回一个Future,表示任务的挂起结果。Future的get方法将在任务成功完成后返回任务的结果。
如果你想立即阻塞等待任务,你可以使用result = exec.submit(aCallable).get();

//T-任务结果的类型
<T> Future<T> submit(Callable<T> task)
//T-结果的类型
<T> Future<T> submit(Runnable task,T result)

提交要执行的可运行任务,并返回表示该任务的Future。Future的get方法将在成功完成时返回null。

Future<?> submit(Runnable task)

7、invokeAll

执行给定的任务,当所有任务完成时返回一个future列表,其中包含它们的状态和结果。对于返回列表中的每个元素,Future.isDone()都为真。注意,已完成的任务可以正常终止,也可以通过抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果未定义。

//T-从任务返回的值的类型
//List<Future<T>> 表示任务的 Futures 列表,其顺序与迭代器为给定任务列表生成的顺序相同,每个任务都已完成
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                       throws InterruptedException

执行给定的任务,当所有任务完成或超时到期(以最先发生的为准)时返回一个future列表,其中包含它们的状态和结果。对于返回列表中的每个元素,Future.isDone()都为真。

//tasks - 任务的集合
//timeout - 等待的最长时间
//unit - 超时参数的时间单位
//List<Future<T>> 表示任务的future列表,其顺序与给定任务列表的迭代器产生的顺序相同。如果操作没有超时,则每个任务都将完成。如果它超时了,其中一些任务将无法完成。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout,
                              TimeUnit unit)
                       throws InterruptedException

8、invokeAny

执行给定的任务,返回已经成功完成的任务的结果(即,没有抛出异常),如果有的话。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果未定义。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
         throws InterruptedException,
                ExecutionException

执行给定的任务,返回成功完成的任务的结果(即,没有抛出异常),如果在给定的超时时间过去之前有任何任务执行的话。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果未定义。

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout,
                TimeUnit unit)
         throws InterruptedException,
                ExecutionException,
                TimeoutException

四、ScheduledExecutorService源码

1、schedule

创建并执行在给定延迟后启用的一次性操作。

//command - 要执行的任务
//delay - 从现在开始延迟执行的时间
//unit - 延迟参数的时间单位
//ScheduledFuture<?> 一个ScheduledFuture表示任务尚未完成,它的get()方法将在完成时返回null
ScheduledFuture<?> schedule(Runnable command,
                            long delay,
                            TimeUnit unit)
<V> ScheduledFuture<V> schedule(Callable<V> callable,
                                long delay,
                                TimeUnit unit)

2、scheduleAtFixedRate

创建并执行一个周期性动作,该动作在给定的初始延迟之后首先启用,然后在给定的周期之后启用;也就是说,执行将在initialDelay之后开始,然后initialDelay+period,然后initialDelay+ 2 * period,以此类推。如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。如果该任务的任何执行花费的时间超过了它的周期,那么后续执行可能会延迟开始,但不会并发执行。

//command - 要执行的任务
//initialDelay - 延迟第一次执行的时间
//period - 连续执行之间的时间段
//unit - initialDelay 和 period 参数的时间单位
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                       long initialDelay,
                                       long period,
                                       TimeUnit unit)

3、scheduleWithFixedDelay

创建并执行一个周期性操作,该操作在给定的初始延迟之后首先启用,然后在一次执行终止和下一次执行开始之间启用给定的延迟。如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。

//command - 要执行的任务
//initialDelay - 延迟第一次执行的时间
//delay - 从一个执行终止到下一个执行开始之间的延迟
//unit - initialDelay 和 delay 参数的时间单位
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                          long initialDelay,
                                          long delay,
                                          TimeUnit unit)

五、Future源码

1、cancel

试图取消此任务的执行。如果任务已完成、已被取消或由于其他原因无法取消,则此尝试将失败。如果成功,且在调用cancel时该任务尚未启动,则该任务永远不应运行。如果任务已经启动,那么mayInterruptIfRunning参数将确定在试图停止任务时是否应该中断执行该任务的线程。
该方法返回后,对isDone()的后续调用总是返回true。如果该方法返回true,对isCancelled()的后续调用将始终返回true。

//mayInterruptIfRunning—如果执行此任务的线程应该被中断,则为true;否则,允许正在执行的任务完成
//return - 如果任务不能被取消,则为 false,通常是因为它已经正常完成; 否则为 true
boolean cancel(boolean mayInterruptIfRunning)

2、isCancelled

如果该任务在正常完成之前被取消,则返回true。

boolean isCancelled()

3、isDone

如果任务完成,返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,该方法将返回true。

boolean isDone()

4、get

在必要时等待计算完成,然后检索其结果。

V get()
throws InterruptedException,
      ExecutionException

如果有必要,最多等待给定的时间来完成计算,然后检索其结果(如果可用)。

//timeout - 等待的最长时间
//unit - 超时参数的时间单位
V get(long timeout,
      TimeUnit unit)
throws InterruptedException,
      ExecutionException,
      TimeoutException

六、ThreadPoolExecutor自定义线程池

在这里插入图片描述

上述Executor使用工厂创建线程池的方法固然方便,但是不够自由,并且该类的好几个方法也是通过ThreadPoolExecutor方式实现,因此在生产场景下必须使用ThreadPoolExecutor构建线程池,这样可以明确线程池的运行规则,创建符合自己业务场景需要的线程池,避免资源耗尽的风险。

一、ThreadPoolExecutor构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去。
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量。
keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被摧毁。
unit:keepAliveTime的单位。
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;分为直接提交队列,有界任务队列,无界任务队列,优先任务队列。
threadFactory:线程工厂,用于创建线程。
handler:拒绝策略,当任务太多来不及处理的时候,会用这种策略拒绝任务。

二、corePoolSize和maximumPoolSize

ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, 
														new ArrayBlockingQueue<Runnable>(50));

上边代码意思是有5个核心线程数,10个最大线程数,任务队列是50个线程。
在运行时,JVM首先为前5个新任务创建新线程,此时再来任务就放入任务队列中,直到任务队列已放满,此时再来新任务,JVM就会创建新线程,直到此时线程池中达到10个线程了就停止创建,即达到了最大线程数,此时再来新任务就会使用配置的拒绝策略新任务的提交。

三、workQueue任务队列

分为直接提交队列,有界任务队列,无界任务队列,优先任务队列。

1、直接提交队列

设置SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingEueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才能被唤醒,反之每一个删除操作也都要等待对应的插入操作。

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2,
                1000,
                TimeUnit.MICROSECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 3; i++) {
            pool.execute(new ThreadTask());
        }
        
		pool.shutdown();
    }
}

class ThreadTask implements Runnable {
    public ThreadTask() {
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.gaowj.java.ThreadTask@7440e464 rejected from java.util.concurrent.ThreadPoolExecutor@49476842[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.gaowj.java.ThreadPool.main(ThreadPool.java:22)

可以看到,当任务队列为SynchronousQueue时,创建的线程数量大于maximumPoolSize时,直接执行了拒绝策略抛出异常。
使用SynchronousQueue队列,提交的任务不会被保存,会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize时,则尝试创建新的线程,如果达到maximumPoolSize设置的最大值,则会根据设置的handler执行拒绝策略。

2、有界任务队列

使用ArrayBlockingQueue实现

pool = new ThreadPoolExecutor(1, 2,
                1000,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

使用ArrayBlockingQueue有界任务队列,当有新的任务需要执行的时候,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,会将新的任务加入到等待队列中。当等待队列满了的时候,也会继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量时,则会执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接的关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列满了时,则会以maximumPoolSize为最大线程数上限。

3、无界任务队列

使用LinkedBlockingQueue实现。

pool = new ThreadPoolExecutor(1, 2,
                1000,
                TimeUnit.MICROSECONDS,
                new LinkedBlockingDeque<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新任务,而线程池创建的最大线程数量就是corePoolSize设置的数量,也就是说此时maximumPoolSize参数是无效的。当有新的任务加入时,则会直接进入等待队列,所以你一定要注意任务提交与处理直接的协调,要防止等待队列中的任务由于无法及时处理而一直增长,导致资源耗尽。

4、优先任务队列

使用PriorityBlockingQueue实现。

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2,
                1000,
                TimeUnit.MICROSECONDS,
                new PriorityBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask(i));
        }

        pool.shutdown();
    }
}

class ThreadTask implements Runnable, Comparable<ThreadTask> {
    private int priority;

    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
    }

    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //当前对象与其它对象比较,当优先级大时返回-1,优先级小时返回1
    //priority值越小优先级越高
    @Override
    public int compareTo(ThreadTask o) {
        return this.priority > o.priority ? -1 : 1;
    }

    @Override
    public void run() {
        try {
            //阻塞线程,使后续任务进入缓存队列
            Thread.sleep(1000);
            System.out.println("当前线程优先级:" + this.priority + ",线程名字:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
当前线程优先级:0,线程名字:pool-1-thread-1
当前线程优先级:9,线程名字:pool-1-thread-1
当前线程优先级:8,线程名字:pool-1-thread-1
当前线程优先级:7,线程名字:pool-1-thread-1
当前线程优先级:6,线程名字:pool-1-thread-1
当前线程优先级:5,线程名字:pool-1-thread-1
当前线程优先级:4,线程名字:pool-1-thread-1
当前线程优先级:3,线程名字:pool-1-thread-1
当前线程优先级:2,线程名字:pool-1-thread-1
当前线程优先级:1,线程名字:pool-1-thread-1

除了第一个任务直接创建线程执行外,其它的任务都被放入了优先任务队列,按照优先级进行重新排序执行,且线程池的线程数一直为corePoolSize,也就是一个,说明此时maximumPoolSize设置无效。
也就是说PriorityBlockingQueue是一个特殊的无界队列,无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize设置的数量。

四、threadFactory

线程池中线程是通过ThreadPoolExecutor中的ThreadFactory线程工厂创建。通过自定义ThreadFactory可以按需要对线程池中创建的线程进行一些特殊设置,比如命名,优先级。

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(2, 4,
                1000,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        System.out.println("线程 " + r.hashCode() + " 创建");
                        //线程命名
                        Thread th = new Thread(r, "线程名字 " + r.hashCode());
                        return th;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask());
        }

        pool.shutdown();
    }
}

class ThreadTask implements Runnable {

    @Override
    public void run() {
        //输出执行线程的名称
        System.out.println("执行中的线程名字:" + Thread.currentThread().getName());
    }
}
线程 1300109446 创建
线程 1020371697 创建
线程 789451787 创建
执行中的线程名字:线程名字 1300109446
执行中的线程名字:线程名字 1020371697
执行中的线程名字:线程名字 1300109446
执行中的线程名字:线程名字 1300109446
线程 1950409828 创建
执行中的线程名字:线程名字 1300109446
执行中的线程名字:线程名字 789451787
执行中的线程名字:线程名字 789451787
执行中的线程名字:线程名字 1020371697
执行中的线程名字:线程名字 1300109446
执行中的线程名字:线程名字 1950409828

五、handler

为防止资源被耗尽,任务队列都会选择创建有界任务队列,但是这种模式下如果出现任务队列已满并且线程池创建的线程数已达到最大线程数时,就需要指定ThreadPoolExecutor的RejectedExecutionHandler参数提供拒绝策略,来处理线程池超载情况。

ThreadPoolExecutor自带的拒绝策略如下:
1、AbortPolicy策略:该策略直接抛出异常,阻止系统正常工作
2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程中运行
3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的
4、DiscardPolicy策略:该策略会丢弃无法处理的任务,不予任何处理

以上内置的策略都实现了RejectedExecutionHandler接口,当然也可以自定义拒绝策略

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(1, 2,
                1000,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + "执行了拒绝策略");
                    }
                });

        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask());
        }

        pool.shutdown();
    }
}

class ThreadTask implements Runnable {
    @Override
    public void run() {
        try {
            //让线程阻塞,使后续任务进入后续队列
            Thread.sleep(1000);
            //输出执行线程的名称
            System.out.println("执行中的线程名字:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
com.gaowj.java.ThreadTask@2f0e140b执行了拒绝策略
com.gaowj.java.ThreadTask@7440e464执行了拒绝策略
com.gaowj.java.ThreadTask@49476842执行了拒绝策略
执行中的线程名字:pool-1-thread-1
执行中的线程名字:pool-1-thread-2
执行中的线程名字:pool-1-thread-2
执行中的线程名字:pool-1-thread-1
执行中的线程名字:pool-1-thread-2
执行中的线程名字:pool-1-thread-1
执行中的线程名字:pool-1-thread-2

当任务加入了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略。

六、ThreadPoolExecutor扩展

1、beforeExecute:线程池中的任务运行前执行
2、afterExecute:线程池中的任务运行完毕后执行
3、terminated:线程池退出后执行

通过这三个接口,可以监控每个任务的开始和结束时间。

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        pool = new ThreadPoolExecutor(2, 4,
                1000,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        System.out.println("线程 " + r.hashCode() + " 创建");
                        //线程命名
                        Thread th = new Thread(r, "线程 " + r.hashCode());
                        return th;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行 " + ((ThreadTask) r).getTaskName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完毕 " + ((ThreadTask) r).getTaskName());
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask("Task" + i));
        }

        pool.shutdown();
    }
}

class ThreadTask implements Runnable {
    private String taskName;

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public ThreadTask(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        //输出执行线程的名称
        System.out.println("任务名称:" + this.getTaskName() + " 线程名称" + Thread.currentThread().getName());
    }
}
线程 1300109446 创建
线程 1020371697 创建
线程 789451787 创建
准备执行 Task0
准备执行 Task1
任务名称:Task1 线程名称线程 1020371697
执行完毕 Task1
任务名称:Task0 线程名称线程 1300109446
线程 1950409828 创建
准备执行 Task7
任务名称:Task7 线程名称线程 789451787
执行完毕 Task7
执行完毕 Task0
准备执行 Task2
任务名称:Task2 线程名称线程 1020371697
准备执行 Task8
任务名称:Task8 线程名称线程 1950409828
执行完毕 Task8
准备执行 Task5
任务名称:Task5 线程名称线程 1950409828
执行完毕 Task5
准备执行 Task4
任务名称:Task4 线程名称线程 1300109446
执行完毕 Task4
准备执行 Task3
任务名称:Task3 线程名称线程 789451787
执行完毕 Task3
准备执行 Task9
任务名称:Task9 线程名称线程 1300109446
执行完毕 Task9
准备执行 Task6
执行完毕 Task2
任务名称:Task6 线程名称线程 1950409828
执行完毕 Task6
线程池退出

对于这三个方法的重写,可以对线程池中线程的运行状态进行监控,在其执行前后打印相关信息。
使用shutdown方法可以比较安全的关闭线程池,当调用该方法后,线程池不再接受后续添加的任务,但是此时线程池不会马上退出,而是等到添加到线程池中的任务都已经完成处理后,才会退出。

七、线程池使用案例

八、参考文章

Oracle的JDK官方文档
JAVA-8官方文档
java线程池ThreadPoolExecutor类使用详解



这篇关于Java:线程池的使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程