Flink Stream Processing - HBase Sink
2021/11/8 6:12:34
This article covers the sink side of Flink stream processing with HBase as the target store. The two custom RichSinkFunction implementations below should be a useful reference for anyone who needs to write streaming results into HBase.
TripDriveToHBaseSink
package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.io.IOException;

public class TripDriveToHBaseSink extends RichSinkFunction<TripModel> {
    private final static Logger logger = LoggerFactory.getLogger(TripDriveToHBaseSink.class);
    private String tableName;
    private Connection conn = null;
    private BufferedMutator mutator = null;

    public TripDriveToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Fetch the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // Read the parameters needed by the HBase Java API
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // Create the connection through the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // Configure how large the write buffer may grow and how often it is flushed to HBase;
        // this buffering works much like a Kafka producer batching records before sending to Kafka
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // Flush once the buffer reaches 10 MB
        params.writeBufferSize(10 * 1024 * 1024L);
        // Flush at least every 5 seconds
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
        // Obtain the buffered table writer from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("Failed to obtain the BufferedMutator: " + e.getMessage());
        }
    }

    // 5. Override invoke() to write each incoming record to HBase
    @Override
    public void invoke(TripModel value, Context context) throws Exception {
        // 5.1 setDataSourcePut() turns the incoming value into a Put
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // 5.2 Force the buffered data to be flushed to HBase
            mutator.flush();
        } catch (Exception ex) {
            logger.error("Failed to write to HBase: " + ex.getMessage());
        }
    }

    // 4. Override close() to release resources
    @Override
    public void close() throws Exception {
        // 4.1 Close the HBase table writer and the connection
        if (mutator != null) mutator.close();
        if (conn != null && !conn.isClosed()) conn.close();
    }

    // 6. Implement setDataSourcePut()
    /**
     * Build one Put per record:
     * 1. table name 2. rowkey 3. column family 4. column names and values
     *
     * @param tripModel
     * @return
     */
    private Put setDataSourcePut(TripModel tripModel) {
        // Rowkey: VIN + trip start time as epoch millis
        String rowKey = tripModel.getVin() + "_" + DateUtil.convertStringToDate(tripModel.getTripStartTime()).getTime();
        String cf = "cf";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(tripModel.getVin()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastSoc"), Bytes.toBytes(String.valueOf(tripModel.getLastSoc())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastMileage"), Bytes.toBytes(String.valueOf(tripModel.getLastMileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStartTime"), Bytes.toBytes(tripModel.getTripStartTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getStart_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_longitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_latitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_mileage"), Bytes.toBytes(String.valueOf(tripModel.getStart_mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getEnd_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_longitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_latitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_mileage"), Bytes.toBytes(String.valueOf(tripModel.getEnd_mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripEndTime"), Bytes.toBytes(tripModel.getTripEndTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mileage"), Bytes.toBytes(String.valueOf(tripModel.getMileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("max_speed"), Bytes.toBytes(String.valueOf(tripModel.getMax_speed())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("soc_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getSoc_comsuption())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("time_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getTime_comsuption())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_low_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_low_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_medium_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_medium_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_high_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_high_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStatus"), Bytes.toBytes(String.valueOf(tripModel.getTripStatus())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }
}
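Because open() reads "zookeeper.quorum" and "zookeeper.clientPort" from the global job parameters, those parameters must be registered on the execution environment before the job is submitted. The following is a minimal, hypothetical wiring sketch: the table name "trip_drive", the command-line flags, and the upstream DataStream<TripModel> are assumptions for illustration, not part of the original project.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.TripModel;

public class TripDriveJobSketch {
    // Attaches the sink to a DataStream<TripModel> produced upstream (e.g. from Kafka).
    public static void attachSink(StreamExecutionEnvironment env,
                                  DataStream<TripModel> tripStream,
                                  String[] args) throws Exception {
        // Expected CLI flags (placeholders), e.g.:
        //   --zookeeper.quorum node1,node2,node3 --zookeeper.clientPort 2181
        ParameterTool params = ParameterTool.fromArgs(args);
        // Register the parameters so getGlobalJobParameters() in open() can see them
        env.getConfig().setGlobalJobParameters(params);

        // "trip_drive" is a placeholder table name; use the table that exists in your cluster
        tripStream.addSink(new TripDriveToHBaseSink("trip_drive"));

        env.execute("TripDriveToHBaseSink job");
    }
}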
TripSampleToHBaseSink
package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.utils.DateUtil;
import pers.aishuang.flink.streaming.utils.StringUtil;

import java.io.IOException;

public class TripSampleToHBaseSink extends RichSinkFunction<String[]> {
    // Logger for this sink
    private final static Logger logger = LoggerFactory.getLogger(TripSampleToHBaseSink.class);
    // Target table name
    private String tableName;
    // HBase connection
    org.apache.hadoop.hbase.client.Connection conn = null;
    // Buffered table writer
    BufferedMutator mutator = null;

    // Constructor that takes the target table name
    public TripSampleToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    // Override open()
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Fetch the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        // 2. Read the parameters needed by the HBase Java API
        // -- ZooKeeper ensemble address (quorum)
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        // -- ZooKeeper client port
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        // -- Create the configuration
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // -- Load the parameters into the configuration
        // (ZOOKEEPER_QUORUM is used here, consistent with TripDriveToHBaseSink,
        // rather than CLIENT_ZOOKEEPER_QUORUM)
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // 3. Create the connection through the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // -- Configure how large the write buffer may grow and how often it is flushed to HBase
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // -- Write buffer size: 10 MB
        params.writeBufferSize(10 * 1024 * 1024L);
        // -- Periodic flush timeout: 5 s
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
        // 4. Obtain the buffered table writer from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("Failed to obtain the BufferedMutator: " + e.getMessage());
        }
    }

    // 5. Override invoke() to write each incoming record to HBase
    @Override
    public void invoke(String[] value, Context context) throws Exception {
        // -- setDataSourcePut() turns the incoming value into a Put
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // -- Force the buffered data to be flushed to HBase
            mutator.flush();
        } catch (IOException e) {
            logger.error("Failed to write to HBase: " + e.getMessage());
        }
    }

    // Override close()
    @Override
    public void close() throws Exception {
        // Close the HBase table writer and the connection
        if (mutator != null) mutator.close();
        if (conn != null) conn.close();
    }

    /**
     * Implement setDataSourcePut():
     * build one Put per record:
     * 1. table name 2. rowkey 3. column family 4. column names and values
     * @param tripDriveArr
     * @return
     */
    private Put setDataSourcePut(String[] tripDriveArr) {
        // 1. Rowkey design: VIN + reversed timestamp
        String rowkey = tripDriveArr[0] + StringUtil.reverse(tripDriveArr[1]);
        // 2. Instantiate the Put from the rowkey
        Put put = new Put(Bytes.toBytes(rowkey));
        // 3. Column family name
        String cf = "cf";
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(tripDriveArr[0]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("terminalTimeStamp"), Bytes.toBytes(tripDriveArr[1]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("soc"), Bytes.toBytes(tripDriveArr[2]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mileage"), Bytes.toBytes(tripDriveArr[3]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("speed"), Bytes.toBytes(tripDriveArr[4]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("gps"), Bytes.toBytes(tripDriveArr[5]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("terminalTime"), Bytes.toBytes(tripDriveArr[6]));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }
}
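Both sinks assume their target table already exists with a column family named "cf". The following is a minimal sketch of creating such tables with the HBase 2.x Admin API; the table names "trip_drive" and "trip_sample", and the ZooKeeper quorum, are placeholders chosen for illustration and should match your own cluster and job configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateTripTables {
    public static void main(String[] args) throws Exception {
        // Placeholder quorum/port; use the same values the Flink job receives
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "node1,node2,node3");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            for (String name : new String[]{"trip_drive", "trip_sample"}) {
                TableName table = TableName.valueOf(name);
                if (!admin.tableExists(table)) {
                    // Single column family "cf", matching the Puts built by both sinks
                    admin.createTable(TableDescriptorBuilder.newBuilder(table)
                            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                            .build());
                }
            }
        }
    }
}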
That wraps up this article on the Flink streaming HBase sink. We hope it has been helpful, and we hope you will continue to support the site!