Java多线程(三):线程组与线程池
2021/6/6 1:22:57
本文主要是介绍Java多线程(三):线程组与线程池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
目录:
- 1. 线程组
- 2. 线程池
- 2.1 Executors(线程池工厂)
- 2.2 计划任务(延时执行,周期执行)
- 2.3 核心线程池的内部实现
- 2.3.1 任务队列(BlockingQueue)
- 2.3.2 线程工厂(ThreadFactory)
- 2.3.3 拒绝策略(RejectedExecutionHandler)
- 2.4 Executors中线程池的实现
- 2.4.1 newFixedThreadPool 的实现
- 2.4.2 newSingleThreadExecutor 的实现
- 2.4.3 newCachedThreadPool 的实现
- 2.5 扩展线程池
- 2.6 分而治之(Fork & Join)
1. 线程组
在一个系统中,如果线程数量较多,且功能分配比较明确,就可以将相同功能的线程放在同一个线程组(ThreadGroup) 中。
ThreadGroup有两个比较重要的功能:
activeCount()
方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值list()
方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助
线程组使用举例:
public class TestThreadGroup { public static void main(String[] args) { ThreadGroup applesThread = new ThreadGroup("apples"); ThreadGroup bananasThread = new ThreadGroup("bananas"); Apple apple = new Apple(); Banana banana = new Banana(); for (int i = 0; i < 3; i++) { new Thread(applesThread, apple, "apple" + i).start(); } for (int i = 0; i < 3; i++) { new Thread(bananasThread, banana, "banana" + i).start(); } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } // activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值 System.out.println("当前活动线程总数: " + applesThread.activeCount()); // list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助 System.out.println("bananas组中的所有线程信息: "); bananasThread.list(); } } class Apple implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": I'm an apple"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } class Banana implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": I'm a banana"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
2. 线程池
线程池是Java并发编程中一项重要的工具。
- 虽然与进程相比,线程是一种相对轻量级的工具,但其创建、关闭依然需要消费相当的资源。
- 如果高频率的创建、关闭线程,对系统性能的影响依旧十分明显的。
为解决此问题,我们可以使用JDK提供的工具——线程池。
- 正如他的名字所表达的,线程池就像一个大池子,里面存放着一些线程;
- 当有需要的时候,就从里面取出线程以完成指定的工作;
- 使用完在将线程重新放回线程池中。
JDK为线程池提供了一套Executor
框架,以帮助开发人员有效的进行线程控制
Executor框架的核心成员:
-
Interface Executor
-
interface ExecutorService extends Executor
-
abstract class AbstractExecutorService implements ExecutorService
-
class ThreadPoolExecutor extends AbstractExecutorService
:- 实现了
Executor
接口,任何Runnable
的对象都可以被ThreadPoolExecutor
线程池调用 - 是线程池工厂(Executors)重要的内部实现支持类。
- 实现了
-
interface ScheduledExecutorService extends ExecutorService
:- 在
ExecutorService
接口之上扩展了在给定时间执行某任务的功能。 - 如固定延时后执行、周期性执行等。
- 在
-
class Executors
:- 线程池工厂,可以利用此工厂构建线程池
2.1 Executors(线程池工厂)
主要的工厂方法:
工厂方法 | 功能 |
---|---|
ExecutorService newFixedThreadPool(int nThreads) | 返回一个固定线程数量的线程池,当有新任务提交时,如有空闲线程则立即执行,否则暂存在任务队列中进行等待(先进先出)。 |
ExecutorService newSingleThreadExecutor() | 相当于newFixedThreadPool(1) 。 |
ExecutorService newCachedThreadPool() | 返回一个可根据实际情况调整的线程池,优先复用线程,线程不够时创建。 |
ScheduledExecutorService newSingleThreadScheduledExecutor() | 返回一个ScheduledExecutorService 对象,线程池大小为1。ScheduledExecutorService 接口在ExecutorService 接口之上扩展了在给定时间执行某任务的功能。 |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 与newSingleThreadScheduledExecutor() 类似,但此方法可以指定线程数量。 |
以 newFixedThreadPool(int nThreads) 为例,介绍线程池的使用:
public class TestPool { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable task = () -> { System.out.println(Thread.currentThread().getName() + ": hello thread pool!"); }; for (int i = 0; i < 4; i++) { executorService.submit(task); } // 执行完所有任务后,关闭线程池(不再接受新任务)。如果线程池已经关闭,则调用没有其他效果。 executorService.shutdown(); } }
2.2 计划任务(延时执行,周期执行)
Java线程池还提供了计划执行的功能,主要包括以下三个方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
方法会在给定的时间,对任务进行一次调度
... scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率是唯一的(从第一次开始执行到第二次开始执行的时间是一定的)。
... scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
对任务进行周期性的调度,任务的调度频率不是唯一的(从第一次执行结束到第二次开始执行的时间是一定的)。
schedule举例:
public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 延时2s后调用任务: schedule(Runnable command, long delay, TimeUnit unit) */ scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } }
scheduleAtFixedRate举例:
public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 每隔2s调用一次runnable任务: scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) */ scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS); } }
scheduleWithFixedDelay举例:
package com.ju.pool; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledTask { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); Runnable runnable = () -> { System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法"); // 任务需要执行1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束"); }; System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start"); /* 一开始延时5s开始执行任务,从前一次任务结束到下一次任务开始间隔2s: scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit) */ scheduledExecutorService.scheduleWithFixedDelay(runnable, 5, 2, TimeUnit.SECONDS); } }
2.3 核心线程池的内部实现
除了计划执行外,其余三个工厂方法(newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
)的内部实现均使用了ThreadPoolExcutor
类。
ThreadPoolExcutor 类的一个参数较全的构造器如下所示:
public ThreadPoolExecutor(int corePoolSize, // 指定线程池中固定存在的线程数量 int maximumPoolSize, // 指定了线程池的最大线程数量 long keepAliveTime, // 指定了线程池中超过corePoolSize的空闲线程,在多长时间内会被销毁 TimeUnit unit, // keepAliveTime的单位 BlockingQueue<Runnable> workQueue, // 任务队列,由于存储已被提交但未被执行的任务 ThreadFactory threadFactory, // 线程工厂,由于创建线程,一般用默认的即可 RejectedExecutionHandler handler) // 拒绝策略:任务太多来不及处理时,如何拒绝任务
其中大部分的参数均比较容易理解,需要解释类型的有三个:
- 任务队列(BlockingQueue)
- 线程工厂(ThreadFactory)
- 拒绝策略(RejectedExecutionHandler)
2.3.1 任务队列(BlockingQueue)
-
直接提交的队列(SynchronousQueue):
- 是一个特殊的
BlockingQueue
,没有容量,每一个插入操作都要等待一个相应的删除操作 - 反之,每一个删除操作也都要等待一个相应的插入操作
- 提交的任务并不会被真正的保存,而是总是将新的任务提交给线程
- 有空闲线程则直接使用
- 没有空闲线程,则尝试创建
- 线程数量达到最大(
maximumPoolSize
),则执行拒绝策略
- 是一个特殊的
-
有界的任务队列(ArrayBlockingQueue):
-
队列遵循先进先出规则
-
corePoolSize
有剩余则直接获得线程处理任务 -
corePoolSize
已满则将任务加入队列 -
如等待队列已满,在总线程不大于
maximumPoolSize
的前提下创建新的线程执行任务 -
如大于
maximumPoolSize
,则执行拒绝策略
-
-
无界的任务队列(LinkedBlockingQueue):
- 队列遵循先进先出规则
corePoolSize
有剩余则直接获得线程处理任务corePoolSize
已满则将任务加入队列- 由于队列无界,所以不会出现队列已满的情况
- 无界队列会一直增长(有需要的话),直至耗尽系统的内存
-
优先任务队列(PriorityBlockingQueue):
- 带有执行优先级,可以控制任务执行的先后顺序
- 是特殊的无界队列,根据任务自身的优先级顺序先后执行
2.3.2 线程工厂(ThreadFactory)
-
用于创建线程池中的线程
-
可以自定义实现自己的线程工厂
public interface ThreadFactory { Thread newThread(Runnable r); }
2.3.3 拒绝策略(RejectedExecutionHandler)
AbortPolicy
: 直接抛出异常,阻之系统正常工作(默认的拒绝策略)CallerRunsPolicy
: 只要线程池未关闭,就直接在调用者线程中,运行当前被丢弃的任务DiscardOldestPolicy
: 丢弃 最“老” 的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务DiscardPolicy
:丢弃无法处理的任务,不予任何处理
2.4 Executors中线程池的实现
对于newFixedThreadPool
、newSingleThreadExecutor
、newCachedThreadPool
方法,其方法内部均调用了下面的构造器。
- 即使用了默认的线程工厂和拒绝策略
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
2.4.1 newFixedThreadPool 的实现
public static ExecutorService newFixedThreadPool(int nThreads) { // 方法参数 nThreads,线程池中固定存在的线程数量 return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize 和 maximumPoolSize 均为 nThreads 0L, TimeUnit.MILLISECONDS, // 存活时间为0ms new LinkedBlockingQueue<Runnable>()); // 使用无界的任务队列(LinkedBlockingQueue) }
2.4.2 newSingleThreadExecutor 的实现
// 基本与 newFixedThreadPool 相同,只是限定了线程池中固定存在的线程数量为1 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
2.4.3 newCachedThreadPool 的实现
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // corePoolSize = 0; maximumPoolSize = Integer.MAX_VALUE; 60L, TimeUnit.SECONDS, // keepAliveTime = 60s; new SynchronousQueue<Runnable>()); // 直接提交的队列(SynchronousQueue) }
2.5 扩展线程池
在默认的ThreadPoolExecutor 实现中,提供了空的beforeExecute()
、afterExecute()
和terminated()
方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。
beforeExecute()
:任务执行前被调用afterExecute()
:任务执行后被调用terminated()
:线程池退出前被调用
public class MyPool { public static void main(String[] args) { ExecutorService myPool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(t.getName() + ": 任务执行前"); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("任务执行结束"); } @Override protected void terminated() { System.out.println("线程池退出"); } }; myPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "任务执行中..."); }); myPool.shutdown(); } }
2.6 分而治之(Fork & Join)
“分而治之”是一个非常有效地处理大量数据的方法。
利用JDK提供的 Fork & Join 框架,我们可以十分方便的将一个大任务分解为多个子任务。当所有的子任务都执行完成后,在收集它们各自的结果,从而得到最终的结果。
- Fork & Join 框架中,任务数和线程数并不是一一对应的
- 大多数情况下,一个物理线程需要执行多个逻辑任务
- 每个线程都会有一个任务队列
- 如果线程A执行结束了,而线程B还剩下许多任务没有执行,A就会从B中”拿来“一个任务进行处理,以尽可能有效的利用物理资源
使用示例:
// 利用fork&join,实现start到end的累加 public class TestForkJoinPool extends RecursiveTask<Long> { private static final int THRESHOLD = 30000; // 划分门槛 private static final int GROUP_SIZE = 10; // 每次划分的组数 private long start; // 计数起始值 private long end; // 计数终止值 public TestForkJoinPool(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0; if (end - start < THRESHOLD) { for (long i = start; i <= end; i++) { sum += i; } } else { // 将任务划分位多个子任务 long step = (end - start + 1) / GROUP_SIZE; ArrayList<TestForkJoinPool> subTasks = new ArrayList<>(); long head = start; long tail; for (int i = 0; i < GROUP_SIZE; i++) { tail = head + step; if (tail > end) { tail = end; } TestForkJoinPool subTask = new TestForkJoinPool(head, tail); head = tail + 1; subTasks.add(subTask); subTask.fork(); } // 收集子任务的计算结果 for (TestForkJoinPool subTask : subTasks) { sum += subTask.join(); } } return sum; } public static void main(String[] args) { long startTime = 0; long endTime = 0; long result = 0; ForkJoinPool forkJoinPool = new ForkJoinPool(); startTime = System.currentTimeMillis(); ForkJoinTask<Long> taskResult = forkJoinPool.submit(new TestForkJoinPool(0, 2000000000L)); try { result = taskResult.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } endTime = System.currentTimeMillis(); System.out.println("fork&join计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms"); startTime = System.currentTimeMillis(); result = 0; for (int i = 0; i <= 2000000000L; i++) { result += i; } endTime = System.currentTimeMillis(); System.out.println("普通计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms"); } }
参考书籍: Java高并发程序设计(第二版)
这篇关于Java多线程(三):线程组与线程池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-27MQ底层原理资料详解:新手入门教程
- 2024-11-27MQ项目开发资料入门教程
- 2024-11-27RocketMQ源码资料详解:新手入门教程
- 2024-11-27本地多文件上传简易教程
- 2024-11-26消息中间件源码剖析教程
- 2024-11-26JAVA语音识别项目资料的收集与应用
- 2024-11-26Java语音识别项目资料:入门级教程与实战指南
- 2024-11-26SpringAI:Java 开发的智能新利器
- 2024-11-26Java云原生资料:新手入门教程与实战指南
- 2024-11-26JAVA云原生资料入门教程