使用 Reactor 进行反应式编程进行数据分批批量处理
2022/1/30 14:04:16
本文主要是介绍使用 Reactor 进行反应式编程进行数据分批批量处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、前言
最近在做一个项目,获取JDK8 Stream对象后,想要批量消费,不想自己写个集合来做批量处理。而反应式编程实现比如rxjava或者reactor是有丰富的流操作符,所以调研了下如何把JDK8 Stream转换为反应式流。
二、批量消费
有时候场景需要我们批量消费以便提高执行效率,比如对应同一个表的插入操作,批量插入的效率比单条逐个插入效率要好很多。那么对应给定的一个数据源,如何聚合数据为批量那?当数据源是一个内存list时候,最简单方法如下:
public static void main(String[] args) { // 模拟数据, 创建list List<Integer> personList = new ArrayList<>(); for (int i = 0; i < 98; ++i) { personList.add(i); } // 切分处理 List<List<Integer>> list = Lists.partition(personList, 20); list.stream().forEach( tempList -> System.out.println(JSON.toJSONString(tempList)) ); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
使用Google guava包里面的Lists.partition函数把list切分为一个个最多包含20个元素的list列表,并打印输出。
如果我们想要的是从这些流中每次读取limit条记录,然后批量处理这limit条记录,这样内存中每次只会存在limit条记录。这时由于JDK Stream不支持Buffer操作,我们需要自己实现,实现代码大概如下:
public static void main(String[] args) { // 模拟数据, 创建list List<Integer> personList = new ArrayList<>(); for (int i = 0; i < 98; ++i) { personList.add(i); } // 缓存列表 List<Integer> mergeList = new ArrayList<>(); int limit = 20; // 循环获取元素并缓存 personList.stream().forEach(e -> { if (mergeList.size() >= limit) { System.out.println(JSON.toJSONString(mergeList)); mergeList.clear(); } mergeList.add(e); }); // 退出后,补漏处理 if (mergeList.size() > 0) { System.out.println(JSON.toJSONString(mergeList)); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
如上代码在Stream中迭代元素时,我们把元素缓存到mergeList列表,每当mergeList有了20个元素,则处理一次。最后等流结束后,如果mergeList还有元素则需要补漏处理下。
如果不想实现上面繁琐代码,我们可以考虑吧JDK8 Stream切换到反应式实现框架比如Reactor或者Rxjava,因为后者有丰富的流操作符。其中Reactor的一个实现是:
public static void main(String[] args) { // 模拟数据, 创建list List<Integer> personList = new ArrayList<>(); for (int i = 0; i < 98; ++i) { personList.add(i); } // 为了使用buffer功能,转换为Reactor的流对象Flux Flux flux = Flux.fromStream(personList.stream()); // 聚合消费 flux.buffer(20).subscribe(e -> System.out.println(JSON.toJSONString(e))); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
如上代码,我们使用Reactor框架的Flux.fromStream方法把JDKStream转换为Flux流对象,然后调用其buffer方法设置缓存20个元素消费一次,然后调用subscribe订阅缓存流,并打印。
这篇关于使用 Reactor 进行反应式编程进行数据分批批量处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-16Vue3资料:新手入门必读教程
- 2024-11-16Vue3资料:新手入门全面指南
- 2024-11-16Vue资料:新手入门完全指南
- 2024-11-16Vue项目实战:新手入门指南
- 2024-11-16React Hooks之useEffect案例详解
- 2024-11-16useRef案例详解:React中的useRef使用教程
- 2024-11-16React Hooks之useState案例详解
- 2024-11-16Vue入门指南:从零开始搭建第一个Vue项目
- 2024-11-16Vue3学习:新手入门教程与实践指南
- 2024-11-16Vue3学习:从入门到初级实战教程