Playing with the Hudi Docker Demo on Ubuntu (3): Writing and Querying with Spark
2022/2/12 7:15:22
Introduction
The previous article, "Playing with the Hudi Docker Demo on Ubuntu (2): Writing Test Data to Kafka", showed how to write the test data into the Kafka cluster.
This article shows how to consume that Kafka data with Spark and write it to HDFS as Hudi tables. Hudi is pulled into Spark as a JAR bundle.
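The demo's adhoc containers already export the paths of these bundle JARs as environment variables, and every command below relies on them. A quick sanity check inside the container (variable names as used by the demo scripts):

```
echo $HUDI_UTILITIES_BUNDLE   # utilities bundle passed to spark-submit
echo $HUDI_SPARK_BUNDLE       # Spark bundle passed to spark-shell
```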
Hudi table and query types
Table type | Supported query types |
---|---|
Copy On Write (cow) | Snapshot queries, incremental queries |
Merge On Read (mor) | Snapshot queries, incremental queries, read-optimized queries |
1. Table types
- Copy On Write (cow): stores data in columnar formats (e.g. Parquet). Each write synchronously merges the incoming data with the existing data.
- Merge On Read (mor): stores data in a mix of columnar (e.g. Parquet) and row-based (e.g. Avro) formats. Incremental data is first written to row-based log files and later compacted, synchronously or asynchronously, into new columnar files. (A sketch of how the table type is selected when writing through the Spark DataSource follows the trade-off table below.)
Trade-off | CopyOnWrite | MergeOnRead |
---|---|---|
Data latency | Higher | Lower |
Query latency | Lower | Higher |
Update cost | Higher; the whole Parquet file is rewritten | Lower; updates are appended to delta files |
Write amplification | Higher | Lower, depending on the compaction strategy |
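As a rough illustration of how the table type is picked outside of DeltaStreamer, the sketch below writes a toy DataFrame through the Hudi Spark DataSource. It is not part of this demo: the table name, output path, and the single toy row are made up, while the option keys (hoodie.datasource.write.table.type and friends) are standard Hudi write options, and the field names mirror the demo's stock-tick schema.

```
// Minimal sketch (not part of the demo): choose the table type when writing
// a DataFrame through the Hudi Spark DataSource.
import spark.implicits._   // imported automatically inside spark-shell

val df = Seq(
  ("GOOG_2018-08-31 09", "2018/08/31", "GOOG", "2018-08-31 09:59:00", 6330L)
).toDF("key", "date", "symbol", "ts", "volume")

df.write.format("org.apache.hudi")
  .option("hoodie.table.name", "stock_ticks_example")              // hypothetical table
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")   // or COPY_ON_WRITE
  .option("hoodie.datasource.write.recordkey.field", "key")
  .option("hoodie.datasource.write.partitionpath.field", "date")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .mode("append")
  .save("/tmp/hudi/stock_ticks_example")                           // hypothetical path
```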
2. Query types
- Snapshot queries: see the table as of the latest successful commit or compaction.
- Incremental queries: see only the data written to the table after a given commit or compaction.
- Read-optimized queries: available only for MergeOnRead tables; see only the data already compacted into columnar base files.
Choosing between query types on a MergeOnRead table involves the following trade-off (a Spark read-option sketch for each query type follows the table):
Trade-off | Snapshot queries | Read-optimized queries |
---|---|---|
Data latency | Lower | Higher |
Query latency | Higher | Lower |
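When querying through the Spark DataSource rather than the Hive-synced tables, the query type is selected per read via options on spark.read. A minimal sketch, assuming the spark-shell session and the stock_ticks_mor table created later in this walkthrough; the constants come from org.apache.hudi.DataSourceReadOptions (the same ones used in the incremental queries below), and the begin instant time is only a placeholder:

```
import org.apache.hudi.DataSourceReadOptions

// Snapshot query (the default): latest committed/merged view of the table
val snapshotDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("/user/hive/warehouse/stock_ticks_mor")

// Read-optimized query: only the columnar base files of a MergeOnRead table
val readOptimizedDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load("/user/hive/warehouse/stock_ticks_mor")

// Incremental query: only records committed after the given instant time
val incrementalDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211000000000")  // placeholder instant
  .load("/user/hive/warehouse/stock_ticks_mor")
```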
Detailed steps
1. Enter the adhoc-2 container
```
sudo docker exec -it adhoc-2 /bin/bash
```
2. Run spark-submit
Run the following spark-submit command to start HoodieDeltaStreamer, consume data from the Kafka cluster, and write it to HDFS in COPY_ON_WRITE mode as the table stock_ticks_cow:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
```
Run the following spark-submit command to start HoodieDeltaStreamer, consume data from the Kafka cluster, and write it to HDFS in MERGE_ON_READ mode as the table stock_ticks_mor. Note the --disable-compaction flag: inline compaction is switched off, so updates stay in row-based log files until compaction is run manually; this is what makes the read-optimized and snapshot queries diverge later in this article.
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
```
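Both jobs take their Kafka and Hudi settings from /var/demo/config/kafka-source.properties, which ships with the demo image. For orientation, the file looks roughly like the following; the property names are taken from the upstream Hudi demo configuration, but the exact contents of your image may differ, so check the file itself:

```
include=base.properties
# record key and partition path used for the Hudi tables
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
# schema used by FilebasedSchemaProvider
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
# Kafka source
hoodie.deltastreamer.source.kafka.topic=stock_ticks
bootstrap.servers=kafkabroker:9092
auto.offset.reset=earliest
```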
3. Inspect the files on HDFS
stock_ticks_cow
stock_ticks_cow is partitioned by date; each partition directory holds one metadata file and one Parquet data file.
Commit information can be seen under the .hoodie directory.
stock_ticks_mor
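The directory trees can be listed from inside the container with the same commands used again after the second batch:

```
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor
```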
4. Sync the table metadata to Hive
```
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_cow \
  --database default \
  --table stock_ticks_cow
```

```
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_mor \
  --database default \
  --table stock_ticks_mor
```
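After the sync, stock_ticks_cow plus the stock_ticks_mor_ro and stock_ticks_mor_rt views should be registered in Hive. As an optional check from inside adhoc-2 (a sketch; beeline is part of the demo's Hive setup):

```
beeline -u jdbc:hive2://hiveserver:10000 -n hive -p hive -e "show tables;"
```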
5. Query with Spark SQL
Start spark-shell:
```
$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
```
- stock_ticks_cow is the CopyOnWrite table
- stock_ticks_mor_ro maps to the MergeOnRead table for read-optimized queries
- stock_ticks_mor_rt maps to the MergeOnRead table for snapshot queries
```
Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644547729231).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show tables").show(100, false)
+--------+------------------+-----------+
|database|tableName         |isTemporary|
+--------+------------------+-----------+
|default |stock_ticks_cow   |false      |
|default |stock_ticks_mor_ro|false      |
|default |stock_ticks_mor_rt|false      |
+--------+------------------+-----------+

## Run max timestamp query against COW table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

## Projection Query
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# Merge-On-Read Queries:
# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Snapshot Query. Notice that the latest timestamp is again 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Read Optimized and Snapshot project queries
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+
```
6. Write the second batch of data to Kafka
Leave the Docker container and run the following on the Ubuntu command line:
```
cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P
```
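Optionally, the topic can be spot-checked by consuming the last few messages back (a sketch; kafkacat option spellings can vary slightly between versions):

```
kafkacat -b kafkabroker -t stock_ticks -C -o -5 -e
```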
7. Back in container adhoc-2, run spark-submit to write the second batch into the Hudi tables
Enter the adhoc-2 container:
```
sudo docker exec -it adhoc-2 /bin/bash
```
Write the second batch into the Hudi CopyOnWrite table:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
```
Inspect the HDFS directory:
```
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
```
Write the second batch into the Hudi MergeOnRead table:
```
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
```
Inspect the HDFS directory:
```
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor
```
8. Query with Spark SQL
Start spark-shell:
```
$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
```
```
Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644571477181).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

# 1. Snapshot query on the CopyOnWrite table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 2. Incremental query on the CopyOnWrite table
scala> import org.apache.hudi.DataSourceReadOptions

scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_cow")

scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 3. Read-optimized query on the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# 4. Snapshot query on the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 5. Incremental query on the MergeOnRead table
scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_mor")

scala> hoodieIncViewDF.registerTempTable("stock_ticks_mor_incr_tmp1")

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_incr_tmp1 where symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+
```
Note that for the MergeOnRead table, the read-optimized query and the snapshot query now return different results.
Read-optimized query:
```
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|   <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+
```
Snapshot query:
```
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|   <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+
```
The read-optimized query only sees the columnar base file written by the first commit (latest tick at 10:29), while the snapshot query also merges in the row-based log files written by the second batch (latest tick at 10:59). Because compaction was disabled, the base file has not been rewritten yet, and that is exactly the difference between the two query types.