java~并行计算~大集合的并行处理
2021/9/10 14:35:04
本文主要是介绍java~并行计算~大集合的并行处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
上一次写了关于《FunctionalInterface~一个批量处理数据的类》和《Future和Callable实现大任务的并行处理》的文章,本讲主要结合实际应用,来封装一个集合并行处理组件,我们的集合分为数据库查询出现的分页集合;还有一个是内存的集合,今天主要说一下内存集合的并行处理。
场景介绍
- 有一个比较耗时的工作,将top 400的用户的行为信息统计
- 统计的信息来自很多业务,很多服务,不能使用聚合直接计算
- 这些业务统计的时间,大概每个人平均需要1秒
- 这些用户的各种类型,彼此独立,没有关系
如何设计
如果直接顺序写代码,那1万的用户,需要400秒的时间,这是我们不能接受的,我们使用并行编程8秒就把它搞定。
如何实现
- 400的集合,进行拆分,每100个为一组,分为4组(4页)
- 对每100个集合进行拆分,每2个为1组,将100个分成了50组
- 对50组数据,开50个线程并行处理,结果为2行完成
- 400的信息,分成了4页,每页2秒,一共8秒
代码实现
/** * 数据集并行处理工具 */ public class DataHelper { /** * 并行处理线程数字 */ static final int THREAD_COUNT = 50; /** * 单线程中处理的集合的长度,50个线程,每个线程处理2条,如果处理时间为1S,则需要2S的时间. */ static final int INNER_LIST_LENGTH = 2; static Logger logger = LoggerFactory.getLogger(DataHelper.class); /** * 大集合拆分. * * @param list * @param len * @param <T> * @return */ private static <T> List<List<T>> splitList(List<T> list, int len) { if (list == null || list.size() == 0 || len < 1) { return null; } List<List<T>> result = new ArrayList<List<T>>(); int size = list.size(); int count = (size + len - 1) / len; for (int i = 0; i < count; i++) { List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1))); result.add(subList); } return result; } /** * 并行处理. * * @param list 大集合 * @param pageSize 单页数据大小 * @param consumer 处理程序 * @param <T> */ public static <T> void fillDataByPage(List<T> list, int pageSize, Consumer<T> consumer) { List<List<T>> innerList = new ArrayList<>(); splitList(list, pageSize).forEach(o -> innerList.add(o)); int totalPage = innerList.size(); AtomicInteger i = new AtomicInteger(); innerList.forEach(items -> { ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); i.getAndIncrement(); Collection<BufferInsert<T>> bufferInserts = new ArrayList<>(); splitList(items, INNER_LIST_LENGTH).forEach(o -> { bufferInserts.add(new BufferInsert(o, consumer)); }); try { executor.invokeAll(bufferInserts); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); logger.info("【当前数据页:{}/{}】", i.get(), totalPage); }); } /** * 多线程并发处理数据. * * @param <T> */ static class BufferInsert<T> implements Callable<Integer> { /** * 要处理的数据列表. */ List<T> items; /** * 处理程序. */ Consumer<T> consumer; public BufferInsert(List<T> items, Consumer<T> consumer) { this.items = items; this.consumer = consumer; } @Override public Integer call() { for (T item : items) { this.consumer.accept(item); } return 1; } } }
调用代码
/** * 8秒处理400个任务,每个任务执行时间为1S,并行的威力 */ @Test public void test() { List<Integer> sumList = new ArrayList<>(); for (int i = 0; i < 400; i++) { sumList.add(i); } StopWatch stopWatch = new StopWatch(); stopWatch.start(); DataHelper.fillDataByPage(sumList, 100, (o) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); stopWatch.stop(); System.out.println("time:" + stopWatch.getTotalTimeMillis()); }
结果截图
这篇关于java~并行计算~大集合的并行处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-22项目:远程温湿度检测系统
- 2024-12-21《鸿蒙HarmonyOS应用开发从入门到精通(第2版)》简介
- 2024-12-21后台管理系统开发教程:新手入门全指南
- 2024-12-21后台开发教程:新手入门及实战指南
- 2024-12-21后台综合解决方案教程:新手入门指南
- 2024-12-21接口模块封装教程:新手必备指南
- 2024-12-21请求动作封装教程:新手必看指南
- 2024-12-21RBAC的权限教程:从入门到实践
- 2024-12-21登录鉴权实战:新手入门教程
- 2024-12-21动态权限实战入门指南