通用线程池
2022/1/11 23:07:17
本文主要是介绍通用线程池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
通用线程池
1. 架构模型
2. 核心参数
3. 继承体系
- Executor: 顶级接口,任务执行器
- ExecutorService:即Executor Service,跟我们正常写方法比较类似,定义了线程池的通用方法
- AbstractExecutorService: 典型的模版方法模式实现,主流程有抽象类实现,提供钩子方法,由子类实现。
4. AbstractExecutorService实现
4.1 submit
将Runable和Callable包装成RunnableFuture对象,调用子类实现的execute(RunableFuture)防范】
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); // 钩子函数,由子类实现具体的调度逻辑 execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
4.2 invokeAll
- 首先将集合中的所有Callable包装成RunnableFuture,并调用execute(Runable)方法
- 依次调用Future.get(),注意主动cancel和执行异常会被吞掉
- 超时后,会尝试中断未执行完的线程
4.3 invokeAny
- 有任何一个执行完成就结束
- 完成后,会尝试中断正在执行的任务(不一定能取消掉)
- 超时会中断所有正在执行的任务
5. ThreadPoolExecutorService实现
- 线程池状态:
- 运行状态
- RUNNING:接收新的任务,处理队列中的任务
- SHUTDOWN:不接受新任务,但是处理队列中的任务
- STOP:不接受新任务,不处理队列中的任务,同时打断队列中的任务
- TIDYING:所有任务都终止,工作线程数量为0,在转换为TIDYING状态后会执行钩子函数terminated()
- TERMINATED: terminated() 执行结束
// 用高3位表示线程池的状态, 总共5个状态,3位正好可以表示 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
5.1 shutdown()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // CAS 设置保证执行状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止 tryTerminate(); }
5.2 shutdownNow()
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // CAS 设置保证执行状态为STOP advanceRunState(STOP); // 中断所有的执行线程 interruptWorkers(); // 取出所有未执行的任务返回,给业务线程机会是否处理该线程 tasks = drainQueue(); } finally { mainLock.unlock(); } // 这里会尝试终止,实际不一定能终止,最后一个线程会调用终止 tryTerminate(); return tasks; }
5.3 awaitTerminated() & tryTerminate()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 线程池为Terminated才会正常结束 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; // 利用条件变量,类似wait notify,但是这里支持等待时长 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 只有一个线程会执行到下面的代码,其他线程在上面return了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // tidying状态才能终止,线程数为0,队列是空 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 钩子方法 terminated(); } finally { // 状态标记为终止 ctl.set(ctlOf(TERMINATED, 0)); // 条件标量通知等待结束的线程可以放行了,之所以是signall->多个线程等待都会被放行 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
5.4 核心方法:execute
执行下面的操作:
- 一言以蔽之:先添加核心线程,然后添加到队列,队列满了后创建非核心线程。最后执行拒绝策略。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 小于核心线程数,需要启动新任务 if (workerCountOf(c) < corePoolSize) { // 会自动检测runState和workerCount, // 如果添加失败,如果返回false,要么线程数超过核心线程数,要么runState已经变更,执行后续的处理 if (addWorker(command, true)) return; c = ctl.get(); } // 如果是运行状态说明,添加失败的原因是超过核心线线程数,先添加到队列中 if (isRunning(c) && workQueue.offer(command)) { // 多线程场景,double-check int recheck = ctl.get(); // 不在运行态,直接回滚 if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); // 运行态,运行线程数等于0 else if (workerCountOf(recheck) == 0) // 第一个任务为null, 会从队列中取1个任务作为第一个任务执行 addWorker(null, false); } // 添加到队列失败,创建非核心线程,执行任务 else if (!addWorker(command, false)) // 执行拒绝策略 reject(command); }
5.5 核心方法addWorker
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 创建线程的runable对象传的是this,即worker对象,t.start会执行worker的run方法,调用runWorker(this) this.thread = getThreadFactory().newThread(this); } /** * firstTask: 第一个需要执行的任务 * core: 是否创建核心线程数 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 如果线程池runState 为Stop状态,直接返回false // shutDown状态,会执行队列中的任务,但不会执行新的任务,所以不需要创建新线程 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { // 大于核心线程数还是大雨最大线程数,取决于core参数,超过了就不能创建新线程了,返回false,外层调用者会执行拒绝策略 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // CAS保证线程安全+1,即workerCount+1 if (compareAndIncrementWorkerCount(c)) // break跳出循环,执行循环后面的内容,continue不会跳出循环 break retry; // CAS失败,说明workerCount已经被其他线程变更, 重新取值判断 c = ctl.get(); // Re-read ctl // 运行状态到了SHUT DOWN以后(STOP, TIDYing)重新跳出到外层循环 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // 其他情况运行状态不变,只需要重新执行下内层循环判断数量 } } // 工作线程数已经+1, 如果真正启动失败,会回滚 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 向容器中添加工作对象 workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动的时候,会执行Worker对象的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 添加失败,这里会回滚线程数 addWorkerFailed(w); } return workerStarted; }
5.6 Woker.runWork(Worker w)方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // keepAliveTime, getTask会调用阻塞队列的poll方法一直到取到为止, // 如果超时(keepAliveTime)未取到,会抛出中断异常,processWorkerExit会执行,删除工作线程,由GC回收 while (task != null || (task = getTask()) != null) { w.lock(); // shutdown的时候需要清除中断标志位,因为当前线程还要执行线程中的任务 // shutDownNow, 需要确保处于中断状态, 所以在任务中调用中断后,下一次任务会清除中断标志位 // 中断的时候,join,wait, notify等都可以响应中断标志位 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //子类实现钩子函数 beforeExecute(wt, task); try { // 执行任务 task.run(); // 子类实现 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } // 有异常的情况下,该值位true completedAbruptly = false; } finally { // 有异常的情况下,该值位true, 会减少workerCount, // 以便能重新创建线程,所有抛出异常并不会导致没有线程可用 processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 保证至少一个线程运行 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
这篇关于通用线程池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-10百万架构师第十三课:源码分析:Spring 源码分析:Spring核心IOC容器及依赖注入原理|JavaGuide
- 2025-01-10便捷好用的电商API工具合集
- 2025-01-09必试!帮 J 人团队解决物流错发漏发的软件神器!
- 2025-01-09不容小觑!助力 J 人物流客服安抚情绪的软件!
- 2025-01-09为什么医疗团队协作离不开智能文档工具?
- 2025-01-09惊叹:J 人团队用啥软件让物流服务快又准?
- 2025-01-09如何利用数据分析工具优化项目资源分配?4种工具推荐
- 2025-01-09多学科协作难?这款文档工具可以帮你省心省力
- 2025-01-09团队中的技术项目经理TPM:工作内容与资源优化策略
- 2025-01-09JIT生产管理法:优化流程,提升竞争力的秘诀