flink 1.10.1 java版本jdbc source从mysql读取数据
2022/2/9 19:29:05
本文主要是介绍flink 1.10.1 java版本jdbc source从mysql读取数据,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
本文的基础环境可以参考flink 1.10.1 java版本wordcount演示 (nc + socket),在此基础上实现通过jdbc从mysql读取数据。
1. 添加依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.18</version> </dependency>
由于测试环境是mysql8,所以这里选择的是mysql8.0.18版本的驱动。
2. 实体类
package com.demo.source; public class WordCount { private Long id; private String wordName; private Long wordcount; public WordCount() { } public WordCount(Long id, String wordName, Long wordcount) { this.id = id; this.wordName = wordName; this.wordcount = wordcount; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getWordName() { return wordName; } public void setWordName(String wordName) { this.wordName = wordName; } public Long getWordcount() { return wordcount; } public void setWordcount(Long wordcount) { this.wordcount = wordcount; } @Override public String toString() { return "WordCount{" + "id=" + id + ", wordName='" + wordName + '\'' + ", wordcount=" + wordcount + '}'; } }
实体类定义了一个无惨构造和一个全参数构造函数,重写了toString方法。
3. 定义mysql source
package com.demo.source; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; /** * */ public class MySqlSource extends RichSourceFunction<WordCount> { private PreparedStatement ps=null; private Connection connection=null; String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"; String username = "username"; String password = "password"; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "select id, word_name, word_count from tbl_flink_wordcount;"; //获取执行语句 ps = connection.prepareStatement(sql); } @Override public void run(SourceContext<WordCount> sourceContext) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ WordCount student = new WordCount( resultSet.getLong("id"), resultSet.getString("word_name").trim(), resultSet.getLong("word_count")); sourceContext.collect(student);//发送结果 Thread.sleep(1000); } } @Override public void cancel() { } @Override public void close() throws Exception { super.close(); if(connection != null){ connection.close(); } if (ps != null){ ps.close(); } } //获取mysql连接配置 public Connection getConnection(){ try { // 加载驱动 // Class.forName(driver); //创建连接 connection = DriverManager.getConnection(url,username,password); } catch (Exception e) { System.out.println("********mysql get connection occur exception, msg = "+e.getMessage()); e.printStackTrace(); } return connection; } }
采用MySQL8驱动连接mysql8时,需要指定时区。
如果数据量大,还可以考虑分页读取等方式进行处理。
由于数据读取过程和数据处理过程是同时进行的,所以读取过程可以持续读取,也可以延时读取,一直处于读取过程中,也不会影响数据处理过程。
4. 测试代码
package com.demo.source; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * */ public class FlinkMySqlSourceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WordCount> dataStream = env.addSource(new MySqlSource()); // 指定并行度为1 dataStream.print().setParallelism(1); env.execute("Flink Mysql Source"); } }
5. 启动程序执行测试
可以看到从mysql读取的数据。
6. 参考表定义
CREATE TABLE `tbl_flink_wordcount` ( `id` int(11) NOT NULL AUTO_INCREMENT, `word_name` varchar(64) DEFAULT NULL, `word_count` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
这篇关于flink 1.10.1 java版本jdbc source从mysql读取数据的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-15鸿蒙生态设备数量超8亿台
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?