flink-sql解析canal-json实现实时同步
2021/7/12 19:08:41
本文主要是介绍flink-sql解析canal-json实现实时同步,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package com.lezhi.business.dxxbs.transmission.table import com.lezhi.common.{CommonTransmissonFunciton, SystemParams} import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ object user_login { def main(args: Array[String]): Unit = { val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bnv = StreamTableEnvironment.create(bsEnv, bsSettings) val table_name="user_login" val primaryKey="USER_ID" val table_column= """ |USER_ID STRING, |USER_PHONE STRING, |USER_PWD STRING, |CREAT_TIME STRING, |UPLOAD_TIME STRING, |UNION_ID STRING, |OPEN_ID STRING |""".stripMargin val sql_source_table="CREATE TABLE source_table_"+table_name+" (" + table_column+ ") WITH (" + "'connector' = 'kafka'," + //连接类型为kafka "'topic' = '"+SystemParams.TOPIC+"'," + //kafka topic名称 "'properties.bootstrap.servers' = '"+SystemParams.BOOTSTRAP_SERVER+"'," + //kafka bootstrap.servers配置 "'scan.startup.mode' = 'earliest-offset'," + // topic消费位置设置 "'format' = 'canal-json'," + //数据格式配置 "'canal-json.ignore-parse-errors' = 'true'," + //当解析异常时,忽略字段的解析异常,则会将该字段值设置为null。 "'canal-json.table.include' ='"+table_name+"')" bnv.executeSql(sql_source_table) // bnv.executeSql("select * from source_table_"+table_name).print() val sql_result_table ="CREATE TABLE sink_table_"+table_name+" (" + table_column+ ",PRIMARY KEY ("+primaryKey+") NOT ENFORCED" + ") WITH (" + "'connector' = 'jdbc'," + //连接类型为jdbc "'url' = '"+SystemParams.JDBC_URL_BYMM+"'," + //he JDBC database url. "'table-name' = '"+table_name+"'," + //连接的表名 " 'username' ='"+SystemParams.JDBC_USERNAME+"',"+ //连接数据库用户名 " 'password' ='"+SystemParams.JDBC_PASSWORD+"')" println(sql_result_table) bnv.executeSql(sql_result_table) bnv.executeSql("INSERT INTO sink_table_"+table_name+" SELECT * FROM source_table_"+table_name) bnv.execute(table_name) } }
注意,下沉时下沉的表必须要有主键,否则会在更新数据时,旧数据和新数据会同时存在
这篇关于flink-sql解析canal-json实现实时同步的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-15useCallback教程:React Hook入门与实践
- 2024-11-15React中使用useContext开发:初学者指南
- 2024-11-15拖拽排序js案例详解:新手入门教程
- 2024-11-15React中的自定义Hooks案例详解
- 2024-11-14受控组件项目实战:从零开始打造你的第一个React项目
- 2024-11-14React中useEffect开发入门教程
- 2024-11-14React中的useMemo教程:从入门到实践
- 2024-11-14useReducer开发入门教程:轻松掌握React中的useReducer
- 2024-11-14useRef开发入门教程:轻松掌握React中的useRef用法
- 2024-11-14useState开发:React中的状态管理入门教程