线程池ThreadPoolExector核心ctl, execute, addWorker, reject源码分析
2022/4/3 22:19:43
本文主要是介绍线程池ThreadPoolExector核心ctl, execute, addWorker, reject源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
线程池核心方法execute()解析:
public void execute(Runnable command) {//#1 if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//#2 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//#3 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false))//#4 reject(command); }
先简要分析一下:
- #1:execute方法接收一个Runnable接口的对象作为任务。
- #2:首先判断当前线程数量是否超出核心线程数,若没有超出,则调用addWorker()方法,去创建一个worker(线程)去执行这个Runnable任务。
- #3:否则将任务加入等待队列,若加入等待队列成功:
- 若当前线程池没有处于运行状态,那么将任务移出队列,并且执行拒绝策略
- 若当前线程池没有线程,那么调用addworker()创建
- #4 否则(若添加队列失败),添加救急线程(核心线程之外的线程),若救急线程添加不成功,则执行拒绝
通过分析,线程池工作流程是符合我们认知的。
下图转载自https://www.cnblogs.com/trust-freedom/p/6681948.html#label_3_1
分析完线程池核心方法,带着以下疑问去展开深入:
- ctl变量是什么?
- addWorker方法做了什么?
- reject方法做了什么?
1. ctl变量
看一下ctl变量和ctlof方法:
/*The main pool control state, ctl, is an atomic integer packing two conceptual fields, workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc*/ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是一个原子整型数。官方注释全英文且没分段且标点不清晰,所以这里总结一下:
一个整型数包含了两个线程池重要状态:
- workCount: 有效线程数
- runState: 线程池工作状态,是否在工作或者shut down等等
如何做到的?
首先,一个整型数,占4个byte,也就是二进制32位,官方文档将一个整型数拆成两块来用,我们看一下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
COUNT_BITS=Integer.SIZE - 3,也就是29,将状态位左移29位也就是说:
- 定义高3位为状态位,用来表示线程池5个工作状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
- 定义低29位为workCount,也就是说workCount最大为(2^29)-1,约五百万个(说如果以后不够用的话,把int换成long就行了)。
COUNT_MAST是把1左移29位后减去1,也就是高3位为0,低29位为1,实际上就相当于掩码
那么只需要通过掩码取出高3位或者低29位,即可获取runState和workerCount了:
// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~COUNT_MASK; } private static int workerCountOf(int c) { return c & COUNT_MASK; }
然后通过原子类的方法,进行一系列的状态判断与改变:
private static boolean runStateLessThan(int c, int s) {return c < s;} private static boolean runStateAtLeast(int c, int s) {return c >= s;} private static boolean isRunning(int c) {return c < SHUTDOWN;} private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);} private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);} private void decrementWorkerCount() {ctl.addAndGet(-1);}
啥也不说了,高!
2. addWorker方法
addWorker方法判断条件过于复杂(可读性很差),全面的分析我将其折叠:
// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功 private boolean addWorker(Runnable firstTask, boolean core) {
点击打开折叠
retry: // 注意这是一个死循环 - 最外层循环 for (int c = ctl.get();;) { // 这个是十分复杂的条件,这里先拆分多个与(&&)条件: // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0) // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空 // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; // 注意这也是一个死循环 - 二层循环 for (;;) { // 这里每一轮循环都会重新获取工作线程数wc // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败 // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 成功通过CAS更新工作线程数wc,则break到最外层的循环 if (compareAndIncrementWorkerCount(c)) break retry; // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN c = ctl.get(); // Re-read ctl // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 把创建的工作线程实例添加到工作线程集合 workers.add(w); int s = workers.size(); // 尝试更新历史峰值工作线程数,也就是线程池峰值容量 if (s > largestPoolSize) largestPoolSize = s; // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例 workerAdded = true; } } finally { mainLock.unlock(); } // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例 if (workerAdded) { t.start(); // 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程集合移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
转载自https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
我们分析其核心部分:
将Runnable对象的任务提交给一个Worker,并提取出该Worker的成员线程为t:
w = new Worker(firstTask); final Thread t = w.thread;
如果成功添加工作线程,则调用Worker内部的线程实例t的Thread.start()方法启动真实的线程实例:
if (workerAdded) { t.start(); workerStarted = true; }
然后我们在去看看Worker类是什么情况(我理解实际上就是Thread包装了一层类似代理的壳子。。):
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
所以addWorker的核心部分就是通过Thread(Runnable)的方式,创建一个Thread,然后调用这个Thread的start方法,而我们都知道start方法会去调用我们实现Runnable接口时重写的run方法,这就是将任务提交给线程池的本质。
3. reject方法
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
套了层壳。。。调用了RejectedExecutionHandler的一个实例方法
那我们继续深挖handler:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
...好吧,继续深挖实现类。
一共4个实现类,对应4种拒绝策略,终于在里面找到rejectedExecution方法了,挨个分析:
CallerRunsPolicy: 谁(线程)提交的这个任务,谁去执行:
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
确实是这样,如果不通过start方法,而直接调用run方法,那么不会创建线程,也就是在主线程中去执行我们重写的run方法。
AbortPolicy: 直接丢弃该任务(什么都不做),抛出一个异常:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
除了抛出异常外,什么都没有做
DiscardPolicy: 直接丢弃该任务,什么都不做,异常也不抛出
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
看看,拒绝执行函数干脆就是个空的。。。
DiscardOldestPolicy: 喜新厌旧策略
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
等待队列的队头出队,然后执行新来的这个任务。
请指正补充。。
这篇关于线程池ThreadPoolExector核心ctl, execute, addWorker, reject源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享