FutureTask(未来任务) 源码解析
2021/7/25 11:38:03
本文主要是介绍FutureTask(未来任务) 源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
FutureTask(未来任务)
一、前情回顾(重要)
首先我们先回顾一下多线程创建的方式
- 直接继承Thread方式
- 实现Runnable 方式
- 实现Callable方式
- 线程池方式
这四种方式主要分为两类:没返回值的(1,2) 有返回值的(3,4)
没返回值的相信已经烂熟于心了。这次我们讲讲有返回值的,下面先给出3,4的两种创建多线程的示例:
1.实现Callable方式
public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建任务 FutureTask task = new FutureTask(() -> { System.out.println(Thread.currentThread().getName() + " is running"); return 200; }); //创建线程 new Thread(task,"A").start(); //获得任务的返回值 System.out.println(task.get()); } } /** 执行结果: A is running 200 **/
提问?
1.Thread为什么能接收FutureTask? 2.为什么线程能执行Callable实现的call方法?
2.线程池方式
//实现Runnable class RunnableDemo implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName() + " runnable is running"); } } //实现Callable class CallableDemo implements Callable{ @Override public Object call() throws Exception { System.out.println(Thread.currentThread().getName() + " callable is running"); return 200; } } public class ThreadPoolTest { public static void main(String[] args) throws Exception { //创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(5); //提交并获得任务 Future callableTask = threadPool.submit(new CallableDemo()); Future runnableTask = threadPool.submit(new RunnableDemo(), 400); //根据任务获得返回值 System.out.println(callableTask.get()); System.out.println(runnableTask.get()); //关闭线程池 threadPool.shutdown(); } } /** pool-1-thread-1 callable is running 200 pool-1-thread-2 runnable is running 400 **/
提问?
1. 为何线程池中能传入Callable 或 Runnable的实现? 2. submit的方法返回的Future类是什么? 3. 为何也都可以根据任务获取返回值,且Runnable也有返回值?
你在学习以上两种方式创建多线程的时候是否会有这些疑问呢,根据上面的几个提问,来一点点获取线索,相信自己最终可以破案。
3.获取线索
我们首先要找出 实现Callable方式
与 线程池方式
两者的共同点。
-
实现Callable方式
中new FutureTask()
-
线程池的方式
这里我们要先看一下submit()
方法做了什么?
public interface ExecutorService extends Executor{ <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); }
不难发现,ExecutorService
是一个接口,那么就直接去看他的一个实现类
public abstract class AbstractExecutorService implements ExecutorService { //提交Runnable实现 不带任务返回值 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } //提交Runnable实现 带任务返回值 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } //提交Callable实现 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } }
有三个submit()
的重载方法,其中有个共同的方法就是newTaskFor()
将submit
的传参的值原封不动的传了进去,且返回了个 RunnableFuture
,那么直接来看这个newTaskFor()
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
惊奇的发现它内部也是 new FutureTask()
。
那么接下来就来到了我们今天的主题 解密 FutureTask
二、继承与实现
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>
从FutureTask
的实现关系来看,它实际上是实现了 Runnable
和 Future
这两个接口
这里也就解释了之前的一个问题:
问:Thread为什么能接收FutureTask? 答:因为FutureTask实现了Runnable接口。
三、构造方法
//接收Callable实现的构造 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); //赋值 Callable的实现 this.callable = callable; //将当前任务标记为 NEW(任务尚未执行) this.state = NEW; // ensure visibility of callable } //接收Runnable实现的构造 public FutureTask(Runnable runnable, V result) { //将Runnable包装成Callable this.callable = Executors.callable(runnable, result); //将当前任务标记为 NEW(任务尚未执行) this.state = NEW; // ensure visibility of callable }
接收Runnable
实现的构造中,使用了适配器模式 将Runnable
包装成Callable
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); //使用的是适配器模式将runnable转换为了 callable接口,外部线程 通过get获取当前任务执行结果时 //结果可能为null 也可以能为 传进来的result值 return new RunnableAdapter<T>(task, result); } //实现了Callable,并实现了 static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { //将runnable的实现赋值给task this.task = task; this.result = result; } public T call() { task.run(); return result; } }
RunnableAdapter
实现了Callable
并在实现call()
方法中调用了Runnable
实现的run()
方法。这样就成功将Runnable
包装成了 Callable
四、成员变量及内部类
//当前任务的状态 private volatile int state; //以下7个对应任务有哪些状态 //当前任务尚未执行 private static final int NEW = 0; //当前任务正在结束,一种临界状态 private static final int COMPLETING = 1; //当前任务正常结束,且有正常的返回值 private static final int NORMAL = 2; //当前任务异常结束,且会向上抛异常 private static final int EXCEPTIONAL = 3; //当前任务被取消 private static final int CANCELLED = 4; //当前任务中断中... private static final int INTERRUPTING = 5; //当前任务已中断, 中断就是个标记,并不代表已经结束了。 private static final int INTERRUPTED = 6; //构造方法中传进来的 runnable/callable, //callable直接赋值 //runnable 经过 RunnableAdapter包装成Callable后赋值 private Callable<V> callable; //存放任务的执行结果,有两种结果 //正常情况下,任务正常执行结束, outcome保存 callable的call方法的返回值 //异常情况下,callable的 call方法向上抛出异常,outcome保存异常 private Object outcome; //当前任务被线程执行期间,保存当前执行任务的线程的对象引用,有且仅有一个线程。 private volatile Thread runner; //因为会有很多线程来get当前任务的结果, 所以这里使用了链表,来存放这些线程。 private volatile WaitNode waiters; static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
五、成员方法
看下面的代码时,需要规定一个场景
场景 : 多线程并发
1.执行入口run()方法
为什么说run()是执行入口?
当我们启动线程时,最基本的就是
new Thread().start()
start()
内部实际上又调用了native start0()
方法
通过操作系统为我们创建一个线程,并调用run()
方法。
最终会执行target.run()
也就会调用Runnable
实现的run()
方法 (详情查看Thread
源码)由于
FutureTask
是Runnable
的实现,因此该run()
方法是执行入口。
public void run() { /** 判断当前任务状态是否为未执行 state != NEW 线程池中的所有线程会争抢该task任务, 通过CAS控制并发,赋值runner属性,也就意味着只有一个线程能够抢到该任务去执行 !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()) **/ //该判断的目的主要是为了保证只能有一个线程来执行该任务。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //执行到这里,当前task一定是NEW状态,且只有一个当前线程抢占成功,runner属性是当前线程的引用。 try { //callable就是程序员自己实现的callable/通过装饰后的runnable(RunnableAdapter) Callable<V> c = callable; //相当于是一个DCL //条件一:c != null 防止空指针 //条件二:state==NEW 防止被其它线程Cancel了,所以再次确认下任务状态 if (c != null && state == NEW) { //执行call的返回结果的引用 V result; //true 表示callable.call 执行成功 未抛出异常 //false 表示callable.call 执行失败 抛出异常 boolean ran; try { //执行程序员实现的业务逻辑 result = c.call(); //若没抛异常,则设置为true ran = true; } catch (Throwable ex) { //若抛异常,则设置为false,返回值也置为null result = null; ran = false; //TODO 后面会说... (透露:会将异常信息设置到outcome中) setException(ex); } //如果为true 代表执行成功, if (ran) //TODO 后面会说... (透露:会将成功返回值结果设置到outcome中) set(result); } } finally { //将执行当前任务的线程引用置为null, 表示该任务没有线程对其进行执行了 runner = null; // 判断当前任务状态是不是中断状态 int s = state; if (s >= INTERRUPTING) //如果是中断状态,那么该任务会一直死循环在该处,直到由外部对其进行cancel 或者 unpark handlePossibleCancellationInterrupt(s); } }
下面是 run()
中调用的方法
protected void set(V v) { //使用CAS方式,设置当前任务状态完成中... //有没有可能失败呢? 外部线程等不及了,直接在set执行CAS前,将task取消了 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //赋值正常返回值 outcome = v; //结果赋值结束后,马上会将当前任务状态修改为 NORMAL状态 表示正结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //TODO 该方法后面再说... finishCompletion(); } }
protected void setException(Throwable t) { //使用CAS方式,设置当前任务状态完成中... //有没有可能失败呢? 外部线程等不及了,直接在setException执行CAS前,将task取消了 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //赋值异常信息 outcome = t; //将当前任务状态修改为 EXCEPTIONAL状态 表示异常状态 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //TODO 该方法后面再说... finishCompletion(); } }
private void handlePossibleCancellationInterrupt(int s) { //若任务状态是中断中... 则一直在此死循环,且一直释放CPU,直到该状态不是中断中... if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
2.获取任务结果get()方法
get()方法有两种,一种是不带超时的,一种是带超时的。这里我们主要看不带超时的。
public V get() throws InterruptedException, ExecutionException { //获得当前任务状态 int s = state; //条件成立:当前任务状态为 NEW,COMPLETING 表示当前任务还未执行或正在完成中.. if (s <= COMPLETING) //所有调用get()方法的线程 会被阻塞在此 //直到会得到结果(任务的状态返回值) 或者 向上抛出中断异常 s = awaitDone(false, 0L); //返回 结果或者抛异常 return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //timed = false //deadline = 0L final long deadline = timed ? System.nanoTime() + nanos : 0L; //引用当前线程 封装成WaitNode对象 WaitNode q = null; //表示当前get()的线程有没有入队 boolean queued = false; //自旋 for (;;) { //条件成立: 当前执行get的线程被interrupt 叫醒 if (Thread.interrupted()) { //将引用当前线程的WaitNode对象 移除队,且在移除的过程中同样会移除其它q=null的节点对象 removeWaiter(q); //向上抛出中断的异常 throw new InterruptedException(); } //获取当前任务的状态 int s = state; //条件成立:说明当前任务已经有结果了,可能是好结果,也可以能使抛出异常的结果 if (s > COMPLETING) { //条件成立:说明当前线程已经创建过WaitNode节点, if (q != null) //此时由于得到了结果,需要将q.thread = null //helpGC 或者 当有线程执行removeWaiter方法时 会主动将该节点移除 q.thread = null //直接返回当前任务的状态 return s; } //条件成立:说明当前任务正在完成中,还差赋值和设置成NORMAL或者EXCEPTIONAL两步 else if (s == COMPLETING) // cannot time out yet //由于任务快要完成了,因此主动释放CPU,进行下一次CPU的抢占,目的是尽量让执行任务的线程去抢到CPU Thread.yield(); //条件成立: 第一次自旋,当前任务是NEW状态,当前线程还未创建WaitNode对象 else if (q == null) //为当前线程创建WaitNode对象 q = new WaitNode(); //条件成立:第二次自旋,当前任务是NEW状态,当前线程已经创建WaitNode对象,但是WaitNode对象还为入队 else if (!queued) //头插法入队 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //条件成立:第三次自旋 当前任务是NEW状态,由于规定timed=false,则不会进入这个判断,会进入下面的else //也就意味着 如果是带超时的会走当前判断, 不带超时的会走下一个else判断 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //不带超时的 //当前get操作的线程就会被park,线程会进入waiting状态 休眠 //除非有其它线程将其unpark(thread) 唤醒 或者 将当前线程中断 才会进入下一次自旋 else LockSupport.park(this); } }
返回给用户结果的方法,该结果可能是好结果,也可能是抛异常的结果
private V report(int s) throws ExecutionException //outcome 保存的是Callable运行结束的结果,可能是正常的好结果,也可以能是异常的信息 Object x = outcome; //条件成立:当前任务状态正常结束 if (s == NORMAL) //直接返回好结果 return (V)x; //条件成立:任务被取消 或 中断 if (s >= CANCELLED) //抛出异常 throw new CancellationException(); //执行到这 说明任务状态为EXCEPTIONAL 程序员的Callable实现的代码有bug了 throw new ExecutionException((Throwable)x); }
3.finishCompletion()方法
private void finishCompletion() { // assert state > COMPLETING; //循环Waiters链表 for (WaitNode q; (q = waiters) != null;) { //使用CAS设置Waiters为null,是因为防止外部线程使用cancel()方法取消当前任务 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //获取当前Node节点封装的thread Thread t = q.thread; //条件成立:说明当前下次呢很难过不会为null if (t != null) { //help GC q.thread = null; //唤醒get()阻塞的的线程 LockSupport.unpark(t); } WaitNode next = q.next; //如果到链表末尾了就退出循环 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //可扩展的点 done(); //将callable 设置为null help GC callable = null; // to reduce footprint }
4.取消任务 cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) { //也就意味着 当前任务状态如果是NEW 不允许被cancel 将状态改为INTERRUPTING 或者CANCELLED if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; //执行到这就说明 当前任务是从非NEW状态 被改为了INTERRUPTING 或CANCELLED 的 //mayInterruptIfRunning // true: INTERRUPTING false:CANCELLED try { //如果是true 也就是状态改成了INTERRUPTING if (mayInterruptIfRunning) { try { //将执行当前任务的线程取出来 Thread t = runner; //判断是否有线程执行该任务 if (t != null) //有的话 则中断该线程 t.interrupt(); } finally { // final state //最终将该任务的状态设置为 INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //然后唤醒所有执行get()阻塞的线程 finishCompletion(); } return true; }
这篇关于FutureTask(未来任务) 源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-19JAVA分布式id教程:轻松入门与实践
- 2024-11-19Java高并发教程:入门与实践指南
- 2024-11-19JAVA高并发直播教程:新手入门指南
- 2024-11-19Java高并发直播教程:入门与实践指南
- 2024-11-19Java微服务教程:初学者快速入门指南
- 2024-11-19JAVA微服务教程:新手入门的详细指南
- 2024-11-19Java微服务教程:从零开始搭建你的第一个微服务应用
- 2024-11-19Java项目开发教程:初学者必备指南
- 2024-11-19Java项目开发教程:新手快速入门指南
- 2024-11-19Java项目开发教程:零基础入门到实战