Flink CDC: MySQL to Kafka
2021/4/23 19:25:35
This article walks through using Flink CDC to move data from MySQL to Kafka. It should be a useful reference for anyone tackling a similar problem, so follow along.
Description:
Use Flink CDC to read the MySQL binlog, pull change data out of MySQL, and send it to Kafka.
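The mysql-cdc connector is built on Debezium and reads the MySQL binlog directly, so the binlog must be enabled in ROW format and the connecting user needs replication privileges. A quick sanity check you can run against your own instance before starting the job (the user and host in the GRANT are assumptions, adjust them to your setup):

-- Verify binlog prerequisites for the Debezium-based CDC source
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW

-- Illustrative grant for the CDC user
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;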
1. pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>FlinkSql</artifactId> <groupId>org.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>FlinkSqlTest</artifactId> <properties> <flink.version>1.11.3</flink.version> <scala.binary.version>2.11</scala.binary.version> <flink-cdc.version>1.1.0</flink-cdc.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <!-- <scope>test</scope>--> </dependency> <!-- <dependency>--> <!-- <groupId>mysql</groupId>--> <!-- <artifactId>mysql-connector-java</artifactId>--> <!-- <version>8.0.16</version>--> <!-- </dependency>--> <dependency> <groupId>com.alibaba.ververica</groupId> <!-- add the dependency matching your database --> <artifactId>flink-connector-mysql-cdc</artifactId> <version>${flink-cdc.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>${flink.version}</version> <scope> compile</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.11.1</version> </dependency> <!-- old planner flink table--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <!--new planner--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-format-changelog-json</artifactId> <version>1.0.0</version> </dependency> <!-- 日志相关依赖,flink必须要加,否则报错 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> </dependencies> <build> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <!-- scala编译插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <configuration> <scalaCompatVersion>2.11</scalaCompatVersion> 
<scalaVersion>2.11.12</scalaVersion> <encoding>UTF-8</encoding> </configuration> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- 可以设置jar包的入口类(可选) --> <mainClass>test.SqlJoinKafka10Test</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2. Code
package test

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}

/**
 * @program: demo
 * @description: read MySQL changes with Flink CDC, join them, and sink the result to Kafka
 * @author: yang
 * @create: 2021-01-15 11:48
 */
object SqlJoinMysqlKafkaCDCTest {

  def main(args: Array[String]): Unit = {
    // 1. Environment setup: state backend, checkpointing, restart strategy
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setStateBackend(new FsStateBackend("hdfs://localhost:8020/flink/mysql/checkpoints"))
    env.setStateBackend(new FsStateBackend("file:///D://tmp//flink/mysql"))
    // env.setStateBackend(new FsStateBackend("file:///root/flink1.11/flink-1.11.3/job/checkpoint"))
    env.enableCheckpointing(8000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // MySQL CDC source table 1
    val createMysqlUserTable =
      """
        |CREATE TABLE `user` (
        |  id INT,
        |  name STRING,
        |  create_time TIMESTAMP(3)
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'database-name' = 'test',
        |  'table-name' = 'user'
        |)
      """.stripMargin

    // MySQL CDC source table 2
    val createMysqlUserInfoTable =
      """
        |CREATE TABLE user_info (
        |  id INT,
        |  username STRING,
        |  password STRING
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'database-name' = 'test',
        |  'table-name' = 'user_info'
        |)
      """.stripMargin

    // Kafka sink table, written in changelog-json format
    val createKafkaSinkTable =
      """
        |CREATE TABLE user_all (
        |  id INT,
        |  name STRING,
        |  create_time TIMESTAMP(3),
        |  bid INT,
        |  username STRING,
        |  password STRING,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'kafka-0.10',
        |  'topic' = 'user_all',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'changelog-json'
        |)
      """.stripMargin

    // Join the two source tables and write the result to the Kafka sink
    val unionSql =
      """
        |INSERT INTO user_all
        |  SELECT u.id, u.name, u.create_time, ui.bid, ui.username, ui.password
        |  FROM (SELECT id, name, create_time FROM `user`) AS u
        |  LEFT JOIN (SELECT id AS bid, username, password FROM user_info) AS ui
        |  ON u.id = ui.bid
      """.stripMargin

    tableEnv.executeSql(createMysqlUserTable)
    tableEnv.executeSql(createMysqlUserInfoTable)
    tableEnv.executeSql(createKafkaSinkTable)

    val result: TableResult = tableEnv.executeSql(unionSql)
    result.print()
  }
}
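For reference, the MySQL tables behind the two CDC sources might look like the following. This is only a sketch inferred from the Flink schemas above; the actual column types in the author's test database are not shown in the article.

-- Assumed layout of the source tables in the `test` database
CREATE TABLE `user` (
  id          INT PRIMARY KEY,
  name        VARCHAR(255),
  create_time DATETIME(3)
);

CREATE TABLE user_info (
  id       INT PRIMARY KEY,
  username VARCHAR(255),
  password VARCHAR(255)
);

-- A couple of illustrative rows; any later change to either table is picked up from the binlog
INSERT INTO `user` VALUES (3, 'name3', '2021-02-23 10:55:37');
INSERT INTO user_info VALUES (3, '1', '333');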
3. Create the topic
kafka-topics --create --zookeeper h1:2181,h2:2181,h3:2181 --replication-factor 1 --partitions 1 --topic user_all
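The command above uses the older ZooKeeper-based syntax. On Kafka 2.2 and later, topics are created against the brokers directly; an equivalent command, assuming a broker at localhost:9092, would be:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user_all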
4. Consume the user_all topic and check the data
kafka-console-consumer --zookeeper h1:2181,h2:2181,h3:2181 --from-beginning --topic user_all
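Likewise, newer Kafka releases have dropped the --zookeeper option from the console consumer; the equivalent, again assuming a broker at localhost:9092, is:

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic user_all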
{"data":{"id":3,"name":"1","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"-D"} {"data":{"id":3,"name":"name3","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"+I"} {"data":{"id":4,"name":"1","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"-D"} {"data":{"id":4,"name":"name4","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"+I"}
That wraps up this article on Flink CDC from MySQL to Kafka. I hope the articles we recommend are helpful, and please keep supporting 为之网!