Java多线程(三):线程组与线程池

2021/6/6 1:22:57

本文主要是介绍Java多线程(三):线程组与线程池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录:

  • 1. 线程组
  • 2. 线程池
    • 2.1 Executors(线程池工厂)
    • 2.2 计划任务(延时执行,周期执行)
    • 2.3 核心线程池的内部实现
      • 2.3.1 任务队列(BlockingQueue)
      • 2.3.2 线程工厂(ThreadFactory)
      • 2.3.3 拒绝策略(RejectedExecutionHandler)
    • 2.4 Executors中线程池的实现
      • 2.4.1 newFixedThreadPool 的实现
      • 2.4.2 newSingleThreadExecutor 的实现
      • 2.4.3 newCachedThreadPool 的实现
    • 2.5 扩展线程池
    • 2.6 分而治之(Fork & Join)

1. 线程组

在一个系统中,如果线程数量较多,且功能分配比较明确,就可以将相同功能的线程放在同一个线程组(ThreadGroup) 中。

ThreadGroup有两个比较重要的功能:

  • activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值
  • list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助

线程组使用举例:

public class TestThreadGroup {

    public static void main(String[] args) {
        ThreadGroup applesThread = new ThreadGroup("apples");
        ThreadGroup bananasThread = new ThreadGroup("bananas");
        Apple apple = new Apple();
        Banana banana = new Banana();

        for (int i = 0; i < 3; i++) {
            new Thread(applesThread, apple, "apple" + i).start();
        }

        for (int i = 0; i < 3; i++) {
            new Thread(bananasThread, banana, "banana" + i).start();
        }

        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值
        System.out.println("当前活动线程总数: " + applesThread.activeCount());
        // list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助
        System.out.println("bananas组中的所有线程信息: ");
        bananasThread.list();
    }
}

class Apple implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": I'm an apple");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Banana implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": I'm a banana");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:
在这里插入图片描述

2. 线程池

线程池是Java并发编程中一项重要的工具。

  • 虽然与进程相比,线程是一种相对轻量级的工具,但其创建、关闭依然需要消费相当的资源。
  • 如果高频率的创建、关闭线程,对系统性能的影响依旧十分明显的。

为解决此问题,我们可以使用JDK提供的工具——线程池

  • 正如他的名字所表达的,线程池就像一个大池子,里面存放着一些线程;
  • 当有需要的时候,就从里面取出线程以完成指定的工作;
  • 使用完在将线程重新放回线程池中。

JDK为线程池提供了一套Executor框架,以帮助开发人员有效的进行线程控制

Executor框架的核心成员:

  • Interface Executor

  • interface ExecutorService extends Executor

  • abstract class AbstractExecutorService implements ExecutorService

  • class ThreadPoolExecutor extends AbstractExecutorService

    • 实现了Executor接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调用
    • 线程池工厂(Executors)重要的内部实现支持类
  • interface ScheduledExecutorService extends ExecutorService

    • ExecutorService接口之上扩展了在给定时间执行某任务的功能
    • 如固定延时后执行、周期性执行等。
  • class Executors

    • 线程池工厂,可以利用此工厂构建线程池

2.1 Executors(线程池工厂)

主要的工厂方法:

工厂方法功能
ExecutorService newFixedThreadPool(int nThreads)返回一个固定线程数量的线程池,当有新任务提交时,如有空闲线程则立即执行,否则暂存在任务队列中进行等待(先进先出)
ExecutorService newSingleThreadExecutor()相当于newFixedThreadPool(1)
ExecutorService newCachedThreadPool()返回一个可根据实际情况调整的线程池,优先复用线程,线程不够时创建。
ScheduledExecutorService newSingleThreadScheduledExecutor()返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)newSingleThreadScheduledExecutor()类似,但此方法可以指定线程数量。

以 newFixedThreadPool(int nThreads) 为例,介绍线程池的使用:

public class TestPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() +  ": hello thread pool!");
        };

        for (int i = 0; i < 4; i++) {
            executorService.submit(task);
        }

        // 执行完所有任务后,关闭线程池(不再接受新任务)。如果线程池已经关闭,则调用没有其他效果。
        executorService.shutdown();
    }
}

