kafka自定义producer从mysql获取数据存储至kafka
2021/5/23 19:27:16
本文主要是介绍kafka自定义producer从mysql获取数据存储至kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
package test; /** * @Description 细节决定成败 * @Date 2021/5/23 14:45 * @Author liaoxuan **/ import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; import util.GlobalConfigUtil; import java.util.Properties; /** * Kafka生产消息工具类 */ public class KafkaSender { private String topic; public KafkaSender(String topic){ super(); this.topic = topic; } /** * 发送消息到Kafka指定topic * @param topic topic名字 * @param data 数据 */ public static void sendMessage(String topic , String data){ Producer<String, String> producer = createProducer(); producer.send(new KeyedMessage<String , String>(topic , data)); } private static Producer<String , String> createProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrapServers); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeperConnect); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer<String, String>(new ProducerConfig(properties)); } }
package test; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import util.GlobalConfigUtil; import java.io.UnsupportedEncodingException; import java.sql.*; /** * @Description 细节决定成败 * @Date 2021/5/23 14:46 * @Author liaoxuan **/ public class MysqlToKafkaTest { public static void main(String[] args) throws Exception{ MysqlToKafkaTest a = new MysqlToKafkaTest(); Connection con = a.getCon(); String sql = "select * from userconfig"; Statement statement = con.createStatement(); ResultSet rs = statement.executeQuery(sql); JSONArray objects = resultSetToJson(rs); for (int i=0;i<objects.size();i++){ System.out.println(objects.get(i)); KafkaSender.sendMessage(GlobalConfigUtil.kafkaInputTopic, String.valueOf(objects.get(i))); } } public Connection getCon() { //数据库连接名称 String username= GlobalConfigUtil.mysqlUsername; //数据库连接密码 String password=GlobalConfigUtil.mysqlPassword; String driver=GlobalConfigUtil.mysqlDriver; //其中test为数据库名称 String url=GlobalConfigUtil.mysqlUrl; Connection conn=null; try{ Class.forName(driver); conn=(Connection) DriverManager.getConnection(url,username,password); }catch(Exception e){ e.printStackTrace(); } return conn; } /** * ResultSet转JSON */ public static JSONArray resultSetToJson(ResultSet rs) throws SQLException, JSONException, UnsupportedEncodingException { // json数组 JSONArray array = new JSONArray(); // 获取列数 ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); // 遍历ResultSet中的每条数据 while (rs.next()) { JSONObject jsonObj = new JSONObject(); // 遍历每一列 for (int i = 1; i <= columnCount; i++) { String value = null; String columnName = metaData.getColumnLabel(i);//列名称 if (rs.getString(columnName) != null && !rs.getString(columnName).equals("")) { value = new String(rs.getBytes(columnName), "UTF-8");//列的值,有数据则转码 // System.out.println("===" + value); } else { value = "";//列的值,为空,直接取出去 } jsonObj.put(columnName, value); } array.add(jsonObj); } rs.close(); return array; } }
package util; /* 读取配置文件的一个工具类 */ import java.util.ResourceBundle; public class GlobalConfigUtil { //获取一个资源加载器 //资源加载器会去自动加载CLASSPATH中的application.properties文件 private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application"); //使用resourceBundle的getSting方法 来读取配置 public static String mysqlUsername = resourceBundle.getString("mysql.username"); public static String mysqlPassword = resourceBundle.getString("mysql.password"); public static String mysqlDriver = resourceBundle.getString("mysql.driver"); public static String mysqlUrl = resourceBundle.getString("mysql.url"); public static String kafkaBootstrapServers = resourceBundle.getString("kafka.bootstrap.servers"); public static String kafkaZookeeperConnect = resourceBundle.getString("kafka.zookeeper.connect"); public static String kafkaInputTopic = resourceBundle.getString("kafka.input.topic"); public static void main(String[] args) { System.out.println(mysqlUsername); System.out.println(mysqlPassword); System.out.println(mysqlDriver); System.out.println(mysqlUrl); System.out.println(kafkaBootstrapServers); System.out.println(kafkaZookeeperConnect); System.out.println(kafkaInputTopic); } }
application.properties:
mysql.username=root mysql.password=root mysql.driver=com.mysql.jdbc.Driver mysql.url=jdbc:mysql://localhost:3306/test # #kafka的配置 # kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092 kafka.zookeeper.connect=node01:2181,node02:2181,node03:2181 kafka.input.topic=MysqlToKafka
这篇关于kafka自定义producer从mysql获取数据存储至kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-21MySQL集群部署资料:新手入门教程
- 2024-09-21MySQL集群资料:初学者入门指南
- 2024-09-21部署MySQL集群资料:新手入门教程
- 2024-09-20MySQL集群部署教程:新手入门指南
- 2024-09-20MySQL集群教程:初学者必备指南
- 2024-09-20部署MySQL集群项目实战:新手入门教程
- 2024-09-20如何部署MySQL集群:简单教程
- 2024-09-20MySQL集群部署:新手入门指南
- 2024-09-20部署MySQL集群学习:入门指南
- 2024-09-20部署MySQL集群入门教程