Java线程池系列之execute和submit区别

2022/1/8 12:03:49

本文主要是介绍Java线程池系列之execute和submit区别,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

这是几年前的一道面试题了,网上的答案一般都是

1. execute只能提交Runnable类型的任务,没有返回值,而submit既能提交Runnable类型任务也能提交Callable类型任务,返回Future类型。
2. execute方法提交的任务异常是直接抛出的,而submit方法是是捕获了异常的,当调用FutureTask的get方法时,才会抛出异常。

我们可能看过n多次但又忘了n多加1次,那么我们到底应该如何才能彻底理解和记住这两者的区别呢,这就要从ThreadPoolExecutor的根上说起了。根是什么呢,就是下图的继承体系:

可以看到ThreadPoolExecutor的根上是两个接口,分别是Executor和ExecutorService,查看接口里的方法如下:

从图中就可以很清楚的看到execute和submit的第一个区别了,

execute是Executor接口的方法,而submit是ExecutorService的方法,并且ExecutorService接口继承了Executor接口。

接下来,我们就从这四个类依次入手进行分析。

Executor

Executor接口源码贴在下面了,源码注释很多,对示例部分做了删减。

/**
 * An object that executes submitted {@link Runnable} tasks. This
 * interface provides a way of decoupling task submission from the
 * mechanics of how each task will be run, including details of thread
 * use, scheduling, etc.  An {@code Executor} is normally used
 * instead of explicitly creating threads. 
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

从类注释可以看出,作用是解耦任务提交和任务执行,比如执行一个Runnable任务,你可能会使用

new Thread(new(RunnableTask())).start()

而使用Executor之后,你就可以这样写了

Executor executor = anExecutor;
executor.execute(new RunnableTask());

看起来好像代码量增加了,其实不然。对第一种写法,任务的提交和执行就被固定死了,只能新起一个线程去跑,可以说毫无扩展性;而第二种写法,我们可以通过切换anExecutor到anotherExecutor,便可以很容易的改变任务的执行方式,如直接run、起新线程跑、存优先级队列跑,这就是把任务提交和执行解耦的好处。

最后,从方法定义上可以看到,入参是Runnable类型,没有返回值。 

ExecutorService

ExecutorService在Executor基础上,提供了更多的功能,如管理执行器的生命周期和异步任务的执行等。代码如下,还是删减了与本地主题关系不大的注释和方法

/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.
 
 * <p>Method {@code submit} extends base method {@link
 * Executor#execute(Runnable)} by creating and returning a {@link Future}
 * that can be used to cancel execution and/or wait for completion.
 *
 * @since 1.5
 * @author Doug Lea
*/
public interface ExecutorService extends Executor {

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
}

ExecutorService增加了三个submit方法,都可以返回一个Future对象。通过Future对象,就可以做更多的事情:获取任务执行结果、判断任务是否执行完毕、取消任务等等。

这样我们就发现execute和submit的第二个区别了,

execute只接受Runnable参数,没有返回值;而submit可以接受Runnable参数和Callable参数,并且返回了Future对象,可以进行任务取消、获取任务结果、判断任务是否执行完毕/取消等操作。

AbstractExecutorService

AbstractExecutorService看类名就知道是一个抽象类,它实现了ExecutorService接口,并提供了一些通用的方法实现,其中就实现了三个submit方法。

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

在submit方法中,首先会判断入参是否为null,如果是就抛出空指针异常,显然线程池不允许提交null任务。

然后第二步,不管入参是Runnable还是Callable,统统封装成RunnableFuture类型的任务,也就是submit方法会对任务做一次适配。RunnableFuture看名字就知道它同时继承了Runnable接口和Future接口,这样就可以作为返回值了。

第三步,调用execute方法。原来是submit会调用execute方法,而AbstractExecutorService没有实现execute方法,显然是留给子类去实现了,这里就用到了模板方法模式。

最后,返回了ftask,上面说了,ftask既是Runnable又是Future,因此我们就可以通过ftask来获取最终任务结果了。

这样我们就发现execute和submit的一个联系了,

submit会对Runnable或Callable入参封装成RunnableFuture对象,调用execute方法并返回。

还有一点值得提一下,这里的newTaskFor方法返回了FutureTask对象,也就是FutureTask是实现了RunnableFuture接口的,而newTaskFor方法是protected的,也就是说我们可以在子类中重写该方法,以达到我们特殊的目的。 

ThreadPoolExecutor

终于到了本文主角——线程池了。

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    }
}

ThreadPoolExecutor最终对execute方法做了实现,方法上的注释也解释的很清楚,线程池是如何处理提交过来的任务的。这里不得不赞一下JDK源码的注释,往往比代码都长。

这里只提一下一个容易忽略或看不明白的地方,就是任务放阻塞队列时的一个子分支

else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

这里为什么放完阻塞队列之后还要再次判断下worker的数量呢,并且如果是0的话就会调用addWorkder方法。

其实,举一个实际例子就清楚了,我们在创建线程池的时候,可能想着不保持核心线程,有任务到来时才启动线程,即

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(size));

这样的话,当第一个任务到来时,因为已经达到核心线程数了(其实是0),因此任务会自动放入阻塞队列中,那岂不是永远不会执行了?因此会判断当前有没有工作线程,如果没有就添加一个,并且不需要给添加的这个工作线程指定初始任务,因为它会自动取刚刚的阻塞队列中取任务。

而这里的worker又是什么呢,这里其实我们就可以理解为对一个线程的封装,关键代码如下

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
}

runWorker是ThreadPoolExecutor的方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到runWorker中,循环不断地从阻塞队列中获取任务,并简单的通过task.run()来执行任务,如果出现异常则会向外抛出,即通过execute来执行任务,执行线程会抛出异常。

FutureTask

对于submit提交的任务又是如何执行的呢,我们就需要查看newTaskFor返回的FutureTask了,关键部分代码如下

public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}

可以看出FutureTask和runWorker的区别了,runWorker是catch到异常接着throw出去,完全不接锅;而FutureTask则是将catch到的异常存下来,在get的时候再去判断任务执行状态,如果任务状态时异常,则再抛出ExecutionException异常。

因此execute和submit的第三个区别就是

通过execute方法提交的任务如果出现异常则直接抛出原异常,是在线程池中的线程中;而submit方法是捕获了异常的,只有当调用Future的get方法时,才会抛出ExecutionException异常,且是在调用get方法的线程。

实际效果如图

总结

经过以上分析,我们得出了execute和submit的区别了

1. execute是Executor接口的方法,而submit是ExecutorService的方法,并且ExecutorService接口继承了Executor接口。
2. execute只接受Runnable参数,没有返回值;而submit可以接受Runnable参数和Callable参数,并且返回了Future对象,可以进行任务取消、获取任务结果、判断任务是否执行完毕/取消等操作。其中,submit会对Runnable或Callable入参封装成RunnableFuture对象,调用execute方法并返回。
3. 通过execute方法提交的任务如果出现异常则直接抛出原异常,是在线程池中的线程中;而submit方法是捕获了异常的,只有当调用Future的get方法时,才会抛出ExecutionException异常,且是在调用get方法的线程。

最后,附上线程池相关的类图,以便清晰地了解这些类之间的关联关系

 



这篇关于Java线程池系列之execute和submit区别的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程