KAFKA数据源同步到SQL SERVER数据库代码实现
2021/12/3 19:06:43
本文主要是介绍KAFKA数据源同步到SQL SERVER数据库代码实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package com.ruoyi.quartz.controller; import com.ruoyi.quartz.domain.LfoSbomP; import com.ruoyi.quartz.domain.LfoSbomS; import com.ruoyi.quartz.sbom.model.LogStatus; import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonLFOSBBBOMBean; import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonRootBean; import com.ruoyi.quartz.sbom.process.bean.send.SendJsonLFOServiceBOMBean; import com.ruoyi.quartz.service.ILfoSbomLogService; import com.ruoyi.quartz.service.ILfoSbomSService; import com.ruoyi.quartz.util.LongToDateUtils; import com.ruoyi.quartz.util.WCLocationConstants; import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import com.alibaba.fastjson.JSONObject; import java.io.*; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; import com.ruoyi.quartz.service.ILfoSbomPService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zhulm7 * @date 2021-09-18 09:45:00 * 这个方法主要是消费资源,调用资源路径,消费者拉取kafka中的数据,写入到文件中 * http://localhost/dev-api/message/receive/file/delta */ @RestController @RequestMapping("message/receive") public class ConsumerFileDeltaController { @Autowired private ILfoSbomPService lfoPService; @Autowired private ILfoSbomSService lfosService; private static String KAFKA_ACCOUNT_NAME = "kaf_fineReport"; private static String KAFKA_ACCOUNT_PWD = "XpyO8MBtxC';"; private static String KAFKA_PRODUCER_SERVER = "n1.ikp.tcp.com:8092,n2.ikp.tcp.com:8092"; private static String CONSUMER_GROUP_ID = "fineReport"; private static String CONSUMER_ENABLE_AUTO_COMMIT = "false"; private static String CONSUMER_AUTO_OFFSET_RESET = "earliest"; private static String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000"; private static String CONSUMER_SESSION_TIMEOUT_MS = "10000"; private static String CONSUMER_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; private static String CONSUMER_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; private static String KAFKA_SECURITY_PROTOCOL = "SASL_SSL"; private static String KAFKA_SSL_TRUSTSTORE_LOCATION = ""; private static String KAFKA_SSL_TRUSTSTORE_PWD = ""; private static String KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"; private static String CONSUMER_ELOIS_TOPIC = "ebgwc"; private static KafkaConsumer<String, String> consumer=null; /** * Consumed data from KAFKA * @return 增量方式消费kafka中流式数据处理逻辑 * @throws Exception 2021-09-26 13:37:00 */ @RequestMapping("/file/delta") public void receive(String msg) { //业务逻辑(增量数据)书写的方式。。。。。。 Properties props = new Properties(); props.put("bootstrap.servers", "n1.ikp.lenovo.com:9092,n2.ikp.lenovo.com:9092,n3.ikp.lenovo.com:9092,n4.ikp.lenovo.com:9092,n5.ikp.lenovo.com:9092,n6.ikp.lenovo.com:9092"); props.put("group.id", "fineReport"); props.put("enable.auto.commit", "false"); // 设置不自动提交 props.put("auto.offset.reset", "earliest"); // 从头开始记录信息 earliest latest none props.put("auto.commit.interval.ms", "1000");// 自动提交间隔 props.put("session.timeout.ms", "10000"); // 超时时间30秒 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("ssl.endpoint.identification.algorithm", ""); props.put("ssl.truststore.location", "D:/kafka-2.8.0-src/client_truststore.jks"); props.put("ssl.truststore.password", "WSO2_sp440"); props.put("sasl.mechanism", "SCRAM-SHA-512"); //XpyO8MBt props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_fineReport' password='XpyO8MBtcx';"); final int minBatchSize = 50; final int minBuffer_length = 420000000; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); int buffer_length = 0; KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //ebgwc_lfo-sbom-delta 增量 consumer.subscribe(Collections.singletonList("ebgwc_lfo-sbom-delta")); try { int whileIndex = 0; DateFormat df= new SimpleDateFormat("yyyyMMdd"); String rootFolder="delta"; String parentFolder=df.format(new Date()); String folder= WCLocationConstants.WT_HOME+ File.separator+"cluster"+File.separator+rootFolder+File.separator+parentFolder; File dir=new File(folder); //创建文件夹 if(!dir.exists()) { dir.mkdirs(); } //一次性把lfoNumber加载数据库中的数据,方便后续插入使用,减少连接数据库次数 Set<String> lst =new HashSet<String>(); //List<LfoSbomP> lfoplist = lfoPService.initLfoNumberData();//查询的数据量过大 //for(int i=0;i<lfoplist.size();i++){ //String lfopnumbernc= lfoplist.get(i).getLfoNumber(); //lst.add(lfopnumbernc); //} List<String> lfonumberplist = lfoPService.initAllLfoNumberData(); for(int i=0;i<lfonumberplist.size();i++){ String lfopnumbernc= lfonumberplist.get(i); lst.add(lfopnumbernc); } DateFormat df1= new SimpleDateFormat("yyyyMMddHHmmss"); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); whileIndex += 1; int record_counter = 0; int forIndex = 1; for (ConsumerRecord<String, String> record : records) { buffer.add(record); //数据添加到缓存里 record_counter += 1;//计数器 buffer_length += record.value().length();//字符串长度 //if(buffer.size() >= minBatchSize){//原来通过消息 if(buffer_length > minBuffer_length || record_counter == records.count()){ for (int k = 0; k < buffer.size(); k++) { ConsumerRecord<String, String> record_buffer = buffer.get(k); //获取kafka生产者消息时间 String datestr = LongToDateUtils.longToDate(record_buffer.timestamp()); String topic_name = record_buffer.topic();//主题 String partition = record_buffer.partition()+"";//分区 String offset=record_buffer.offset()+"";//偏移量 String key=record_buffer.key();//日志主键 String value=record_buffer.value();//日志value if(StringUtils.isBlank(value)) { continue; //跳过 } if(!(value.startsWith("{") && value.endsWith("}"))) { if(StringUtils.isBlank(value)) { continue;//跳过 } } //正常情况写入txt日志文件 ReceiveJsonRootBean receiveJsonRootBean = JSONObject.parseObject(value, ReceiveJsonRootBean.class); String LFONumber = receiveJsonRootBean.getLfo().getLfoNumber(); //判断LFONumber字符串是否在内存lst中存在,如果存在,那么跳过,如果不存在,那么写入数据库操作 boolean contains = lst.contains(LFONumber); if(!contains){//如果不存在,那么执行插入条件 //循环遍历结果集 //最后把这个number记录数据库中 lst.add(LFONumber); //文件格式 String filePath = folder + File.separator +"ebgwc_lfo_sbom_delta"+ whileIndex + "_" + forIndex + ".txt"; //向文件尾部追加数据 (保留向文件中插入数据的功能) appendToFile(filePath, value+"\r\n"); int limit = 200;//分批处理,每次处理200个 List<LfoSbomS> buffer_lfosbom = new ArrayList<>(); List<SendJsonLFOServiceBOMBean> lfosbom = receiveJsonRootBean.getLfosbom(); System.out.printf("测试delta:"+lfosbom.toString()); for (int m = 0; m < lfosbom.size(); m++) { //向数据库中增量插入业务数据 LfoSbomS lfo_s = new LfoSbomS(); lfo_s.setLfoNumber(lfosbom.get(m).getLfoNumber()); lfo_s.setChangeType(lfosbom.get(m).getChangeType()); lfo_s.setServicePart(lfosbom.get(m).getServicePart()); lfo_s.setBacCode(lfosbom.get(m).getBacCode()); lfo_s.setComposedQty(lfosbom.get(m).getComposedQty()); lfo_s.setDescription(lfosbom.get(m).getDescription()); lfo_s.setLongdescription(lfosbom.get(m).getLongdescription()); lfo_s.setOffSet(topic_name+"_"+partition+"_"+offset+"_"+LFONumber+"_"+datestr);//主键 lfo_s.setInsertDate(datestr);//推送日期 //向缓存中插入数据 buffer_lfosbom.add(lfo_s); //当没存满的时候遍历结束了,也要执行插入操作 if(limit == buffer_lfosbom.size() || m == lfosbom.size()-1){ //要分成200一批次插入增量的分块数据 lfosService.insertListLfoSbomS(buffer_lfosbom); //清空缓存中的数据 buffer_lfosbom.clear(); } } // 正常增量数据记录日志 LfoSbomP lfo_p = new LfoSbomP(); lfo_p.setLfoNumber(LFONumber); lfo_p.setChangeType("INS"); lfo_p.setPartition(partition); lfo_p.setOffSet(offset); lfo_p.setFilePath(filePath);//文件所在路径 lfo_p.setStatus(LogStatus.SUCCESS); lfo_p.setMessage("ebgwc_lfo-sbom-delta");//存放主题 //插入日志,调用方法 lfoPService.insertLfoSbomP(lfo_p); }else{ //如果set集合中存在lfonumber,那么不错处理,非常重要!!! } } //清空缓存 buffer.clear(); //计数器清零 buffer_length = 0; //索引值加1 forIndex += 1; } } } } catch (Exception e) { System.out.print("commit failed"); System.out.print(e); } finally { //consumer.commitSync(); } } /** * 方法追加文件:使用FileWriter */ public static void appendToFile(String fileName, String content) { try { //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件 BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true)); writer.write(content); writer.close(); } catch (IOException e) { e.printStackTrace(); } } }
这篇关于KAFKA数据源同步到SQL SERVER数据库代码实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-01-08Docker下的SqlServer发布订阅启用
- 2023-06-05Docker安装MS SQL Server并使用Navicat远程连接
- 2023-05-25深入浅出 SQL Server CDC 数据同步
- 2023-05-12通过空间占用和执行计划了解SQL Server的行存储索引
- 2023-04-24以SQLserver为例的Dapper详细讲解
- 2022-11-30SQL server高级函数查询
- 2022-11-26SQL SERVER数据库服务器CPU不能全部利用原因分析
- 2022-11-21SQL Server 时间算差值/常用函数
- 2022-11-20调试Archery连接SQL Server提示驱动错误
- 2022-10-22SQL Server 完整、差异备份+完整、差异还原(详细讲解,规避错误)