2.2 计划任务(延时执行,周期执行)

Java线程池还提供了计划执行的功能,主要包括以下三个方法:

  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

    方法会在给定的时间,对任务进行一次调度

  • ... scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

    对任务进行周期性的调度,任务的调度频率是唯一的(从第一次开始执行到第二次开始执行的时间是一定的)。

  • ... scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

    对任务进行周期性的调度,任务的调度频率不是唯一的(从第一次执行结束到第二次开始执行的时间是一定的)。

schedule举例:

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };
        
        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        延时2s后调用任务:
        schedule(Runnable command, long delay, TimeUnit unit)
         */
        scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS);

        scheduledExecutorService.shutdown();
    }
}

在这里插入图片描述

scheduleAtFixedRate举例:

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };

        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        每隔2s调用一次runnable任务:
        scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
         */
        scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
    }
}

在这里插入图片描述

scheduleWithFixedDelay举例:

package com.ju.pool;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };

        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        一开始延时5s开始执行任务,从前一次任务结束到下一次任务开始间隔2s:
        scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit)
         */
        scheduledExecutorService.scheduleWithFixedDelay(runnable, 5, 2, TimeUnit.SECONDS);
    }
}

在这里插入图片描述

2.3 核心线程池的内部实现

除了计划执行外,其余三个工厂方法(newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool)的内部实现均使用了ThreadPoolExcutor类。

ThreadPoolExcutor 类的一个参数较全的构造器如下所示:

public ThreadPoolExecutor(int corePoolSize, // 指定线程池中固定存在的线程数量
                              int maximumPoolSize, // 指定了线程池的最大线程数量
                              long keepAliveTime, // 指定了线程池中超过corePoolSize的空闲线程,在多长时间内会被销毁
                              TimeUnit unit, // keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue, // 任务队列,由于存储已被提交但未被执行的任务
                              ThreadFactory threadFactory, // 线程工厂,由于创建线程,一般用默认的即可
                              RejectedExecutionHandler handler) // 拒绝策略:任务太多来不及处理时,如何拒绝任务

其中大部分的参数均比较容易理解,需要解释类型的有三个:

  • 任务队列(BlockingQueue)
  • 线程工厂(ThreadFactory)
  • 拒绝策略(RejectedExecutionHandler)

2.3.1 任务队列(BlockingQueue)

  • 直接提交的队列(SynchronousQueue):

    • 是一个特殊的BlockingQueue,没有容量,每一个插入操作都要等待一个相应的删除操作
    • 反之,每一个删除操作也都要等待一个相应的插入操作
    • 提交的任务并不会被真正的保存,而是总是将新的任务提交给线程
    • 有空闲线程则直接使用
    • 没有空闲线程,则尝试创建
    • 线程数量达到最大(maximumPoolSize),则执行拒绝策略
  • 有界的任务队列(ArrayBlockingQueue):

    • 队列遵循先进先出规则

    • corePoolSize有剩余则直接获得线程处理任务

    • corePoolSize已满则将任务加入队列

    • 如等待队列已满,在总线程不大于maximumPoolSize的前提下创建新的线程执行任务

    • 如大于maximumPoolSize,则执行拒绝策略

  • 无界的任务队列(LinkedBlockingQueue):

    • 队列遵循先进先出规则
    • corePoolSize有剩余则直接获得线程处理任务
    • corePoolSize已满则将任务加入队列
    • 由于队列无界,所以不会出现队列已满的情况
    • 无界队列会一直增长(有需要的话),直至耗尽系统的内存
  • 优先任务队列(PriorityBlockingQueue):

    • 带有执行优先级,可以控制任务执行的先后顺序
    • 是特殊的无界队列,根据任务自身的优先级顺序先后执行

2.3.2 线程工厂(ThreadFactory)

  • 用于创建线程池中的线程

  • 可以自定义实现自己的线程工厂

    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    

