java Flink (36): Flink multi-stream merge operators UNION, CONNECT, CoGroup, and Join
2021/7/22 14:08:13
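Introduction to UNION
The DataStream.union() method merges two or more DataStreams into a single output DataStream of the same type as the inputs.
Events are merged in FIFO fashion. The operator does not produce any particular ordering of events across the input streams, and it does not deduplicate: every input event is forwarded to the next operator.
Notes:
1. The streams merged by union must all have the same element type.
2. union can merge more than two streams.
3. union does not deduplicate; events are merged first in, first out.
Usage: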
DataStream<SensorReading> parisStream = ...
DataStream<SensorReading> tokyoStream = ...
DataStream<SensorReading> rioStream = ...
DataStream<SensorReading> allCities = parisStream
    .union(tokyoStream, rioStream);
CONNECT
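CONNECT also merges data streams. It is similar to UNION, with these differences:
connect can join only two streams, whereas union can join several.
The two streams joined by connect may have different data types, whereas all streams joined by union must have the same type.
Connecting two DataStreams yields a ConnectedStreams. A ConnectedStreams applies a separate processing method to each of the two streams, and the two streams can share state.
connect is often used when one stream controls how another stream is processed.
Usage:
Connecting the streams: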
// first stream
DataStream<Integer> first = ...
// second stream
DataStream<String> second = ...
// connect streams
ConnectedStreams<Integer, String> connected = first.connect(second);
Two ways to connect with keyBy:
DataStream<Tuple2<Integer, Long>> one = ...
DataStream<Tuple2<Integer, String>> two = ...
// keyBy two connected streams
ConnectedStreams<Tuple2<Integer, Long>, Tuple2<Integer, String>> keyedConnect1 = one
    .connect(two)
    .keyBy(0, 0); // key both input streams on first attribute
// alternative: connect two keyed streams
ConnectedStreams<Tuple2<Integer, Long>, Tuple2<Integer, String>> keyedConnect2 = one
    .keyBy(0)
    .connect(two.keyBy(0));
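The control scenario mentioned above can be illustrated without Flink. The following is only a plain-Java sketch of the idea, not Flink API: the class name ControlledFilterSketch and the methods onControl and onData are hypothetical stand-ins for the two per-input callbacks of a function applied to a ConnectedStreams, and the blocked set stands in for the state shared between the two inputs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ControlledFilterSketch {
    // State shared by both inputs (hypothetical stand-in for shared operator state).
    private final Set<String> blocked = new HashSet<>();
    private final List<String> output = new ArrayList<>();

    // Called once per element of the control stream: remember a key to block.
    public void onControl(String blockedKey) {
        blocked.add(blockedKey);
    }

    // Called once per element of the data stream: forward it unless it is blocked.
    public void onData(String key) {
        if (!blocked.contains(key)) {
            output.add(key);
        }
    }

    public List<String> output() {
        return output;
    }

    // Feeds a small interleaving of data and control events through the sketch.
    public static List<String> demo() {
        ControlledFilterSketch filter = new ControlledFilterSketch();
        filter.onData("a");     // forwarded, nothing is blocked yet
        filter.onControl("b");  // control event: block key "b"
        filter.onData("b");     // dropped
        filter.onData("c");     // forwarded
        return filter.output();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, c]
    }
}
```

The two streams have different roles and may have different types, which is why this pattern fits connect and not union.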
CoGroup:
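This operation groups two data streams (or data sets) by key and then processes the elements that share a key. It differs from a join in that an element in one stream (or data set) that has no match in the other is still output.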
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class CoGroupMain {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final Random random = new Random();

        // Source 1: emits a random "key,value" record once per second.
        DataStreamSource<Tuple2<String, String>> source1 = env.addSource(new RichSourceFunction<Tuple2<String, String>>() {
            // volatile because cancel() is called from a different thread than run()
            volatile boolean isRunning = true;
            String[] s1 = {"1,a", "2,b", "3,c", "4,d", "5,e"};

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                int size = s1.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    String[] s = s1[random.nextInt(size)].split(",");
                    ctx.collect(Tuple2.of(s[0], s[1]));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // Source 2: emits a random record every three seconds; keys 6 to 8 never appear in source 1.
        DataStreamSource<Tuple2<String, String>> source2 = env.addSource(new RichSourceFunction<Tuple2<String, String>>() {
            volatile boolean isRunning = true;
            String[] s1 = {"1,a", "2,b", "3,c", "4,d", "5,e", "6,f", "7,g", "8,h"};

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                int size = s1.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    String[] s = s1[random.nextInt(size)].split(",");
                    ctx.collect(Tuple2.of(s[0], s[1]));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        source1.coGroup(source2)
            // Key both streams on the first tuple field.
            .where(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
            // Fire each time an element arrives.
            .trigger(CountTrigger.of(1))
            .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                @Override
                public void coGroup(Iterable<Tuple2<String, String>> first,
                                    Iterable<Tuple2<String, String>> second,
                                    Collector<String> out) throws Exception {
                    // Either iterable may be empty when the key appears in only one stream.
                    StringBuilder builder = new StringBuilder();
                    builder.append("DataStream first:\n");
                    for (Tuple2<String, String> value : first) {
                        builder.append(value.f0 + "=>" + value.f1 + "\n");
                    }
                    builder.append("DataStream second:\n");
                    for (Tuple2<String, String> value : second) {
                        builder.append(value.f0 + "=>" + value.f1 + "\n");
                    }
                    out.collect(builder.toString());
                }
            }).print();

        env.execute();
    }
}
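To make the difference between CoGroup and Join concrete, here is a minimal plain-Java sketch that ignores windows entirely and only compares the two matching rules on small in-memory inputs. It does not use Flink; the class CoGroupVsJoinSketch and its methods are hypothetical names chosen for illustration, and the "key,value" record format is borrowed from the sources above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class CoGroupVsJoinSketch {

    // Groups "key,value" records by key, keeping values in arrival order.
    public static Map<String, List<String>> groupByKey(String[] records) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String record : records) {
            String[] kv = record.split(",");
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // CoGroup-style: one result per key seen on either side; a missing side is an empty list.
    public static List<String> coGroup(String[] first, String[] second) {
        Map<String, List<String>> a = groupByKey(first);
        Map<String, List<String>> b = groupByKey(second);
        TreeSet<String> keys = new TreeSet<>(a.keySet());
        keys.addAll(b.keySet());
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            List<String> left = a.getOrDefault(k, Collections.emptyList());
            List<String> right = b.getOrDefault(k, Collections.emptyList());
            out.add(k + ":" + left + "|" + right);
        }
        return out;
    }

    // Join-style: one result per matching pair; keys present on only one side produce nothing.
    public static List<String> join(String[] first, String[] second) {
        Map<String, List<String>> a = groupByKey(first);
        Map<String, List<String>> b = groupByKey(second);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : a.entrySet()) {
            List<String> rights = b.getOrDefault(e.getKey(), Collections.emptyList());
            for (String left : e.getValue()) {
                for (String right : rights) {
                    out.add(e.getKey() + ":" + left + "," + right);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] first = {"1,a", "2,b"};
        String[] second = {"2,x", "6,f"};
        System.out.println(coGroup(first, second)); // prints [1:[a]|[], 2:[b]|[x], 6:[]|[f]]
        System.out.println(join(first, second));    // prints [2:b,x]
    }
}
```

Keys 1 and 6 appear in the CoGroup result with one empty side, while the join result contains only key 2, which is present on both sides.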
Join
Flink has four common kinds of join:
- Tumbling Window Join
- Sliding Window Join
- Session Window Join
- Interval Join
The programming model for a window join is:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
An example of a Tumbling Window Join:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class TumblingMain {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the time characteristic (this method is deprecated since Flink 1.12).
        // The window assigner below is processing-time based and does not depend on this setting.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Each socket line is expected to look like "key value".
        DataStream<Tuple2<String, String>> source1 = env.socketTextStream("192.168.6.23", 9022)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    String[] parts = value.split(" ");
                    return Tuple2.of(parts[0], parts[1]);
                }
            });
        DataStream<Tuple2<String, String>> source2 = env.socketTextStream("192.168.6.23", 9023)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    String[] parts = value.split(" ");
                    return Tuple2.of(parts[0], parts[1]);
                }
            });

        source1.join(source2)
            // Key both streams on the first tuple field.
            .where(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> value) throws Exception {
                    return value.f0;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // Fire each time an element arrives.
            .trigger(CountTrigger.of(1))
            .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                @Override
                public String join(Tuple2<String, String> first, Tuple2<String, String> second) throws Exception {
                    // Both elements already share the same key, so no extra key check is needed.
                    return first.f1 + " " + second.f1;
                }
            }).print();

        env.execute();
    }
}
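The rule behind a tumbling window join can be sketched in plain Java: two elements with the same key are paired only when their timestamps fall into the same fixed-size window. This sketch does not use Flink; TumblingWindowSketch and its methods are hypothetical names, and the window-start formula assumes non-negative timestamps and a zero window offset.

```java
public class TumblingWindowSketch {

    // Start of the tumbling window containing timestampMs
    // (assumes non-negative timestamps and a zero window offset).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Two elements are joined only if their keys match and they land in the same window.
    public static boolean joins(String key1, long ts1, String key2, long ts2, long sizeMs) {
        return key1.equals(key2) && windowStart(ts1, sizeMs) == windowStart(ts2, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, matching Time.seconds(5) above
        System.out.println(windowStart(7_300L, size));              // prints 5000
        System.out.println(joins("k", 4_900L, "k", 5_100L, size));  // prints false
        System.out.println(joins("k", 5_100L, "k", 9_999L, size));  // prints true
    }
}
```

The second call shows that two elements only 200 ms apart are still not joined when a window boundary lies between them.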
Interval Join
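An Interval Join joins elements of two streams that share the same key, where the elements of one stream fall within a time range relative to the timestamps of the other. It is typically used to pull related, grouped data from a given time range into one wide table. An expression like the one below applies an interval join to two streams.
Interval Join programming model: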
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
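The effect of between(-2 ms, +1 ms) can be checked with a small plain-Java sketch. It assumes inclusive bounds, so a right-hand element matches when left + lowerBound <= right <= left + upperBound. The class IntervalJoinSketch and the sample timestamps are hypothetical and serve only as illustration; keys are omitted, so all elements are treated as sharing one key.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {

    // True when rightTs lies in [leftTs + lowerBound, leftTs + upperBound], both ends inclusive.
    public static boolean inInterval(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs + lowerBound <= rightTs && rightTs <= leftTs + upperBound;
    }

    // Pairs every left timestamp with every right timestamp inside its interval.
    public static List<String> join(long[] left, long[] right, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (inInterval(l, r, lowerBound, upperBound)) {
                    out.add("(" + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {2, 5};          // timestamps in ms of the left stream
        long[] green = {0, 1, 3, 6, 7};  // timestamps in ms of the right stream
        // Same bounds as between(Time.milliseconds(-2), Time.milliseconds(1)).
        System.out.println(join(orange, green, -2, 1));
        // prints [(2,0), (2,1), (2,3), (5,3), (5,6)]
    }
}
```

The element at timestamp 7 is not paired with anything because it lies outside both [0, 3] and [3, 6].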