FutureTask源码解析

2021/7/19 22:05:18

本文主要是介绍FutureTask源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

前言

一、状态

二、运行流程分析

1.run

2.get

3.cancel

4.runAndReset

三、ListenableFutureTask

总结


前言

        实现了Runnable接口的类能够新建线程运行,Future接口规范了线程的生命周期,Callable接口能够获得方法的返回值。FutureTask实现了Runnable和Future接口,同时有Callable属性,能够实现三者的功能。


一、状态

       FutureTask有NEW,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING和INTERRUPTED七种状态,创建时状态为NEW,结果未赋值时更新为COMPLETING,结果赋值后更新为NORMAL,发生异常后会将状态置为EXCEPTIONAL,调用cancel方法后更新为CANCELLED或者INTERRUPTING状态。

* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
    public class FutureTask<V> implements RunnableFuture<V> {
    private static final int NEW          = 0; //任务尚未开始或处于执行期间
    private static final int COMPLETING   = 1; //任务即将执行完成
    private static final int NORMAL       = 2; //任务执行完毕
    private static final int EXCEPTIONAL  = 3; //任务执行期间出现未捕获异常
    private static final int CANCELLED    = 4; //任务被取消
    private static final int INTERRUPTING = 5; //任务正在被中断
    private static final int INTERRUPTED  = 6; //任务已被中断
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

二、运行流程分析

1.run

        run方法负责任务的执行,在该方法中,执行Callable对象的call方法,如果在运行过程中发生异常,会将异常写入outcome中,正常执行完成将运行结果写入outcome中。

    //运行完成之后将返回值设置为outcome,可以通过get方法获取,call方法有返回值。FutureTask实现了runniable接口,重写run方法如下,
    public void run() {
        if (state != NEW || //如果状态不属于NEW,那么通过CAS将runner变量由null设为当前线程,
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {  //获取构造时传入的Callable任务对象
            Callable<V> c = callable;
            if (c != null && state == NEW) {  //如果状态为NEW
                V result;
                boolean ran;
                try {
                    result = c.call(); //调用Callable的call方法执行任务
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //设置异常对象,由调用get方法的线程处理这个异常
                }
                if (ran)  //如果任务正常结束则设置返回值和state变量
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)  //如果处于任务正在中断状态,则等待直到任务处于已中断状态位置
                handlePossibleCancellationInterrupt(s);
        }
    }

         当运行call方法出现异常之后,会调用setException方法,在该方法中,首先尝试将状态更新为COMPLETING,再将异常赋值给outcome,然后将状态更新为EXCEPTIONAL。

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将状态更新为将要完成状态
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //将状态更新为发生异常状态
            finishCompletion();
        }
    }

       方法运行结束之后会调用set方法,在该方法中首先将当前状态更新为COMPLETING,然后将运行结果赋值给outcome,之后将状态更新为NORMAL。

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将状态更新为将要完成状态
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //将状态更新为正常完成状态
            finishCompletion();
        }
    }
     *///在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }

       在对结果进行赋值之后,会调用finishCompletion方法,在该方法中,首先对等待获取结果的线程进行唤醒,然后执行done方法。

     *///唤醒因为等待结果而阻塞的线程
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //移除等待线程
                for (;;) {//自旋遍历等待线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒等待线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //结束之后调用,可以进行重写
        done();

        callable = null;        // to reduce footprint
    }

2.get

         在get方法中,如果当前状态小于等于COMPLETING状态,即当前Task还在运行中,使用awaitdown方法,当task执行完成之后调用report方法。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

       在awaitDone方法中,还是判断当前任务有没有执行完成,执行完成后将状态返回,多次判断后仍然没有执行完成会将当前线程加入等待队列中,并阻塞。

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {//如果线程中断
            if (Thread.interrupted()) { //获取并清除中断状态
                removeWaiter(q);//移除等待WaitNode
                throw new InterruptedException();
            }

            int s = state;//如果任务执行结束或被取消(中断),方法结束,返回结果
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet //如果任务即将完成,让当前线程让步,让出cpu
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued) //如果没有入队,将这个WaitNode加入到FutureTask的等待队列尾部
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) { //如果设置了超时时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else //阻塞当前线程
                LockSupport.park(this);
        }
    }

        在report方法中,如果任务正常执行完成,就返回结果,如果任务被取消,就抛出异常,如果任务执行过程中出现异常,即处于EXCPTIONAL状态,就抛出所发生的异常。

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)  //如果任务正常执行完成直接返回结果
            return (V)x;
        if (s >= CANCELLED) //如果任务被取消或被中断抛出CancellationException(运行时异常的子类)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

3.cancel

         cancel方法负责将任务取消,更改线程状态,最后还会唤醒等待结果的线程。

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && //如果任务状态为NEW并且成功通过CAS将state状态由NEW改为INTERRUPTING或CANCELLED(视参数而定)
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt(); //调用interrupt中断
                } finally { // final state //将state状态设为INTERRUPTED(已中断)
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();  //激活所有在等待队列中的线程
        }
        return true;
    }

4.runAndReset

       runAndReset方法会在运行完任务之后,将任务状态重置,但不会对结果进行赋值,在定时任务线程池中有所应用。

     *///任务执行完之后会重置stat的状态为NEW
    protected boolean runAndReset() { //和run方法类似,通过CAS操作将成员变量runner设置为当前线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {//不会调用set方法设置返回值(outcome成员变量)
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }   //这个方法不会修改state变量的值
        return ran && s == NEW;
    }

三、ListenableFutureTask

       FutureTask想要获取任务运行的结果,需要等待任务执行完成,如果调用时间太早,会导致调用线程阻塞,影响效率。在有些情况下,我们需要在任务执行完成之后执行相应的操作,同时结果不会影响主业务逻辑,我们希望在任务执行完成之后自动根据结果进行操作,而不是等主线程调用。spring对FutureTask进行了继承,重写了done方法,如果程序正常执行完成,会调用success方法,异常退出,会执行failure方法。

	public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
    protected void done() {
		Throwable cause;
		try {
			T result = get();
			this.callbacks.success(result);
			return;
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return;
		}
		catch (ExecutionException ex) {
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		}
		catch (Throwable ex) {
			cause = ex;
		}
		this.callbacks.failure(cause);
	}
}

总结

         本文对FutureTask的源码进行了分析,主要分析run,get,cancel和runAndSet方法,同时,对FutureTask的子类ListenableFutureTask进行了分析,其实现了异步回调功能。



这篇关于FutureTask源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程