Flink写入数据到MySQL案例
2022/2/22 19:23:57
本文主要是介绍Flink写入数据到MySQL案例,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
案例准备:
1、启动MySQL,在mysql中创建数据库flinkdb,并创建表sensor_temp
CREATE TABLE sensor_temp ( id varchar(32), temp double )
代码实现:
def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource) dataStream.addSink(new MyJdbcSinkFunction()) env.execute() } class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{ var connection: Connection =_ var insertStmt: PreparedStatement=_ var updateStmt: PreparedStatement=_ override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { updateStmt.setDouble(1,value.temperature) updateStmt.setString(2,value.id) updateStmt.execute() if(updateStmt.getUpdateCount == 0){ insertStmt.setString(1,value.id) insertStmt.setDouble(2,value.temperature) insertStmt.execute() } } override def open(parameters: Configuration): Unit = { connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123") insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)") updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?") } override def close(): Unit = { insertStmt.close() updateStmt.close() connection.close() }
运行结果:
查询数据select * from sensor_temp;
这篇关于Flink写入数据到MySQL案例的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-25如何部署MySQL集群资料:新手入门教程
- 2024-12-24MySQL集群部署资料:新手入门教程
- 2024-12-24MySQL集群资料详解:新手入门教程
- 2024-12-24MySQL集群部署入门教程
- 2024-12-24部署MySQL集群学习:新手入门教程
- 2024-12-24部署MySQL集群入门:一步一步搭建指南
- 2024-12-07MySQL读写分离入门:轻松掌握数据库读写分离技术
- 2024-12-07MySQL读写分离入门教程
- 2024-12-07MySQL分库分表入门详解
- 2024-12-07MySQL分库分表入门指南