20211018-ThreadPoolExecutor
2022/5/22 23:05:38
本文主要是介绍20211018-ThreadPoolExecutor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
成员变量
ctl变量
/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等 private static int workerCountOf(int c) { return c & CAPACITY; } // worker即工人数量 private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和
mainLock+works
private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁 private final HashSet<Worker> workers = new HashSet<Worker>();
ThreadPoolExecutor
execute
1、worker<coreSize,新增worker
2、worker>=coreSize,queue未满,加入任务队列
3、worker>=coreSize,queue满了,但是worker<maxSize,新增worker
4、worker>=maxSize,queue满了,拒绝策略拒绝
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
addWorker
1、第一个for
自旋+CAS:增加 ctl内 worker数量
2、第二个for
new Worker,再加入 works
worker.start
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker
public void run() { runWorker(this); }
runWorker
1、取任务,来自 firstTask 或者 getTask()
2、有任务,task.run(),进入下一个while
3、无任务,processWorkerExit
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的 try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask
1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)
2、是淘汰-超时时间内获取任务
3、不淘汰-不限时阻塞获取任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间 workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
processWorkerExit
1、完成任务计数 更新
2、移除当前worker
3、非正常完成,新增worker
4、正常完成,worker数量满足最小要求,直接退出;不满足min要求,新增worker
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;// 完成任务计数 更新 workers.remove(w);// 移除当前worker } finally { mainLock.unlock(); } tryTerminate();// 不太清楚有什么用 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) {// 正常完成 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
参数配置
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1、需要配置哪些
int corePoolSize 核心线程数 int maximumPoolSize 最大线程数 queueSize 队列长度
2、如何配置
依照
每秒请求数(QPS,如 100~1000)+
每个请求耗时(COST,0.5s)+
系统最大响应时间(MAXRSP,2s)
corePoolSize = QPS/(1/COST) = QPS/2 = 50~500
其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求
哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400
queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300
太大:接入了无法满足最大响应时间的请求
太小:能满足最大响应时间的请求又拒绝了
队列大小应该满足最大响应时间,目前看是队列满时,最后一个任务出队完成刚好满足最大响应时间
最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数
maximumPoolSize = max(QPS)/(1/COST) = 500
太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500
原则上最大线程数与队列都满负荷运作,应该满足最大请求数,此处QPS=1000
3、问
进入队列的请求与下一秒新的请求,谁会先执行
队列内的请求由以往work完成
新的请求看情况是
1、入队-新请求后执行
2、新增worker-新请求应该会先执行
3、拒绝
这篇关于20211018-ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享