FLink18--全窗口聚合方式2 ProcessWindowApp
2022/3/27 23:26:21
本文主要是介绍FLink18--全窗口聚合方式2 ProcessWindowApp,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
一、依赖
二、代码
package net.xdclass.class11; import java.util.List; import java.util.stream.Collectors; import org.apache.commons.collections.IteratorUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; /** * 推荐本方法 * 全量聚合方法2 process(new ProcessWindowFunction(){}) * !!!WindowFunction后面可能废弃,用processWindowFunction更好,有打开关闭功能 * 全窗口函数,自定义窗口计算,适用于复杂场景 * @desc 窗口计算,全窗口函数,可以拿到整个窗口的数据做计算 * @menu */ public class FLink18ProcessWindowApp { public static void main(String[] args) throws Exception{ //WebUi方式运行 // final StreamExecutionEnvironment env = // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置运行模式为流批一体 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //并行度 env.setParallelism(1); //设置为自定义source // DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDs = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder videoOrder) throws Exception { return videoOrder.getTitle(); } }); //全窗口函数,可以拿到整个窗口的数据做计算 SingleOutputStreamOperator<VideoOrder> sumAllWindowDs = keyByDs .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<VideoOrder> iterable, Collector<VideoOrder> output) throws Exception { List<VideoOrder> list = IteratorUtils.toList(iterable.iterator()); if (list.size() <= 0) { return; } int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue(); //新建一个返回结果对象,数据聚合后发送出去 VideoOrder videoOrder = new VideoOrder(); videoOrder.setMoney(total); videoOrder.setTitle(list.get(0).getTitle()); videoOrder.setCreateTime(list.get(0).getCreateTime()); //获取窗口开始结束时间,还可以获取很多信息 System.out.println("窗口开始时间"+context.window().getStart()+"窗口结束时间"+context.window().getEnd()); output.collect(videoOrder); } }); sumAllWindowDs.print(); //DataStream需要调用execute,可以取个名称 env.execute("Sailing Window job"); } }
这篇关于FLink18--全窗口聚合方式2 ProcessWindowApp的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享