FlinkMysqlSource
2021/10/30 19:11:31
本文主要是介绍FlinkMysqlSource,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
/**
* 自定义Mysql Source
*/
public class CustomerMysqlSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获得自定义Source对象
DataStreamSource<UserInfo> mysqlSource = env.addSource(new MyMysqlSource());
mysqlSource.print();
env.execute("CustomerMySQLSourceDemo");
}
/**
自定义Mysql Source实现类
*/
public static class MyMysqlSource extends RichSourceFunction<UserInfo> {
private Connection connection = null; // 定义数据库连接对象
private PreparedStatement ps = null; // 定义PreparedStatement对象
/*
使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver");
// 创建数据库连接
String url = "jdbc:mysql://node01:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";
this.connection = DriverManager.getConnection(url, "root", "123456");
// 准备PreparedStatement对象
this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");
}
/*
使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接
*/
@Override
public void close() throws Exception {
super.close();
// 关闭资源
if (this.ps != null) this.ps.close();
if (this.connection != null) this.connection.close();
}
@Override
public void run(SourceContext<UserInfo> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
int id = resultSet.getInt("id");
String username = resultSet.getString("username");
String password = resultSet.getString("password");
String name = resultSet.getString("name");
ctx.collect(new UserInfo(id, username, password, name));
}
}
@Override
public void cancel() {
System.out.println("任务被取消......");
}
}
/**
数据定义类, POJO
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class UserInfo {
int id;
String username;
String password;
String name;
}
}
这篇关于FlinkMysqlSource的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-02MySQL 3主集群搭建
- 2024-12-25如何部署MySQL集群资料:新手入门教程
- 2024-12-24MySQL集群部署资料:新手入门教程
- 2024-12-24MySQL集群资料详解:新手入门教程
- 2024-12-24MySQL集群部署入门教程
- 2024-12-24部署MySQL集群学习:新手入门教程
- 2024-12-24部署MySQL集群入门:一步一步搭建指南
- 2024-12-07MySQL读写分离入门:轻松掌握数据库读写分离技术
- 2024-12-07MySQL读写分离入门教程
- 2024-12-07MySQL分库分表入门详解