pyflink实时接收kafka数据至mysql

2022/1/25 19:07:09

本文主要是介绍pyflink实时接收kafka数据至mysql,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings
from pyflink.table.catalog import HiveCatalog
from pyflink.table import SqlDialect

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

catalog = HiveCatalog("myhive", "ods", "/home/hadoop/hive-3.1.2/conf")

# Register the catalog
t_env.register_catalog("myhive", catalog)
# set the HiveCatalog as the current catalog of the sessionT_env.use_catalog("myhive")
t_env.use_catalog("myhive")
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create a catalog table
t_env.execute_sql("""CREATE TABLE IF NOT EXISTS sink_parent_info(
 etl_date STRING
,id                       BIGINT
,user_id                  BIGINT
,height                   DECIMAL(5,2)
,weight                   DECIMAL(5,2)
)
""")

# should return the tables in current catalog and database.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE source_parent_info(
 id                       bigint
,user_id                  bigint
,height                   decimal(5,2)
,weight                   decimal(5,2)
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxxx:3306/xxxx',
'connector.driver'= 'com.mysql.cj.jdbc.Driver',
'connector.table' = 'parent_info',
'connector.username' = 'root',
'connector.password' = 'xxxx',
'connector.write.flush.interval' = '1s')
""")

t_env.execute_sql("""
INSERT INTO sink_parent_info
SELECT
id
,user_id
,height
,weight

FROM source_parent_info
""").wait()

参考文档:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830



这篇关于pyflink实时接收kafka数据至mysql的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程