FlinkSQL Practice Notes
2022/1/23 2:06:33
1. Background
Flink is developing rapidly in China. I first came across it in 2018 and have followed its updates on the official site and various public accounts ever since, but never had the chance to use it in production. A streaming-computation requirement recently came up, and business demands for real-time processing keep growing, so today I ran a first test in a local environment: data from Kafka is processed by Flink and written to MySQL.
Environment: Java 8, Scala 2.12
Version: Flink 1.13.0
Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Data in Kafka is stored as JSON; flink-json is required to parse it -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
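The dependencies above reference two Maven properties. A minimal <properties> block matching the stated environment (Flink 1.13.0, Scala 2.12) would be:

<properties>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>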
2. Code
2.1 Kafka producer
for (int i = 1; i < 10; i++) {
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name" + i);
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
            "flinkdemo",   // topic must match the one the Flink source table reads below
            i,
            json.toJSONString()
    );
    // send the message
    producer.send(record);
}
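The loop above assumes an already-constructed producer. A minimal sketch of that setup (the bootstrap address matches the Flink DDL below; the serializer choices are assumptions based on the Integer key / JSON string value used above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");                 // same broker as in the Flink DDL
props.put("key.serializer", IntegerSerializer.class.getName());   // keys are Integer ids
props.put("value.serializer", StringSerializer.class.getName());  // values are JSON strings
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);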
2.2 Flink processing
// Create the execution environment
//EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
        "CREATE TABLE source1(id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'flinkdemo'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'properties.group.id' = 'flinkGroup'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json'," +
        " 'json.fail-on-missing-field' = 'false'," +
        " 'json.ignore-parse-errors' = 'true')"
);

// The test_info table must be created in MySQL beforehand
// Map the MySQL table to a temporary sink table
String mysql_sql = "CREATE TABLE mysql_sink (" +
        " id STRING," +
        " name STRING " +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false'," +
        " 'table-name' = 'test_info'," +
        " 'username' = 'test'," +
        " 'password' = 'pwd'" +
        ")";
tableEnv.executeSql(mysql_sql);

// Insert the data
tableEnv.executeSql("INSERT INTO mysql_sink SELECT id, name FROM source1");
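As noted in the comment, test_info must exist in MySQL before the job runs. A minimal schema matching the sink DDL above (the column types and widths are an assumption):

CREATE TABLE test_info (
    id   VARCHAR(64),
    name VARCHAR(64)
);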
Query the table in MySQL and you can watch rows being inserted in real time.
3. Problems encountered and solutions
3.1 SQL parse exception
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "<EOF>" at line 1, column 263. Was expecting one of: "UESCAPE" ... <QUOTED_STRING> ... ")" ... "," ...
The cause: in the code below, the WITH ( ... ) clause is missing its closing parenthesis.
// Map the Kafka topic to a temporary source table
tableEnv.executeSql(
        "CREATE TABLE sensor_source(id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'flinkdemo'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'properties.group.id' = 'flinkGroup'," +
        " 'scan.startup.mode' = 'earliest-offset'," +
        " 'format' = 'json'"          // <-- the closing ")" is missing here
);
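The fix is simply to close the WITH clause on the last line:

        " 'format' = 'json')"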
3.2 JSON format issue
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
The flink-json dependency is missing; add:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
3.3 print output issue
Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:775)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:854)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.apache.FlinkSqlDemo.main(FlinkSqlDemo.java:71)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
The flink-clients dependency is missing; add:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
3.4 Sink to MySQL: the program exits, no data in the MySQL table, no error reported
Process finished with exit code 0
The MySQL JDBC driver jar is missing; add:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
4. Extensions
// Output to the console
String print_sql = "CREATE TABLE print_sink (" +
        "id STRING," +
        "name STRING" +
        ") WITH (" +
        " 'connector' = 'print'" +
        ")";
tableEnv.executeSql(print_sql);
tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM sensor_source");

// Sink to another Kafka topic
String kafka_sink_sql = "CREATE TABLE kafka_sink (id STRING, name STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'test_info_2'," +
        " 'properties.bootstrap.servers' = 'localhost:9092'," +
        " 'format' = 'json'" +
        ")";
tableEnv.executeSql(kafka_sink_sql);
tableEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM sensor_source");
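Each executeSql("INSERT ...") call above submits its own job. If you want several sinks fed from a single job, Flink's StatementSet API (available in 1.13) can group the inserts; a minimal sketch using the tables defined above:

import org.apache.flink.table.api.StatementSet;

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO print_sink SELECT * FROM sensor_source");
stmtSet.addInsertSql("INSERT INTO kafka_sink SELECT * FROM sensor_source");
stmtSet.execute();  // submits all inserts as one job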