你还在用递归遍历目录吗?ForkJoin线程池代替传统递归遍历目录!大数据场景大量小文件磁盘IO优化方案
2022/10/13 4:24:01
本文主要是介绍你还在用递归遍历目录吗?ForkJoin线程池代替传统递归遍历目录!大数据场景大量小文件磁盘IO优化方案,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本文主要总结下工作中遇到的forkjoin线程池的使用场景,对大量小文件的磁盘IO优化
工作使用场景描述
公司大数据部服务器存有海量非结构化数据,文件格式是xml,量级千万级,需要编写java程序解析xml,单个文件只有几百KB到几MB ,j使用传统的递归逻辑遍历磁盘目录时文件处理速度非常慢,后选择使用forkjoin进行了速度上的优化。
ForkJoin线程池的概念
关于forkjoin线程池的概念,此处不再一一赘述,其他博客已经写的很清楚了,主要是任务的拆分,即大的任务会被划分成小的任务,还有是工作窃取。
直接上干货:ForkJoin和传统递归的运行结果对比
现在分别用递归和forkjoin对目录 C:/Windows遍历,文件操作为打印路径,对比两种方法的结果。
传统递归代码
import java.io.File; import java.util.concurrent.atomic.AtomicLong; public class DiguiTest { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) { long start = System.currentTimeMillis(); digui(new File("C:/Windows")); System.out.println("花费:"+(System.currentTimeMillis()-start)/1000.0+"秒"); } private static void digui(File file) { if (file != null) { if (file.isDirectory()) { File[] files = file.listFiles(); if (files!=null) for (File son :files) { digui(son); } } else { System.out.println(counter.incrementAndGet()+"==>"+file.getAbsolutePath()); } } } }
ForkJoin代码
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Test { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkjoinPool = new ForkJoinPool(6); File file = new File("C:/Windows"); MyTask task = new MyTask(file); forkjoinPool.invoke(task); // 停止接收任务 forkjoinPool.shutdown(); //等待任务执行结束 forkjoinPool.awaitTermination(2, TimeUnit.SECONDS); System.out.println("花费:" + (System.currentTimeMillis() - start) / 1000.0 + "秒"); } public static class MyTask extends RecursiveAction { private File path; // 当前要搜索的目录 public MyTask(File path) { this.path = path; } @Override protected void compute() { try { // 定义一个文件目录集合 List<MyTask> subTasks = new ArrayList<>(); // 根据当前要搜索目录的,找到所有的文件 File[] files = path.listFiles(); // 判断是否为空,不为空则继续往下搜索 if (null != files) { for (File file : files) { // 判断是否是目录 if (file.isDirectory()) { subTasks.add(new MyTask(file)); // System.out.println(" 目录: " + file.getAbsolutePath()); } else { // 判断不是目录,则是文件 // todo 处理文件 此处打印 System.out.println(counter.incrementAndGet() + "==>" + file.getAbsolutePath()); } } // 判断集合是否为空 if (null != subTasks && subTasks.size() > 0) { for (MyTask subTask : invokeAll(subTasks)) { // 等待子任务完成 subTask.join(); } } } } catch (Exception e) { e.printStackTrace(); } } } }
两者结果对比
运行时间对比
forkjoin运行结果:
传统递归运行结果
磁盘IO对比
forkjoin运行IO:
传统递归运行IO:
结论
可见forkjoin线程池在处理大量小文件时,遍历目录比传统递归要快很多.
注意: forkjoin线程池的线程数并不是越大速度越快。
ForkJoin有返回值
上述的forkjoin示例是无返回,只需要打印下文件路径,并不需要子目录对上层目录传递计算结果。
新的需求: 遍历目录 C:/Windows, 将所有文件路径保存到mysql的一张表中,为了提高写入效率,每1000条路径作为一个批次,写入mysql。
ForkJoin有返回值代码
RecursiveTask后的泛型是返回值类型
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Main { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) throws InterruptedException { ForkJoinPool forkjoinPool = new ForkJoinPool(8); File file = new File("C:/Windows"); Task task = new Task(file); List<String> list = forkjoinPool.invoke(task); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束 // 关闭线程池 forkjoinPool.shutdown(); // todo 缓存中不足1000条的剩余数据处理, 此处写入mysql逻辑省略 System.out.println(counter.addAndGet(list.size())); } private static class Task extends RecursiveTask<List<String>> { private File file; public Task(File file) { this.file = file; } @Override protected List<String> compute() { List<String> buffer = new ArrayList<>(); if (file != null) { if (file.isDirectory()) { File[] sons = file.listFiles(); if (sons != null && sons.length > 0) { for (File son : sons) { Task task = new Task(son); invokeAll(task); List<String> join = task.join(); buffer.addAll(join); if (buffer.size() > 1000) { // todo 缓存批处理, 写入mysql逻辑省略 System.out.println(counter.addAndGet(buffer.size())); buffer.clear(); } } } } else { buffer.add(file.getAbsolutePath()); // counter.incrementAndGet(); } } return buffer; } } }
运行结果
可见,ForkJoin有返回值的批处理和无返回值的处理数据条数相同。
这篇关于你还在用递归遍历目录吗?ForkJoin线程池代替传统递归遍历目录!大数据场景大量小文件磁盘IO优化方案的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-14使用AWS Lambda和S3打造智能文件整理器 - (动手搭建系列)
- 2024-11-14Netflix简化营收基础设施中的合同管理工具
- 2024-11-142024年必备的6款开源Terraform神器
- 2024-11-14Spin 3.0来啦:全新功能让你的无服务器Wasm应用开发更上一层楼
- 2024-11-14如何高效管理项目?小团队到大企业的多功能项目管理工具推荐
- 2024-11-1333 张高清大图,带你玩转 KubeSphere 4.1.2 部署与扩展组件安装
- 2024-11-11Spark 新作《循序渐进 Spark 大数据应用开发》简介
- 2024-11-11KubeSphere 社区双周报| 2024.10.25-11.07
- 2024-11-11云原生周刊:Istio 1.24.0 正式发布
- 2024-11-10一个故事,为你理清云开发服务的选择思路