Flink Project 4: Connecting Two Streams (connect)
2021/10/24 23:40:16
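1. One stream carries orders and the other carries receipts (the reconciliation stream). The two are connected and matched on txid.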
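Timers are registered per key (here, per txid), but they fire on the operator's event-time clock, which all keys share. An event for one txid can therefore advance the watermark and fire the timer of a different txid.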
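The following minimal pure-Scala sketch shows this scoping for a single operator instance. `ToyTimerService` and its methods are hypothetical. They only mimic the behavior and are not Flink's implementation. Each timer is stored with the key that registered it, but a single shared watermark decides when any of them fire:

```scala
import scala.collection.mutable

// Hypothetical toy model of event-time timers for ONE operator instance.
// Timers belong to a key, but every key shares the same watermark.
class ToyTimerService {
  // (timestamp, key) pairs; a Set keeps one timer per key and timestamp
  private val timers = mutable.SortedSet.empty[(Long, String)]

  def registerEventTimeTimer(key: String, time: Long): Unit = timers += ((time, key))

  // Advance the shared watermark; return (key, time) for every timer that fires,
  // in timestamp order. A timer fires once the watermark reaches its timestamp.
  def advanceWatermark(watermark: Long): List[(String, Long)] = {
    val due = timers.filter { case (time, _) => time <= watermark }.toList
    timers --= due
    due.map { case (time, key) => (key, time) }
  }
}

object ToyTimerDemo {
  def main(args: Array[String]): Unit = {
    val ts = new ToyTimerService
    ts.registerEventTimeTimer("4", 48000L) // receipt 4 at :45 -> timer at :48
    ts.registerEventTimeTimer("5", 50000L) // receipt 5 at :47 -> timer at :50
    // An event for key "7" at :49 moves the shared watermark to :49,
    // which is enough to fire key "4"'s timer.
    println(ts.advanceWatermark(49000L)) // List((4,48000))
    println(ts.advanceWatermark(59000L)) // List((5,50000))
  }
}
```

This reproduces the first step of the example in section 2: an input for txid 7 causes txid 4 to be emitted.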
```scala
package flinkProject

import java.text.SimpleDateFormat

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Each input line looks like "<txid> <payChannel> <dd/MM/yyyy:HH:mm:ss>"
case class ReceiptEvent(txid: String, payChannel: String, timestamp: Long)
case class OrderEvent(txid: String, payChannel: String, timestamp: Long)

object TxConnectedMatch {
  // Side outputs for events whose partner did not arrive within 3 s of event time
  val receiptOutputTag = new OutputTag[ReceiptEvent]("receipt_output_tag")
  val orderOutputTag = new OutputTag[OrderEvent]("order_output_tag")

  def main(args: Array[String]): Unit = {
    val executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Periodic watermarks are generated every 200 ms by default

    // Stream 1: receipts from port 1111; watermark = largest timestamp seen (0 s out-of-orderness)
    val receiptStream: DataStream[ReceiptEvent] = executionEnvironment
      .socketTextStream("127.0.0.1", 1111)
      .map { line =>
        val fields = line.split(" ")
        // MM = month, mm = minutes ("dd/mm/yy" would leave every date in January)
        val ts = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(fields(2)).getTime
        ReceiptEvent(fields(0), fields(1), ts)
      }
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(0)) {
          override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp
        })

    // Stream 2: orders from port 2222
    val orderStream: DataStream[OrderEvent] = executionEnvironment
      .socketTextStream("127.0.0.1", 2222)
      .map { line =>
        val fields = line.split(" ")
        val ts = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(fields(2)).getTime
        OrderEvent(fields(0), fields(1), ts)
      }
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(0)) {
          override def extractTimestamp(t: OrderEvent): Long = t.timestamp
        })

    // Key both streams by txid so a receipt and its order share state and timers
    val result: DataStream[(ReceiptEvent, OrderEvent)] = receiptStream
      .connect(orderStream)
      .keyBy(receipt => receipt.txid, order => order.txid)
      .process(new ConnectedCoProcessFunction())

    result.print("result")
    result.getSideOutput(orderOutputTag).print("order_output_tag")
    result.getSideOutput(receiptOutputTag).print("receipt_output_tag ")

    executionEnvironment.execute("connected Stream")
  }
}

// Per txid: buffer whichever side arrives first, emit the pair when the other side arrives
class ConnectedCoProcessFunction
    extends CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)] {

  private var receiptValueState: ValueState[ReceiptEvent] = _
  private var orderValueState: ValueState[OrderEvent] = _

  override def open(parameters: Configuration): Unit = {
    receiptValueState = getRuntimeContext.getState(
      new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))
    orderValueState = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("order", classOf[OrderEvent]))
  }

  override def processElement1(
      receipt: ReceiptEvent,
      ctx: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context,
      out: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    val order = orderValueState.value()
    if (order != null) {
      // The order arrived first: emit the pair and clear the buffered order
      out.collect((receipt, order))
      orderValueState.clear()
    } else {
      // Buffer the receipt and wait up to 3 s of event time for its order
      receiptValueState.update(receipt)
      ctx.timerService().registerEventTimeTimer(receipt.timestamp + 3000L)
    }
  }

  override def processElement2(
      order: OrderEvent,
      ctx: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context,
      out: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    val receipt = receiptValueState.value()
    if (receipt != null) {
      // The receipt arrived first: emit the pair and clear the buffered receipt
      out.collect((receipt, order))
      receiptValueState.clear()
    } else {
      orderValueState.update(order)
      ctx.timerService().registerEventTimeTimer(order.timestamp + 3000L)
    }
  }

  // Timers are not deleted after a match; a late-firing timer then finds empty state and emits nothing
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#OnTimerContext,
      out: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    if (receiptValueState.value() != null)
      ctx.output(TxConnectedMatch.receiptOutputTag, receiptValueState.value())
    if (orderValueState.value() != null)
      ctx.output(TxConnectedMatch.orderOutputTag, orderValueState.value())
    receiptValueState.clear()
    orderValueState.clear()
  }
}
```
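The date pattern affects every event timestamp in this job. The following check assumes a fixed UTC+8 zone, and `DatePatternCheck` is a hypothetical helper. With `dd/mm/yy`, the `mm` field means minutes, so the month is never set and defaults to January. `dd/MM/yyyy` reads the month correctly:

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

object DatePatternCheck {
  // Fixed UTC+8 zone (assumption) so results do not depend on the machine's default zone
  private val zone = TimeZone.getTimeZone("GMT+08:00")

  private def parse(pattern: String, s: String): java.util.Date = {
    val fmt = new SimpleDateFormat(pattern)
    fmt.setTimeZone(zone)
    fmt.parse(s)
  }

  // Epoch milliseconds of `s` parsed with `pattern`
  def parsedMillis(pattern: String, s: String): Long = parse(pattern, s).getTime

  // Calendar month (1-12) of `s` parsed with `pattern`
  def parsedMonth(pattern: String, s: String): Int = {
    val cal = Calendar.getInstance(zone)
    cal.setTime(parse(pattern, s))
    cal.get(Calendar.MONTH) + 1 // Calendar.MONTH is 0-based
  }

  def main(args: Array[String]): Unit = {
    val input = "17/05/2015:10:26:45"
    // `mm` twice: "05" is read as minutes, then overwritten by "26"; month stays January
    println(parsedMonth("dd/mm/yy:HH:mm:ss", input))    // 1
    println(parsedMillis("dd/mm/yy:HH:mm:ss", input))   // 1421461605000
    // `MM` is the month
    println(parsedMonth("dd/MM/yyyy:HH:mm:ss", input))  // 5
    println(parsedMillis("dd/MM/yyyy:HH:mm:ss", input)) // 1431829605000
  }
}
```

The two results differ by exactly 120 days, the gap between January 17 and May 17, 2015. The time of day is the same.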
2. Input data
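Normally, as soon as both streams contain a matching txid, the pair is output, however much wall-clock time separates the two events. The only requirement is that the second event arrives before the watermark passes the first event's timestamp + 3 s.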
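The matching rule itself depends on neither arrival order nor wall-clock time. Below is a sketch of the per-txid logic in plain Scala. The two `ValueState`s are replaced by `Option`s, and timers are left out. `MatchModel`, `KeyState`, `onReceipt` and `onOrder` are hypothetical names:

```scala
// Hypothetical pure-Scala model of the per-txid logic in ConnectedCoProcessFunction.
// No Flink involved: the two ValueStates become two Options, and timers are not modeled.
object MatchModel {
  case class ReceiptEvent(txid: String, payChannel: String, timestamp: Long)
  case class OrderEvent(txid: String, payChannel: String, timestamp: Long)

  // State for one key: at most one buffered receipt and one buffered order
  case class KeyState(receipt: Option[ReceiptEvent] = None, order: Option[OrderEvent] = None)

  type Out = Option[(ReceiptEvent, OrderEvent)]

  // Mirrors processElement1: emit if the order is buffered, otherwise buffer the receipt
  def onReceipt(s: KeyState, r: ReceiptEvent): (KeyState, Out) =
    s.order match {
      case Some(o) => (s.copy(order = None), Some((r, o)))
      case None    => (s.copy(receipt = Some(r)), None)
    }

  // Mirrors processElement2
  def onOrder(s: KeyState, o: OrderEvent): (KeyState, Out) =
    s.receipt match {
      case Some(r) => (s.copy(receipt = None), Some((r, o)))
      case None    => (s.copy(order = Some(o)), None)
    }

  def main(args: Array[String]): Unit = {
    val r = ReceiptEvent("4", "404", 1000L)
    val o = OrderEvent("4", "505", 2000L)
    // Either arrival order yields the same pair and leaves the state empty
    val (s1, _) = onReceipt(KeyState(), r)
    println(onOrder(s1, o))
    val (s2, _) = onOrder(KeyState(), o)
    println(onReceipt(s2, r))
  }
}
```

Whichever side arrives first is buffered, and the other side completes the pair. In the Flink job, the only thing that can prevent a match is the first event's event-time timer firing before its partner arrives.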
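When a txid appears in only one stream, its timer fires 3 s (event time) after that event's timestamp, and the buffered event goes to a side output. Each input advances its own watermark, but the connected operator's event time is the minimum of the two. For the receipt-only steps below to fire, the order stream's watermark must therefore already be at least that far along. Example: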
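Stream 1 input: 4 404 17/05/2015:10:26:45 (no output; txid 4's timer is set for 10:26:48)
Stream 1 input: 5 404 17/05/2015:10:26:47 (no output; timer set for 10:26:50)
Stream 1 input: 7 404 17/05/2015:10:26:49 (the receipt watermark reaches 10:26:49, past 10:26:48)
Output: receipt_output_tag > ReceiptEvent(4,404,1421461605000)
Stream 1 input: 9 404 17/05/2015:10:26:59 (the receipt watermark is now 10:26:59)
Output:
receipt_output_tag > ReceiptEvent(5,404,1421461607000)
receipt_output_tag > ReceiptEvent(7,404,1421461609000)
Stream 2 input: 6 505 17/05/2015:10:26:55 (no output; timer set for 10:26:58)
Stream 2 input: 8 505 17/05/2015:10:26:56 (no output; timer set for 10:26:59)
Stream 2 input: 1 505 17/05/2015:10:27:01
Output:
order_output_tag> OrderEvent(6,505,1421461615000)
order_output_tag> OrderEvent(8,505,1421461616000)

The timestamps above fall in January, not May. For example, 1421461605000 is 2015-01-17 10:26:45 at UTC+8. This happens because the pattern dd/mm/yy:HH:mm:ss reads mm as minutes and never sets the month. With dd/MM/yyyy:HH:mm:ss the absolute values change, but only the month differs, so the 3 s timer arithmetic in this trace stays the same.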
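The firing rule in this trace can be checked with a small helper. The sketch assumes 0 s out-of-orderness, so each input's watermark equals the largest timestamp it has seen, and uses the 3 s timeout from the code. `TimerCheck` and `at` are hypothetical. The helper applies Flink's rule that a two-input operator's event time is the minimum of its input watermarks:

```scala
// Hypothetical helper (not a Flink API): which buffered, still-unmatched events
// would be flushed to a side output, given the two inputs' current watermarks.
object TimerCheck {
  val TimeoutMs = 3000L // same offset as registerEventTimeTimer(ts + 3000L)

  // A two-input operator's event-time clock is the minimum of its input watermarks
  def operatorWatermark(wm1: Long, wm2: Long): Long = math.min(wm1, wm2)

  // txids whose timer (event timestamp + 3 s) is due, sorted by event timestamp
  def firedTxids(pending: Map[String, Long], wm1: Long, wm2: Long): List[String] = {
    val wm = operatorWatermark(wm1, wm2)
    pending.toList
      .filter { case (_, ts) => ts + TimeoutMs <= wm }
      .sortBy { case (txid, ts) => (ts, txid) }
      .map(_._1)
  }

  // Milliseconds since 10:26:00 of the same day, to keep the numbers readable
  def at(minute: Int, second: Int): Long = ((minute - 26) * 60 + second) * 1000L

  def main(args: Array[String]): Unit = {
    val pendingOrders = Map("6" -> at(26, 55), "8" -> at(26, 56))
    val receiptWm = at(26, 59) // the receipt stream has reached 10:26:59
    println(firedTxids(pendingOrders, receiptWm, at(26, 56))) // List()
    println(firedTxids(pendingOrders, receiptWm, at(27, 1)))  // List(6, 8)
    // An input with no watermark yet holds the operator's clock back
    println(firedTxids(Map("4" -> at(26, 45)), at(26, 49), Long.MinValue)) // List()
  }
}
```

The last call shows that an input which has not yet produced a watermark holds back every timer in the connected operator, whatever the other input has seen.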