One-Click Sync from MySQL to the Data Warehouse (Airflow Scheduling)
2022/7/10 2:20:03
This article walks through a one-click tool for syncing MySQL tables to the data warehouse (scheduled by Airflow). It should be a useful reference if you are solving the same problem.
We regularly get requests from product: "sync table *** to Hive for analysis". (After enough of these it got tedious, so I wrote a tool.)
Part 1: Background, Features, and Workflow
1. Background:
1. The warehouse stores data in Hive; DataX moves the data and Airflow handles scheduling.
2. The data-product team had already wrapped DataX so that a MySQL table sync could be set up by clicking through a UI, but the process was still too clumsy: you had to create the Hive table by hand, and the sync could not be modified afterwards. That is where the idea of writing my own tool came from.
2. Features
From a single MySQL connection configuration, the tool creates the usual Hive table and generates the Airflow scheduling task.
3. Workflow
1. Configure the MySQL connection.
2. Map the MySQL column types to a Hive schema and create the Hive table.
3. Generate the Airflow task (read the MySQL table, invoke DataX, repair the partitions).
The type mapping used in step 2 is sketched right below.
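To make step 2 concrete, here is the type mapping pulled out as a standalone sketch (the class and method names are mine for illustration; the real logic lives inside MysqlToHive below):

// Minimal sketch of the MySQL -> Hive type mapping used in step 2.
// Class and method names are illustrative, not part of the tool.
public class TypeMappingSketch {
    static String mysqlTypeToHive(String mysqlType) {
        String t = mysqlType.toLowerCase();
        // datetime/timestamp/time and all character types become Hive strings
        if (t.contains("datetime") || t.contains("timestamp") || t.contains("time")
                || t.contains("char") || t.contains("mediumtext")) return "string";
        if (t.contains("binary")) return "binary";   // binary, varbinary
        if (t.contains("date")) return "date";       // plain date (datetime was handled above)
        if (t.contains("double") || t.contains("float") || t.contains("decimal")) return "double";
        if (t.contains("int")) return "bigint";      // tinyint/smallint/int/bigint all widen to bigint
        throw new IllegalArgumentException("unmapped MySQL type: " + mysqlType);
    }

    public static void main(String[] args) {
        System.out.println(mysqlTypeToHive("varchar(40)"));   // string
        System.out.println(mysqlTypeToHive("decimal(30,5)")); // double
        System.out.println(mysqlTypeToHive("datetime"));      // string
        System.out.println(mysqlTypeToHive("date"));          // date
    }
}

One design note: types that hit no branch (e.g. json, enum) are silently skipped by the tool, which would produce broken DDL; the sketch throws instead so gaps are visible.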
Part 2: Code
1. Configuration file:
MysqlToHive.properties
# alias of the MySQL connection to sync from; must match one of the jdbcNalias values below
jdbcalias=ptx_read
# table to sync
table=be_product
# prefix of the generated Hive table name
hive_prefix=ods.ods_product_
# suffix marking an incremental vs. full-load table
hive_suffix=_dd
# owner of the Airflow task
owner=xiaolong.wu
# Hive table lifecycle in days; the data platform removes expired data
lifecycle=180
# directory where the Airflow task files are generated
airflowpath=/airflow/dags/ods/
# base path DataX needs when writing the Hive table
s3path=s3://path
# multiple MySQL connections can be configured, so there is no need to edit one back and forth
jdbc1alias=hive
jdbc1host=127.0.0.1
jdbc1port=3306
jdbc1user=root
jdbc1passwd=**
jdbc1db_name=test
jdbc2alias=read
jdbc2host=127.0.0.1
jdbc2port=3306
jdbc2user=root
jdbc2passwd=**
jdbc2db_name=test
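One caveat about this file: java.util.Properties treats # as a comment only when it starts a line, which is why the explanations above sit on their own lines; an inline "# note" after a value would become part of the value. A quick self-contained demonstration (class name is just for illustration):

import java.io.StringReader;
import java.util.Properties;

public class PropsDemo {
    public static void main(String[] args) throws Exception {
        Properties pp = new Properties();
        // An inline "#" is NOT a comment: it becomes part of the value.
        pp.load(new StringReader("lifecycle=180 # days\nowner=xiaolong.wu\n"));
        System.out.println(pp.getProperty("lifecycle")); // prints: 180 # days
        System.out.println(pp.getProperty("owner"));     // prints: xiaolong.wu
    }
}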
2. Core code:
MysqlToHive.java
import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Holder for one MySQL connection config; plain getters/setters, not the interesting part.
class Database {
    private String alias;
    private String host;
    private int port;
    private String user;
    private String passwd;
    private String db_name;

    public String getAlias() { return alias; }
    public void setAlias(String alias) { this.alias = alias; }
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getPasswd() { return passwd; }
    public void setPasswd(String passwd) { this.passwd = passwd; }
    public String getDb_name() { return db_name; }
    public void setDb_name(String db_name) { this.db_name = db_name; }

    @Override
    public String toString() {
        return "Database{" + "alias='" + alias + '\'' + ", host='" + host + '\'' + ", port=" + port
                + ", user='" + user + '\'' + ", passwd='" + passwd + '\'' + ", db_name='" + db_name + '\'' + '}';
    }
}

public class MysqlToHive {
    public static String jdbcalias;
    public static String table;
    public static String hive_prefix;
    public static String hive_suffix;
    public static String owner;
    public static String lifecycle;
    public static String airflowpath;
    public static String s3path;
    public static Database database = new Database();
    public static List<Database> databasesList = new ArrayList<Database>();
    public static List<String> mysqlTableColumn = new ArrayList<String>();

    public static void parseProperties(Properties pp) {
        jdbcalias = pp.getProperty("jdbcalias");
        table = pp.getProperty("table");
        hive_prefix = pp.getProperty("hive_prefix");
        owner = pp.getProperty("owner");
        lifecycle = pp.getProperty("lifecycle");
        airflowpath = pp.getProperty("airflowpath");
        s3path = pp.getProperty("s3path");
        hive_suffix = pp.getProperty("hive_suffix");
        int dbindex = 1; // walk jdbc1*, jdbc2*, ... and pick the config whose alias matches jdbcalias
        while (pp.getProperty("jdbc" + dbindex + "alias") != null) {
            Database databaseItem = new Database();
            databaseItem.setDb_name(pp.getProperty("jdbc" + dbindex + "db_name"));
            databaseItem.setHost(pp.getProperty("jdbc" + dbindex + "host"));
            databaseItem.setAlias(pp.getProperty("jdbc" + dbindex + "alias"));
            databaseItem.setPasswd(pp.getProperty("jdbc" + dbindex + "passwd"));
            databaseItem.setPort(Integer.parseInt(pp.getProperty("jdbc" + dbindex + "port")));
            databaseItem.setUser(pp.getProperty("jdbc" + dbindex + "user"));
            System.out.println(databaseItem.toString());
            databasesList.add(databaseItem);
            if (databaseItem.getAlias().equals(jdbcalias)) {
                database = databasesList.get(dbindex - 1);
            }
            dbindex++;
        }
    }

    // Load the config file from the classpath.
    public static void readDbPropertiesFile(String fileName) {
        Properties pp = new Properties();
        try {
            InputStream fps = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName);
            pp.load(fps);
            parseProperties(pp);
            fps.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Connect to MySQL and read the table's columns as field|type|comment triples,
    // which drive the Hive DDL generated later.
    public static void readTableFormatted() throws Exception {
        String sql = "show full fields from " + table;
        Connection con = null;
        Statement st = null;
        ResultSet rs = null;
        Class.forName("com.mysql.cj.jdbc.Driver");
        System.out.println("jdbc:mysql://" + database.getHost() + ":" + database.getPort() + "/" + database.getDb_name() + "?serverTimezone=UTC");
        con = DriverManager.getConnection(
                "jdbc:mysql://" + database.getHost() + ":" + database.getPort() + "/" + database.getDb_name() + "?serverTimezone=UTC",
                database.getUser(), database.getPasswd());
        st = con.createStatement();
        rs = st.executeQuery(sql);
        while (rs.next()) {
            mysqlTableColumn.add(rs.getString("Field").toLowerCase() + "|" + rs.getString("Type").toLowerCase() + "|" + rs.getString("Comment"));
        }
        rs.close();
        st.close();
        con.close();
    }

    // Pull the length out of e.g. varchar(40); currently unused.
    public static int getmysqlLength(String type) {
        return Integer.parseInt(type.substring(type.indexOf("(") + 1, type.indexOf(")")));
    }

    // Map each MySQL column type to a Hive type, build the CREATE TABLE statement,
    // and run it through the hive CLI.
    public static void buildExecuteHiveSql() throws IOException, InterruptedException {
        StringBuilder hiveSqlBuilder = new StringBuilder();
        hiveSqlBuilder.append("CREATE TABLE " + hive_prefix + table + hive_suffix + " ( \n");
        for (int i = 0; i < mysqlTableColumn.size(); i++) {
            String[] fieldAndType = mysqlTableColumn.get(i).split("\\|");
            hiveSqlBuilder.append(fieldAndType[0] + " ");
            if (fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int")
                    || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")) {
                hiveSqlBuilder.append("bigint");
            } else if (fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary")) {
                hiveSqlBuilder.append("binary");
            } else if (fieldAndType[1].contains("date") && !fieldAndType[1].contains("datetime")) {
                // "datetime".contains("date") is true, so guard against it here;
                // datetime falls through to the string branch below
                hiveSqlBuilder.append("date");
            } else if (fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")) {
                hiveSqlBuilder.append("double");
            } else if (fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar")
                    || fieldAndType[1].contains("mediumtext") || fieldAndType[1].contains("datetime")
                    || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")) {
                hiveSqlBuilder.append("string");
            }
            String comment = "";
            if (fieldAndType.length == 3) { comment = fieldAndType[2]; }
            hiveSqlBuilder.append(" comment '" + comment + "' ,");
            hiveSqlBuilder.append("\n");
        }
        hiveSqlBuilder.deleteCharAt(hiveSqlBuilder.length() - 2); // drop the comma after the last column (keeps the newline)
        hiveSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(first-level partition)' ) \n");
        hiveSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n");
        hiveSqlBuilder.append("TBLPROPERTIES ('lifecycle'='" + lifecycle + "','owner'='" + owner + "','parquet.compression'='snappy');");
        System.out.println(hiveSqlBuilder.toString());
        // ProcessBuilder passes arguments directly (no shell), so the SQL must not be
        // wrapped in extra quote characters
        Process process = new ProcessBuilder("hive", "-e", hiveSqlBuilder.toString()).redirectErrorStream(true).start();
        BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
        process.waitFor();
    }

    // Build the MySQL select statement and the column-type JSON used by the Airflow task.
    public static void printAirflowJobGetSelectSql(StringBuilder mysqlSelectBuilder, StringBuilder hiveTypeBuilder) {
        mysqlSelectBuilder.append("select ");
        for (int i = 0; i < mysqlTableColumn.size(); i++) {
            String[] fieldAndType = mysqlTableColumn.get(i).split("\\|");
            mysqlSelectBuilder.append(fieldAndType[0] + ",");
            hiveTypeBuilder.append("{\"name\":\"" + fieldAndType[0] + "\",\"type\":\"");
            if (fieldAndType[1].contains("bigint") || fieldAndType[1].contains("int")
                    || fieldAndType[1].contains("smallint") || fieldAndType[1].contains("tinyint")) {
                hiveTypeBuilder.append("bigint");
            } else if (fieldAndType[1].contains("binary") || fieldAndType[1].contains("varbinary")) {
                hiveTypeBuilder.append("binary");
            } else if (fieldAndType[1].contains("date") && !fieldAndType[1].contains("datetime")) {
                // same datetime guard as in buildExecuteHiveSql
                hiveTypeBuilder.append("date");
            } else if (fieldAndType[1].contains("double") || fieldAndType[1].contains("float") || fieldAndType[1].contains("decimal")) {
                hiveTypeBuilder.append("double");
            } else if (fieldAndType[1].contains("char") || fieldAndType[1].contains("varchar")
                    || fieldAndType[1].contains("mediumtext") || fieldAndType[1].contains("datetime")
                    || fieldAndType[1].contains("time") || fieldAndType[1].contains("timestamp")) {
                hiveTypeBuilder.append("string");
            }
            hiveTypeBuilder.append("\"},");
        }
        hiveTypeBuilder.deleteCharAt(hiveTypeBuilder.length() - 1);       // drop trailing comma
        mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length() - 1); // drop trailing comma
        mysqlSelectBuilder.append(" from " + table);
    }

    // Generate the Airflow DAG file under the configured path; in other words, generate the schedule.
    // The DataX call and other pieces lean heavily on company-internal wrappers, so they are omitted here.
    public static void printAirflowJob() throws FileNotFoundException {
        String db = hive_prefix.substring(0, hive_prefix.indexOf("."));
        String odsTableName = hive_prefix.substring(hive_prefix.indexOf(".") + 1);
        // check the actual target directory, not just a bare table-name path
        if (new File(airflowpath + odsTableName + table + hive_suffix).exists()) {
            System.out.println("folder exists, please delete the folder " + airflowpath + odsTableName + table + hive_suffix);
        } else {
            StringBuilder mysqlSelectBuilder = new StringBuilder();
            StringBuilder hiveTypeBuilder = new StringBuilder();
            printAirflowJobGetSelectSql(mysqlSelectBuilder, hiveTypeBuilder);
            File dir = new File(airflowpath + odsTableName + table + hive_suffix);
            dir.mkdirs();
            PrintWriter pw = new PrintWriter(airflowpath + odsTableName + table + hive_suffix + "/" + odsTableName + table + hive_suffix + "_dag.py");
            pw.println("import airflow");
            pw.println("from datetime import timedelta"); // the generated DAG uses timedelta below
            pw.println("job_name='" + hive_prefix + table + hive_suffix + "'");
            pw.println("job_owner='" + owner + "'");
            pw.println("default_job_retries=1");
            pw.println("default_job_retry_delay=timedelta(minutes=5)");
            pw.println("default_start_date=airflow.utils.dates.days_ago(1)");
            pw.println("dag_schedule_interval='12 1 * * *'");
            pw.println("");
            pw.println("");
            pw.println("hive_table_name = job_name");
            pw.println("");
            pw.println("default_args = {");
            pw.println(" 'owner': job_owner,");
            pw.println(" 'depends_on_past': False,");
            pw.println(" 'start_date': default_start_date,");
            // default_email_to and default_pool come from the omitted company-internal module
            pw.println(" 'email': default_email_to,");
            pw.println(" 'email_on_failure': False,");
            pw.println(" 'email_on_retry': False,");
            pw.println(" 'retries': default_job_retries,");
            pw.println(" 'retry_delay': default_job_retry_delay,");
            pw.println(" 'pool': default_pool,");
            pw.println(" 'priority_weight': 10");
            pw.println("}");
            pw.println("");
            pw.close(); // flush the buffer so the DAG file is actually written
        }
    }

    public static void main(String[] args) throws Exception {
        readDbPropertiesFile("MysqlToHive.properties");
        readTableFormatted();
        buildExecuteHiveSql();
        printAirflowJob();
    }
}
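To make the Airflow-side output concrete: if the tool were pointed at the mysql_column_test table from Part 3 instead of be_product, printAirflowJobGetSelectSql would fill the two builders with roughly the following (the omitted company wrapper presumably hands these to DataX as the reader query and the writer column list):

select bigint_test,int_test,smallint_test,binary_test,varbinary_test,decimal_test,double_test,float_test,char_test,varchar_test,mediumtext_test,date_test,datetime_test,time_test,timestamp_test from mysql_column_test

{"name":"bigint_test","type":"bigint"},{"name":"binary_test","type":"binary"},{"name":"decimal_test","type":"double"},{"name":"date_test","type":"date"},{"name":"datetime_test","type":"string"},... (one entry per column)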
Part 3: Execution Example
1. MySQL sample:
CREATE TABLE mysql_column_test (
  bigint_test bigint(10),
  int_test int(10),
  smallint_test smallint(10),
  binary_test binary(20),
  varbinary_test varbinary(20),
  decimal_test decimal(30,5),
  double_test double(30,5),
  float_test float(30,5),
  char_test char(40),
  varchar_test varchar(40),
  mediumtext_test mediumtext,
  date_test date,
  datetime_test datetime,
  time_test time,
  timestamp_test timestamp DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into mysql_column_test(bigint_test,int_test,smallint_test,binary_test,varbinary_test,decimal_test,double_test,float_test,char_test,varchar_test,mediumtext_test,date_test,datetime_test,time_test)
values(1,2,3,UNHEX('4'),UNHEX('5'),6.1,7.1,8.1,9.1,'10','11','2022-01-01','2020-09-14 23:18:17',CURRENT_TIMESTAMP);
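For reference, assuming table=mysql_column_test is set in MysqlToHive.properties (with the other sample values unchanged), the DDL the tool prints and executes would look roughly like this; note that with the datetime guard, datetime/time/timestamp columns land as string while a plain date stays date:

CREATE TABLE ods.ods_product_mysql_column_test_dd (
bigint_test bigint comment '',
int_test bigint comment '',
smallint_test bigint comment '',
binary_test binary comment '',
varbinary_test binary comment '',
decimal_test double comment '',
double_test double comment '',
float_test double comment '',
char_test string comment '',
varchar_test string comment '',
mediumtext_test string comment '',
date_test date comment '',
datetime_test string comment '',
time_test string comment '',
timestamp_test string comment ''
) PARTITIONED BY ( dt string COMMENT '(first-level partition)' )
ROW FORMAT DELIMITED STORED AS PARQUET
TBLPROPERTIES ('lifecycle'='180','owner'='xiaolong.wu','parquet.compression'='snappy');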
2. Running the code: copy the code into a new file, compile, and run. Note that readDbPropertiesFile loads MysqlToHive.properties from the classpath, so keep the file next to the class and include the current directory on the classpath:
javac -cp mysql-connector-java-8.0.18.jar MysqlToHive.java
java -classpath mysql-connector-java-8.0.18.jar:. MysqlToHive
That's all for this walkthrough of one-click syncing MySQL to the data warehouse (with Airflow scheduling); hopefully it helps with your own setup.