java-线程池+CompletableFuture
2022/6/8 1:21:25
本文主要是介绍java-线程池+CompletableFuture,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
使用线程池
线程池的基本概念
线程池,本质上是一种对象池,用于管理线程资源。
在任务执行前,需要从线程池中拿出线程来执行。
在任务执行完成之后,需要把线程放回线程池。
通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。
线程池的优缺点
优点
- 降低资源的消耗。线程本身是一种资源,创建和销毁线程会有CPU开销;创建的线程也会占用一定的内存。
- 提高任务执行的响应速度。任务执行时,可以不必等到线程创建完之后再执行。
- 提高线程的可管理性。线程不能无限制地创建,需要进行统一的分配、调优和监控。
缺点
- 频繁的线程创建和销毁会占用更多的CPU和内存
- 频繁的线程创建和销毁会对GC产生比较大的压力
- 线程太多,线程切换带来的开销将不可忽视
- 线程太少,多核CPU得不到充分利用,是一种浪费
线程池创建流程
通过上图,我们看到了线程池的主要处理流程。我们的关注点在于,任务提交之后是怎么执行的。大致如下:
- 判断核心线程池是否已满,如果不是,则创建线程执行任务
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
- 如果线程池也满了,则按照拒绝策略对任务进行处理
在jdk
里面,我们可以将处理流程描述得更清楚一点。来看看ThreadPoolExecutor
的处理流程。
我们将概念做一下映射。
corePool
-> 核心线程池maximumPool
-> 线程池BlockQueue
-> 队列RejectedExecutionHandler
-> 拒绝策略
入门级例子
为了更直观地理解线程池,我们通过一个例子来宏观地了解一下线程池用法。
public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { executor.submit(() -> { System.out.println("thread id is: " + Thread.currentThread().getId()); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
在这个例子中,首先创建了一个固定长度为5的线程池。然后使用循环的方式往线程池中提交了10个任务,每个任务休眠1秒。在任务休眠之前,将任务所在的线程id进行打印输出。
所以,理论上只会打印5个不同的线程id,且每个线程id会被打印2次。
Executors
Executors
是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。
// 创建单一线程的线程池 public static ExecutorService newSingleThreadExecutor(); // 创建固定数量的线程池 public static ExecutorService newFixedThreadPool(int nThreads); // 创建带缓存的线程池 public static ExecutorService newCachedThreadPool(); // 创建定时调度的线程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize); // 创建流式(fork-join)线程池 public static ExecutorService newWorkStealingPool();
1. 创建单一线程的线程池
顾名思义,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE
)。当线程空闲的时候,按照FIFO的方式进行处理。
2. 创建固定数量的线程池
和创建单一线程的线程池
类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。
- 如果线程的数量未达到指定数量,则创建线程来执行任务
- 如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
- 如果没有线程是空闲的,则将任务缓存到队列(队列长度为
Integer.MAX_VALUE
)。当线程空闲的时候,按照FIFO的方式进行处理
3. 创建带缓存的线程池
这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE
。由于本身使用SynchronousQueue
作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。
4. 创建定时调度的线程池
和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor
类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解。
我们写一个例子来看看如何使用。
public class ThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 定时调度,每个调度任务会至少等待`period`的时间, // 如果任务执行的时间超过`period`,则等待的时间为任务执行的时间 executor.scheduleAtFixedRate(() -> { try { Thread.sleep(10000); System.out.println(System.currentTimeMillis() / 1000); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 2, TimeUnit.SECONDS); // 定时调度,第二个任务执行的时间 = 第一个任务执行时间 + `delay` executor.scheduleWithFixedDelay(() -> { try { Thread.sleep(5000); System.out.println(System.currentTimeMillis() / 1000); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 2, TimeUnit.SECONDS); // 定时调度,延迟`delay`后执行,且只执行一次 executor.schedule(() -> System.out.println("5 秒之后执行 schedule"), 5, TimeUnit.SECONDS); } }
-
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
,定时调度,每个调度任务会至少等待period
的时间,如果任务执行的时间超过period
,则等待的时间为任务执行的时间 -
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
,定时调度,第二个任务执行的时间 = 第一个任务执行时间 +delay
-
schedule(Runnable command, long delay, TimeUnit unit)
,定时调度,延迟delay
后执行,且只执行一次
手动创建线程池
理论上,我们可以通过Executors
来创建线程池,这种方式非常简单。但正是因为简单,所以限制了线程池的功能。比如:无长度限制的队列,可能因为任务堆积导致OOM,这是非常严重的bug,应尽可能地避免。怎么避免?归根结底,还是需要我们通过更底层的方式来创建线程池。
抛开定时调度的线程池不管,我们看看ThreadPoolExecutor
。它提供了好几个构造方法,但是最底层的构造方法却只有一个。那么,我们就从这个构造方法着手分析。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
这个构造方法有7个参数,我们逐一来进行分析。
corePoolSize
,线程池中的核心线程数,也是线程池中常
驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务maximumPoolSize
,线程池中的最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue
队列填满时才会创建多于corePoolSize
的线程(线程池总线程数不超过maxPoolSize
)keepAliveTime
,非核心线程的空闲时间超过keepAliveTime
就会被自动终止回收掉,注意当corePoolSize=maxPoolSize
时,keepAliveTime
参数也就不起作用了(因为不存在非核心线程)unit
,keepAliveTime
的时间单位,可以是毫秒、秒、分钟、小时和天,等等workQueue
,等待队列,线程池中的线程数超过核心线程数corePoolSize
时,任务将放在等待队列,它是一个BlockingQueue
类型的对象threadFactory
,创建线程的工厂类,默认使用Executors.defaultThreadFactory()
,也可以使用guava
库的ThreadFactoryBuilder
来创建handler
,拒绝策略,当线程池和等待队列(队列已满且线程数达到maximunPoolSize)
都满了之后,需要通过该对象的回调函数进行回调处理
这些参数里面,基本类型的参数都比较简单,我们不做进一步的分析。我们更关心的是workQueue
、threadFactory
和handler
,接下来我们将进一步分析。
1. 等待队列-workQueue
等待队列是BlockingQueue
类型的,理论上只要是它的子类,我们都可以用来作为等待队列。
同时,jdk内部自带一些阻塞队列,我们来看看大概有哪些。
ArrayBlockingQueue
,(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务LinkedBlockingQueue
,队列可以有界,也可以无界。基于链表实现的阻塞队列。当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOMSynchronousQueue
,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()
的默认队列。可以简单理解为队列长度为零PriorityBlockingQueue
,带优先级的无界阻塞队列
通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。
2. 线程工厂-threadFactory
ThreadFactory
是一个接口,只有一个方法。既然是线程工厂,那么我们就可以用它生产一个线程对象。来看看这个接口的定义。
public interface ThreadFactory { /** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); }
Executors
的实现使用了默认的线程工厂-DefaultThreadFactory
。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
很多时候,我们需要自定义线程名字。我们只需要自己实现ThreadFactory
,用于创建特定场景的线程即可。
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build(); ExecutorService service = new ThreadPoolExecutor(1,1,200L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue(),namedThreadFactory);
3. 拒绝策略-handler
所谓拒绝策略,就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他...
jdk自带4种拒绝策略
CallerRunsPolicy
// 在调用者线程执行,即让提交任务的线程去执行任务(对比前三种比较友好一丢丢)AbortPolicy
// 直接抛出RejectedExecutionException
异常DiscardPolicy
// 默默丢弃任务,不进行任何通知DiscardOldestPolicy
// 丢弃队列里最旧的那个任务,再尝试执行当前任务
这四种策略各有优劣,比较常用的是DiscardPolicy
,但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler
接口的方式。
提交任务的几种方式
往线程池中提交任务,主要有两种方法,execute()
和submit()
。
execute()
用于提交不需要返回结果的任务,我们看一个例子。
public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(() -> System.out.println("hello")); }
submit()
用于提交一个需要返回果的任务。该方法返回一个Future
对象,通过调用这个对象的get()
方法,我们就能获得返回结果。get()
方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit)
,这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException
。
public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); Future<Long> future = executor.submit(() -> { System.out.println("task is executed"); return System.currentTimeMillis(); }); System.out.println("task execute time is: " + future.get()); }
关闭线程池
在线程池使用完成之后,我们需要对线程池中的资源进行释放操作,这就涉及到关闭功能。我们可以调用线程池对象的shutdown()
和shutdownNow()
方法来关闭线程池。
这两个方法都是关闭操作,又有什么不同呢?
shutdown()
会将线程池状态置为SHUTDOWN
,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。shutdownNow()
会将线程池状态置为SHUTDOWN
,对所有线程执行interrupt()
操作,清空队列,并将队列中的任务返回回来。
另外,关闭线程池涉及到两个返回boolean
的方法,isShutdown()
和isTerminated
,分别表示是否关闭和是否终止。
如何正确配置线程池的参数
前面我们讲到了手动创建线程池涉及到的几个参数,那么我们要如何设置这些参数才算是正确的应用呢?实际上,需要根据任务的特性来分析。
- 任务的性质:CPU密集型、IO密集型和混杂型
- 任务的优先级:高中低
- 任务执行的时间:长中短
- 任务的依赖性:是否依赖数据库或者其他系统资源
不同的性质的任务,我们采取的配置将有所不同。在《Java并发编程实践》中有相应的计算公式。
通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。
PS
:我们可以通过Runtime.getRuntime().availableProcessors()
来获取CPU的个数。
线程池监控
如果系统中大量用到了线程池,那么我们有必要对线程池进行监控。利用监控,我们能在问题出现前提前感知到,也可以根据监控信息来定位可能出现的问题。
那么我们可以监控哪些信息?又有哪些方法可用于我们的扩展支持呢?
首先,ThreadPoolExecutor
自带了一些方法。
long getTaskCount()
,获取已经执行或正在执行的任务数long getCompletedTaskCount()
,获取已经执行的任务数int getLargestPoolSize()
,获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过int getPoolSize()
,获取线程池线程数int getActiveCount()
,获取活跃线程数(正在执行任务的线程数)
其次,ThreadPoolExecutor
留给我们自行处理的方法有3个,它在ThreadPoolExecutor
中为空实现(也就是什么都不做)。
protected void beforeExecute(Thread t, Runnable r)
// 任务执行前被调用protected void afterExecute(Runnable r, Throwable t)
// 任务执行后被调用protected void terminated()
// 线程池结束后被调用
针对这3个方法,我们写一个例子。
public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("beforeExecute is called"); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("afterExecute is called"); } @Override protected void terminated() { System.out.println("terminated is called"); } }; executor.submit(() -> System.out.println("this is a task")); executor.shutdown(); } }
输出结果如下:
beforeExecute is called this is a task afterExecute is called terminated is called
一个特殊的问题
任何代码在使用的时候都可能遇到问题,线程池也不例外。楼主在现实的系统中就遇到过很奇葩的问题。我们来看一个例子。
public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executor.submit(new DivTask(100, i)); } } static class DivTask implements Runnable { int a, b; public DivTask(int a, int b) { this.a = a; this.b = b; } @Override public void run() { double result = a / b; System.out.println(result); } } }
该代码执行的结果如下。
我们循环了5次,理论上应该有5个结果被输出。可是最终的执行结果却很让人很意外--只有4次输出。我们进一步分析发现,当第一次循环,除数为0时,理论上应该抛出异常才对,但是这儿却没有,异常被莫名其妙地吞掉了!
这又是为什么呢?
我们进一步看看submit()
方法,这个方法是一个非阻塞方法,有一个返回对象,返回的是Future
对象。那么我们就猜测,会不会是因为没有对Future
对象做处理导致的。
我们将代码微调一下,重新运行,异常信息终于打印出来了。
for (int i = 0; i < 5; i++) { Future future= executor.submit(new DivTask(100, i)); try { future.get(); } catch (Exception e) { e.printStackTrace(); } }
PS
:在使用submit()
的时候一定要注意它的返回对象Future
,为了避免任务执行异常被吞掉的问题,我们需要调用Future.get()
方法。另外,使用execute()
将不会出现这种问题。
异步编程利器:CompletableFuture
异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金 (juejin.cn)
我们异步执行一个任务时,一般是用线程池Executor
去创建。如果不需要有返回值, 任务实现Runnable接口;如果需要有返回值,任务实现Callable
接口,调用Executor
的submit
方法,再使用Future
获取即可。如果多个线程存在依赖组合的话,我们怎么处理呢?可使用同步组件CountDownLatch
、CyclicBarrier
等,但是比较麻烦。其实有简单的方法,就是用CompeletableFuture
。
一个例子回顾 Future
因为CompletableFuture
实现了Future
接口,我们先来回顾Future
吧。
Future
是Java5
新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future
获取计算结果。
来看个简单例子吧,假设我们有两个任务服务,一个查询用户基本信息,一个是查询用户勋章信息。如下,
public class UserInfoService { public UserInfo getUserInfo(Long userId) throws InterruptedException { Thread.sleep(300);//模拟调用耗时 return new UserInfo("666", "捡田螺的小男孩", 27); //一般是查数据库,或者远程调用返回的 } } public class MedalService { public MedalInfo getMedalInfo(long userId) throws InterruptedException { Thread.sleep(500); //模拟调用耗时 return new MedalInfo("666", "守护勋章"); } }
接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() { @Override public UserInfo call() throws Exception { return userInfoService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); Thread.sleep(300); //模拟主线程其它操作耗时 FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() { @Override public MedalInfo call() throws Exception { return medalService.getMedalInfo(userId); } }); executorService.submit(medalInfoFutureTask); UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果 MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
运行结果:
总共用时806ms
如果我们不使用Future进行并行异步调用,而是在主线程串行进行的话,耗时大约为300+500+300 = 1100 ms
。可以发现,future+线程池
异步配合,提高了程序的执行效率。
但是Future
对于结果的获取,不是很友好,只能通过阻塞
或者轮询的方式
得到任务的结果。
Future.get()
就是阻塞调用,在线程获取结果之前get方法会一直阻塞
。Future
提供了一个isDone
方法,可以在程序中轮询这个方法查询
执行结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源
。因此,JDK8设计出CompletableFuture
。CompletableFuture
提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
一个例子走进CompletableFuture
我们还是基于以上Future
的例子,改用CompletableFuture
来实现
public class FutureTest { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { UserInfoService userInfoService = new UserInfoService(); MedalService medalService = new MedalService(); long userId =666L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); Thread.sleep(300); //模拟主线程其它操作耗时 CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId)); UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果 MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
可以发现,使用CompletableFuture
,代码简洁了很多。CompletableFuture
的supplyAsync
方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture
使用了默认线程池是ForkJoinPool.commonPool
。
CompletableFuture
提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理
等方面。我们一起来学习吧
CompletableFuture使用场景
- 创建异步任务
- 简单任务异步回调
- 多个任务组合处理
创建异步任务
CompletableFuture
创建异步任务,一般有supplyAsync
和runAsync
两个方法
supplyAsync
执行CompletableFuture
任务,支持返回值runAsync
执行CompletableFuture
任务,没有返回值。
supplyAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定义线程,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync方法
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
实例代码如下:
public class FutureTest { public static void main(String[] args) { //可以自定义线程池 ExecutorService executor = Executors.newCachedThreadPool(); //runAsync的使用 CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,关注公众号:捡田螺的小男孩"), executor); //supplyAsync的使用 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.print("supply,关注公众号:捡田螺的小男孩"); return "捡田螺的小男孩"; }, executor); //runAsync的future没有返回值,输出null System.out.println(runFuture.join()); //supplyAsync的future,有返回值 System.out.println(supplyFuture.join()); executor.shutdown(); // 线程池需要关闭 } } //输出 run,关注公众号:捡田螺的小男孩 null supply,关注公众号:捡田螺的小男孩捡田螺的小男孩
任务异步回调
1. thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action);
CompletableFuture
的thenRun
方法,通俗点讲就是,做完第一个任务后,再做第二个任务
。某个任务执行完成后,执行回调方法;但是前后两个任务 没有参数传递,第二个任务也没有返回值
public class FutureThenRunTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("先执行第一个CompletableFuture方法任务"); return "捡田螺的小男孩"; } ); CompletableFuture thenRunFuture = orgFuture.thenRun(() -> { System.out.println("接着执行第二个任务"); }); System.out.println(thenRunFuture.get()); } } //输出 先执行第一个CompletableFuture方法任务 接着执行第二个任务 null
thenRun
和thenRunAsync
有什么区别呢?可以看下源码哈:
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); }
如果你执行第一个任务的时候,传入了一个自定义线程池:
- 调用
thenRun
方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
。 - 调用
thenRunAsync
执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
TIPS
: 后面介绍的thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它们之间的区别也是这个哈。
这篇关于java-线程池+CompletableFuture的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程
- 2024-11-23JAVA对接阿里云智能语音服务入门教程
- 2024-11-23Java副业入门:初学者的简单教程
- 2024-11-23JAVA副业入门:初学者的实战指南