2.3.3 拒绝策略(RejectedExecutionHandler)

  • AbortPolicy 直接抛出异常,阻之系统正常工作(默认的拒绝策略
  • CallerRunsPolicy 只要线程池未关闭,就直接在调用者线程中,运行当前被丢弃的任务
  • DiscardOldestPolicy 丢弃 最“老” 的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务
  • DiscardPolicy:丢弃无法处理的任务,不予任何处理

2.4 Executors中线程池的实现

对于newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool方法,其方法内部均调用了下面的构造器。

  • 即使用了默认的线程工厂和拒绝策略
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

2.4.1 newFixedThreadPool 的实现

public static ExecutorService newFixedThreadPool(int nThreads) { // 方法参数 nThreads,线程池中固定存在的线程数量
    return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize 和 maximumPoolSize 均为 nThreads
                                  0L, TimeUnit.MILLISECONDS, // 存活时间为0ms
                                  new LinkedBlockingQueue<Runnable>()); // 使用无界的任务队列(LinkedBlockingQueue)
}

2.4.2 newSingleThreadExecutor 的实现

// 基本与 newFixedThreadPool 相同,只是限定了线程池中固定存在的线程数量为1
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

2.4.3 newCachedThreadPool 的实现

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // corePoolSize = 0; maximumPoolSize = Integer.MAX_VALUE;
                                  60L, TimeUnit.SECONDS, // keepAliveTime = 60s;
                                  new SynchronousQueue<Runnable>()); // 直接提交的队列(SynchronousQueue)
}

2.5 扩展线程池

在默认的ThreadPoolExecutor 实现中,提供了空的beforeExecute()afterExecute()terminated()方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。

  • beforeExecute():任务执行前被调用
  • afterExecute():任务执行后被调用
  • terminated():线程池退出前被调用
public class MyPool {

    public static void main(String[] args) {
        ExecutorService myPool = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(t.getName() + ": 任务执行前");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("任务执行结束");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        myPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "任务执行中...");
        });

        myPool.shutdown();
    }
}

在这里插入图片描述

2.6 分而治之(Fork & Join)

“分而治之”是一个非常有效地处理大量数据的方法。

利用JDK提供的 Fork & Join 框架,我们可以十分方便的将一个大任务分解为多个子任务。当所有的子任务都执行完成后,在收集它们各自的结果,从而得到最终的结果。

  • Fork & Join 框架中,任务数和线程数并不是一一对应的
  • 大多数情况下,一个物理线程需要执行多个逻辑任务
  • 每个线程都会有一个任务队列
  • 如果线程A执行结束了,而线程B还剩下许多任务没有执行,A就会从B中”拿来“一个任务进行处理,以尽可能有效的利用物理资源

使用示例:

// 利用fork&join,实现start到end的累加
public class TestForkJoinPool extends RecursiveTask<Long> {
    private static final int THRESHOLD = 30000; // 划分门槛
    private static final int GROUP_SIZE = 10; // 每次划分的组数
    private long start; // 计数起始值
    private long end; // 计数终止值

    public TestForkJoinPool(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;

        if (end - start < THRESHOLD) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 将任务划分位多个子任务
            long step = (end - start + 1) / GROUP_SIZE;
            ArrayList<TestForkJoinPool> subTasks = new ArrayList<>();
            long head = start;
            long tail;
            for (int i = 0; i < GROUP_SIZE; i++) {
                tail = head + step;
                if (tail > end) {
                    tail = end;
                }
                TestForkJoinPool subTask = new TestForkJoinPool(head, tail);
                head = tail + 1;
                subTasks.add(subTask);
                subTask.fork();
            }

            // 收集子任务的计算结果
            for (TestForkJoinPool subTask : subTasks) {
                sum += subTask.join();
            }
        }

        return sum;
    }

    public static void main(String[] args) {
        long startTime = 0;
        long endTime = 0;
        long result = 0;

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        startTime = System.currentTimeMillis();
        ForkJoinTask<Long> taskResult = forkJoinPool.submit(new TestForkJoinPool(0, 2000000000L));
        try {
            result = taskResult.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        endTime = System.currentTimeMillis();
        System.out.println("fork&join计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");

        startTime = System.currentTimeMillis();
        result = 0;
        for (int i = 0; i <= 2000000000L; i++) {
            result += i;
        }
        endTime = System.currentTimeMillis();
        System.out.println("普通计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");
    }
}

在这里插入图片描述

参考书籍: Java高并发程序设计(第二版)



这篇关于Java多线程(三):线程组与线程池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程