Flink Java Side Output: Splitting and Duplicating the Original Stream
2021/10/5 17:12:36
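Flink implements side outputs through ProcessFunction, which lets a single stream be split or duplicated into several streams. For example, the code below reads a simple text stream, processes it in two different ways, and prints both results: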
Example code
package wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class manyOutWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from a file
        DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");

        // Set the parallelism of the execution environment to 3
        env.setParallelism(3);

        // 3. Split each line on spaces; every element of the resulting stream is new Tuple2<>(wordLine, 1)
        DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] wordString = value.split(" ");
                        for (String wordLine : wordString) {
                            out.collect(new Tuple2<>(wordLine, 1));
                        }
                    }
                });

        // Side output: the tag identifies the side stream (the anonymous subclass keeps the generic type information)
        final OutputTag<Tuple2<String, Integer>> sideStream = new OutputTag<Tuple2<String, Integer>>("te") {
        };

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = sensorStream.process(
                new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value,
                                               ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>.Context ctx,
                                               Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Main output: mainDataStream receives Tuple(word, 2)
                        out.collect(new Tuple2<>(value.f0, 2));
                        // Side output: sideStream receives the unchanged Tuple(word, 1)
                        ctx.output(sideStream, value);
                    }
                });

        // Retrieve the side output from the main stream using the same tag
        DataStream<Tuple2<String, Integer>> sideOutput = mainDataStream.getSideOutput(sideStream);
        sideOutput.print();
        mainDataStream.print();

        // Execute the job
        env.execute();
    }
}
The input file hello.txt used above contains:
hello world hello flink hello spark When we have shuffled off this mortal coil When we have shuffled off this mortal coil ack hello world hello flink hello spark When we have shuffled off this mortal coil When we have shuffled off this mortal coil ack
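To make concrete what each of the two streams carries, here is a minimal plain-Java sketch that mimics the per-element logic of the job for a single input line. It does not use Flink at all; the class `SideOutputModel` and its methods are hypothetical names introduced only for illustration. It assumes each record is rendered as `(word,count)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the job's per-element logic.
public class SideOutputModel {

    // Mirrors the flatMap step: split a line on single spaces.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            words.add(word);
        }
        return words;
    }

    // Mirrors out.collect(new Tuple2<>(value.f0, 2)): the main stream gets (word,2).
    static List<String> mainRecords(String line) {
        List<String> records = new ArrayList<>();
        for (String word : tokenize(line)) {
            records.add("(" + word + ",2)");
        }
        return records;
    }

    // Mirrors ctx.output(sideStream, value): the side stream gets the unchanged (word,1).
    static List<String> sideRecords(String line) {
        List<String> records = new ArrayList<>();
        for (String word : tokenize(line)) {
            records.add("(" + word + ",1)");
        }
        return records;
    }

    public static void main(String[] args) {
        String line = "hello world hello flink";
        System.out.println("main: " + mainRecords(line));
        System.out.println("side: " + sideRecords(line));
    }
}
```

Every word therefore appears once on each stream, and because the job contains no keyBy or aggregation step, repeated words are not summed. In the real job, with parallelism 3, print() prefixes each record with the index of the subtask that emitted it, and records from the two streams are interleaved in no guaranteed order.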