Flink流处理-Source之Mysql
2021/11/8 2:09:46
本文主要是介绍Flink流处理-Source之Mysql,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
MysqlElectricFenceResultSource
package pers.aishuang.flink.streaming.source.mysql; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** * 读取Mysql中电子围栏相关表结合成后的规则 */ public class MysqlElectricFenceResultSource extends RichSourceFunction { //新建日志打印器 private static final Logger logger = LoggerFactory.getLogger(MysqlElectricFenceResultSource.class); //定义JDBC变量 private static Connection conn = null; private static PreparedStatement pstmt = null; //设置标识用于记录当前循环读取mysql配置 private static Boolean flag = true; //定义获取配置文件参数工具 private static ParameterTool parameterTool = null; private static Map<String, String> parasMap = null; private static ParameterTool globalJobParameters = null; @Override public void open(Configuration parameters) throws Exception { //方式一:通过ParameterTool自己再获取配置文件参数 parameterTool = ParameterTool.fromPropertiesFile(MysqlElectricFenceResultSource.class .getClassLoader() .getResourceAsStream("conf.properties")); //-- 获取Driver、url、user、password String driver = parameterTool.getRequired("jdbc.driver"); String url = parameterTool.getRequired("jdbc.url"); String user = parameterTool.getRequired("jdbc.user"); String password = parameterTool.getRequired("jdbc.password"); //方式二:通过执行环境设置的全局任务参数里获取 参数 parasMap = getRuntimeContext() .getExecutionConfig() .getGlobalJobParameters() .toMap(); String driver2 = parasMap.get("jdbc.driver"); String url2 = parasMap.get("jdbc.url"); String user2 = parasMap.get("jdbc.user"); String password2 = parasMap.get("jdbc.password"); //方式三:与方式二本质上一样 ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext() .getExecutionConfig() .getGlobalJobParameters(); String 
driver3 = globalJobParameters.getRequired("jdbc.driver"); String url3 = globalJobParameters.getRequired("jdbc.url"); String user3 = globalJobParameters.getRequired("jdbc.user"); String password3 = globalJobParameters.getRequired("jdbc.password"); //获取MySQL连接 //-- 加载驱动 Class.forName(driver); //-- 获取连接 conn = DriverManager.getConnection(url, user, password); //-- 执行SQL //查出 有进入时间 没有出去时间,按照vin分组,找到目前最小id(电子围栏结果表的主键id) String sql = "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin"; //-- 创建预编译对象 pstmt = conn.prepareStatement(sql); } @Override public void run(SourceContext ctx) throws Exception { while(flag) { HashMap<String, Integer> vehInfoMap = new HashMap<>(); ResultSet rs = pstmt.executeQuery(); while(rs.next()) { vehInfoMap.put(rs.getString("vin") , rs.getInt("id")); } if(vehInfoMap.isEmpty()){ logger.warn("从mysql中electronic_fence相关表的数据为空"); } else { ctx.collect(vehInfoMap); logger.info("查询电子围栏分析结果表中数据,存在记录数据为:%s 条",vehInfoMap.size()); } if(!rs.isClosed()) {rs.close();} //多久从mysql获取一次数据 //TimeUnit.MICROSECONDS.sleep(parameterTool.getLong("vehinfo.millionseconds")); //每1秒钟获取一次最新数据,因为窗口每隔90s进行一次计算,因此该时间一定要小于窗口触发计算的频率 TimeUnit.MICROSECONDS.sleep(1);//1ms } } @Override public void cancel() { flag = false; } @Override public void close() throws Exception { super.close(); if(!pstmt.isClosed()){pstmt.close();} if(!conn.isClosed()) {conn.close();} } }
MySQLElectricFenceSource
package pers.aishuang.flink.streaming.source.mysql; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.HashMap; import java.util.concurrent.TimeUnit; public class MySQLElectricFenceSource extends RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> { private static final Logger logger = LoggerFactory.getLogger(MySQLElectricFenceSource.class.getSimpleName()); private static Connection conn = null; private static Statement stmt = null; //设置标识用于记录当前循环读取mysql配置 private static Boolean flag = true; private static String elerulesTime = null; @Override public void open(Configuration parameters) throws Exception { //1. 获取上下文中的 parameterTool ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); //2. 读取配置文件中参数,注册驱动、url、user、passworld String driver = globalJobParameters.getRequired("jdbc.driver"); String url = globalJobParameters.getRequired("jdbc.url"); String user = globalJobParameters.getRequired("jdbc.user"); String password = globalJobParameters.getRequired("jdbc.password"); //3. 多长时间去查一次mysql数据 elerulesTime = globalJobParameters.getRequired("elerules.millionseconds"); //4. 设置驱动和连接 Class.forName(driver); conn = DriverManager.getConnection(url,user,password); stmt = conn.createStatement(); } @Override public void run(SourceContext<HashMap<String, ElectricFenceResultTmp>> ctx) throws Exception { while (flag){ HashMap<String, ElectricFenceResultTmp> map = new HashMap<>(); //1. 
查询SQL String sql = "select " + "vins.vin,setting.id,setting.name,setting.address,setting.radius," + "setting.longitude,setting.latitude,setting.start_time,setting.end_time \n" + "from vehicle_networking.electronic_fence_setting setting \n" + "inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id \n" + "where setting.status=1"; ResultSet rs = stmt.executeQuery(sql); while(rs.next()){ map.put( rs.getString("vin"), new ElectricFenceResultTmp( rs.getInt("id"), rs.getString("name"), rs.getString("address"), rs.getFloat("radius"), rs.getDouble("longitude"), rs.getDouble("latitude"), rs.getDate("start_time"), rs.getDate("end_time") ) ); } ctx.collect(map); //关闭rs if(!rs.isClosed()) { rs.close(); } //收集electricFenceResult 指定休眠时间 ms TimeUnit.MICROSECONDS.sleep(Long.parseLong(elerulesTime)); } } @Override public void cancel() { flag = false; } @Override public void close() throws Exception { super.close(); if(!stmt.isClosed()) stmt.close(); if(!conn.isClosed()) conn.close(); } }
VehicleInfoMysqlSource
package pers.aishuang.flink.streaming.source.mysql; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pers.aishuang.flink.streaming.entity.VehicleInfoModel; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Date; import java.util.HashMap; public class VehicleInfoMysqlSource extends RichSourceFunction<HashMap<String, VehicleInfoModel>> { //创建日志打印器 private Logger logger = LoggerFactory.getLogger(VehicleInfoMysqlSource.class); //定义JDBC变量 private Connection conn = null; private PreparedStatement pstmt = null; //定义获取配置文件参数工具 ParameterTool parameterTool; //定义是否运行的标记 private boolean isRunning = true; //flag @Override public void open(Configuration parameters) throws Exception { //通过全局变量获取配置参数 parameterTool = (ParameterTool) getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); //获取mysql JDBC的 driver、url、user、password String driver = parameterTool.getRequired("jdbc.driver"); String url = parameterTool.getRequired("jdbc.url"); String user = parameterTool.getRequired("jdbc.user"); String password = parameterTool.getRequired("jdbc.password"); //加载驱动、获取连接、创建sql字符串、获取预编译对象 Class.forName(driver); conn = DriverManager.getConnection(url); String sql = "select t12.vin,t12.series_name,t12.model_name,t12.series_code,t12.model_code,t12.nick_name,t3.sales_date,t4.car_type\n" + " from (\n" + "select t1.vin, t1.series_name, t2.show_name as model_name, t1.series_code,t2.model_code,t2.nick_name,t1.vehicle_id\n" + " from vehicle_networking.dcs_vehicles t1 left join vehicle_networking.t_car_type_code t2 on t1.model_code = t2.model_code) t12\n" + " left join (select vehicle_id, max(sales_date) sales_date from vehicle_networking.dcs_sales group by vehicle_id) t3\n" + " on t12.vehicle_id = 
t3.vehicle_id\n" + " left join\n" + " (select tc.vin,'net_cat' car_type from vehicle_networking.t_net_car tc\n" + " union all select tt.vin,'taxi' car_type from vehicle_networking.t_taxi tt\n" + " union all select tp.vin,'private_car' car_type from vehicle_networking.t_private_car tp\n" + " union all select tm.vin,'model_car' car_type from vehicle_networking.t_model_car tm) t4\n" + " on t12.vin = t4.vin"; pstmt = conn.prepareStatement(sql); } @Override public void run(SourceContext<HashMap<String, VehicleInfoModel>> ctx) throws Exception { while(isRunning) { ResultSet resultSet = pstmt.executeQuery(); HashMap<String, VehicleInfoModel> vehicleInfoMap = new HashMap<>(); while(resultSet.next()) { VehicleInfoModel vehicleInfoModel = new VehicleInfoModel(); //车架号 String vin = resultSet.getString("vin"); //车系 String seriesName = resultSet.getString("series_name"); //车型 String modelName = resultSet.getString("model_name"); //车系编码 String seriesCode = resultSet.getString("series_code"); //车型编码 String modelCode = resultSet.getString("model_code"); //车辆类型简称 String nickName = resultSet.getString("nick_name"); //出售日期 String salesDate = resultSet.getString("sales_date"); //车辆用途 String carType = resultSet.getString("car_type"); //年限 String liveTime = "-1"; if (salesDate != null) { //当前日期-售出日期=使用年限 liveTime = String.valueOf((new Date().getTime() - resultSet.getDate("sales_date").getTime()) / 1000 / 3600 / 24 / 365); } if (null == vin) { vin = "未知"; } if (null == seriesName) { seriesName = "未知"; } if (null == modelName) { modelName = "未知"; } if (null == seriesCode) { seriesCode = "未知"; } if (null == modelCode) { modelCode = "未知"; } if (null == nickName) { nickName = "未知"; } if (null == salesDate) { salesDate = "未知"; } if (null == carType) { carType = "未知"; } vehicleInfoModel.setSeriesName(seriesName); vehicleInfoModel.setSeriesCode(seriesCode); vehicleInfoModel.setModelName(modelName); vehicleInfoModel.setModelCode(modelCode); vehicleInfoModel.setLiveTime(liveTime); 
vehicleInfoModel.setNickName(nickName); vehicleInfoModel.setCarType(carType); vehicleInfoModel.setSalesDate(salesDate); //将车辆基础数据封装到集合返回 vehicleInfoMap.put(vin, vehicleInfoModel); } if(vehicleInfoMap.isEmpty()) { logger.warn("从车辆基础数据表中查询数据为空...."); }else{ ctx.collect(vehicleInfoMap); } resultSet.close(); //设置多久从mysql查询一次数据(及规则变更周期时间) Thread.sleep(parameterTool.getInt("vehinfo.millionseconds")); } } @Override public void cancel() { isRunning = false; } /** * 释放资源 * @throws Exception */ @Override public void close() throws Exception { super.close(); if(pstmt != null) pstmt.close(); if(conn != null) conn.close(); } }
这篇关于Flink流处理-Source之Mysql的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-25MySQL报错Duplicate entry '0' for key 'PRIMARY'
- 2024-05-29阿里 Canal 实时同步 MySQL 增量数据至 ClickHouse 数据库
- 2024-05-24在Linux下管理MySQL的大小写敏感性
- 2024-04-26MySQL查出时间比实际晚8小时的解决方案
- 2024-04-01JPA不识别MySQL的枚举类型
- 2024-03-30mysql数据库表卡死解决方法
- 2024-03-15MySQL多数据源笔记5-ShardingJDBC实战
- 2024-03-11natural join mysql
- 2024-03-11关于VS2017,VS2015 中利用 EF使用Mysql 不显示数据源问题解决方案
- 2024-02-26mysql 阿里云xb后缀备份文件恢复-icode9专业技术文章分享