Playing with the Hudi Docker Demo on Ubuntu (3): Writing and Querying with Spark

2022/2/12 7:15:22


Introduction

The previous article, "Playing with the Hudi Docker Demo on Ubuntu (2): Writing Test Data to Kafka", covered how to write test data into the Kafka cluster.
This article shows how to consume that Kafka data with Spark and write it to HDFS. Hudi is brought into Spark as a jar bundle.

Hudi Table and Query Types

| Table type | Supported query types |
| --- | --- |
| Copy On Write (cow) | Snapshot queries, incremental queries |
| Merge On Read (mor) | Snapshot queries, incremental queries, read-optimized queries |

1. Table Types

  • Copy On Write (cow): stores data in a columnar format (e.g. Parquet); incoming writes are merged with existing data synchronously at write time.
  • Merge On Read (mor): combines columnar (e.g. Parquet) and row-based (e.g. Avro) storage. Incremental data is first written as row-based log files and is later merged, synchronously or asynchronously, into new columnar files (see the sketch after the trade-off table below).
Trade-offs between the two table types:

| Trade-off | CopyOnWrite | MergeOnRead |
| --- | --- | --- |
| Data latency | Higher | Lower |
| Query latency | Lower | Higher |
| Update cost | Higher: the entire Parquet file is rewritten | Lower: increments are appended to a delta file |
| Write amplification | Higher | Lower, depending on the compaction strategy |
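As an aside (this demo itself drives writes through DeltaStreamer), the table type can also be set explicitly when writing through the Spark datasource. A minimal sketch, assuming a recent Hudi release and an existing DataFrame df of stock_ticks records; the table name, target path, and field choices are illustrative:

import org.apache.spark.sql.SaveMode

// df: a DataFrame of stock_ticks records (assumed to exist)
df.write.format("org.apache.hudi").
  option("hoodie.table.name", "stock_ticks_example").              // illustrative table name
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").   // or COPY_ON_WRITE
  option("hoodie.datasource.write.recordkey.field", "key").        // unique record key
  option("hoodie.datasource.write.partitionpath.field", "date").   // partition column
  option("hoodie.datasource.write.precombine.field", "ts").        // like --source-ordering-field below
  mode(SaveMode.Append).
  save("/user/hive/warehouse/stock_ticks_example")                 // illustrative path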

2. Query Types

  • Snapshot Queries: see the latest snapshot of the table as of the most recent successful commit or compaction.
  • Incremental Queries: see only the data written to the table after a given commit or compaction.
  • Read Optimized Queries: MergeOnRead tables only; see only the data already compacted into columnar base files.

For MergeOnRead tables, choosing a query type involves the following trade-offs (a datasource read sketch follows the table):

| Trade-off | Snapshot Queries | Read Optimized Queries |
| --- | --- | --- |
| Data latency | Lower | Higher |
| Query latency | Higher: base files are merged with delta logs at read time | Lower: only the columnar base files are read |
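A minimal sketch of how each query type is selected through the Spark datasource. The option keys are the string forms of the org.apache.hudi.DataSourceReadOptions constants used later in this article, and are assumed to match the Hudi version bundled with the demo:

// Snapshot query (the default)
val snapshotDF = spark.read.format("org.apache.hudi").
  option("hoodie.datasource.query.type", "snapshot").
  load("/user/hive/warehouse/stock_ticks_mor")

// Read-optimized query: reads only the compacted columnar base files
val readOptimizedDF = spark.read.format("org.apache.hudi").
  option("hoodie.datasource.query.type", "read_optimized").
  load("/user/hive/warehouse/stock_ticks_mor")

// Incremental query: rows written after the given instant ("000" = from the beginning)
val incrementalDF = spark.read.format("org.apache.hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", "000").
  load("/user/hive/warehouse/stock_ticks_mor")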

Walkthrough

1. Enter the adhoc-2 container

sudo docker exec -it adhoc-2 /bin/bash


2. Run spark-submit

Run the following spark-submit command to start the DeltaStreamer. It consumes data from the Kafka cluster and writes it to HDFS in COPY_ON_WRITE mode, producing the table stock_ticks_cow:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider


Run the following spark-submit command to start another DeltaStreamer. It consumes the same Kafka data and writes it to HDFS in MERGE_ON_READ mode, producing the table stock_ticks_mor:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
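
Note the --disable-compaction flag: it stops DeltaStreamer from scheduling inline compaction, so later batches accumulate in row-based log files until a compaction runs. This is what makes the read-optimized and snapshot query results diverge later in this article.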


3. Inspect the HDFS files

stock_ticks_cow

The stock_ticks_cow table is partitioned by date. Each partition directory contains a partition metadata file and a Parquet data file, and the commit metadata is visible under the .hoodie directory.

stock_ticks_mor

The stock_ticks_mor table has a similar layout after this first batch.
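
Both layouts can be listed with:

hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor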

4. Sync to the Hive metastore

Use the run_sync_tool.sh script to register the Hudi tables in Hive. First the CopyOnWrite table:

/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_cow \
  --database default \
  --table stock_ticks_cow
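
Then the MergeOnRead table. For MergeOnRead, the sync tool registers two Hive tables, stock_ticks_mor_ro and stock_ticks_mor_rt, as the show tables output below confirms: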

/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
  --jdbc-url jdbc:hive2://hiveserver:10000 \
  --user hive \
  --pass hive \
  --partitioned-by dt \
  --base-path /user/hive/warehouse/stock_ticks_mor \
  --database default \
  --table stock_ticks_mor

5. Query with Spark SQL

Launch spark-shell:

$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4

Hive sync registered three tables:

  • stock_ticks_cow: the CopyOnWrite table
  • stock_ticks_mor_ro: the MergeOnRead table, for read-optimized queries
  • stock_ticks_mor_rt: the MergeOnRead table, for snapshot queries

Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644547729231).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show tables").show(100, false)
+--------+------------------+-----------+
|database|tableName         |isTemporary|
+--------+------------------+-----------+
|default |stock_ticks_cow   |false      |
|default |stock_ticks_mor_ro|false      |
|default |stock_ticks_mor_rt|false      |
+--------+------------------+-----------+

## Run max timestamp query against COW table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

## Projection Query
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# Merge-On-Read Queries:
# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Snapshot Query. Notice that the latest timestamp is again 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+

# Run Read Optimized and Snapshot project queries
scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_ro where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022707523  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022707523  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

6. Write the second batch of data to Kafka

Exit the Docker container and run the following on the Ubuntu command line:

cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P


7. Enter adhoc-2 and run spark-submit to write the second batch into the Hudi tables

Enter the adhoc-2 container:

sudo docker exec -it adhoc-2 /bin/bash

Write the second batch to the Hudi CopyOnWrite table:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

Check the HDFS directory:

hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow

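The .hoodie directory now records a second commit, and each partition gains a new Parquet file version containing the merged data.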

Write the second batch to the Hudi MergeOnRead table:

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path /user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction

Check the HDFS directory:

hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor

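Because compaction is disabled, the second batch lands in row-based Avro log files alongside the original Parquet files rather than producing new Parquet versions. The read-optimized queries below therefore still see only the first batch.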

8. Query the second batch with Spark SQL

Launch spark-shell again:

$SPARK_INSTALL/bin/spark-shell \
  --jars $HUDI_SPARK_BUNDLE \
  --master local[2] \
  --driver-class-path $HADOOP_CONF_DIR \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1 \
  --packages org.apache.spark:spark-avro_2.11:2.4.4
Spark context Web UI available at http://adhoc-2:4040
Spark context available as 'sc' (master = local[2], app id = local-1644571477181).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

# 1. Snapshot query on the CopyOnWrite table
scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 2. Incremental query on the CopyOnWrite table
scala> import org.apache.hudi.DataSourceReadOptions
scala> val hoodieIncViewDF =  spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_cow")

scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_cow_incr_tmp1 where  symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+
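
The value passed to BEGIN_INSTANTTIME_OPT_KEY ("20220211064632000" above) was chosen to fall between the first and second commit times. One way to find candidate instants is to list the commit times stamped on the rows, for example:

scala> spark.sql("select distinct(_hoodie_commit_time) as commit_time from stock_ticks_cow order by commit_time").show(false)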


# 3. Read-optimized query on the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:29:00|
+------+-------------------+


scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_ro where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# 4. Snapshot query on the MergeOnRead table
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts)            |
+------+-------------------+
|GOOG  |2018-08-31 10:59:00|
+------+-------------------+


scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_rt where  symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

# 5. Incremental query on the MergeOnRead table
scala> val hoodieIncViewDF =  spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220211064632000").load("/user/hive/warehouse/stock_ticks_mor")


scala> hoodieIncViewDF.registerTempTable("stock_ticks_mor_incr_tmp1")


scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close  from stock_ticks_mor_incr_tmp1 where  symbol = 'GOOG'").show(100, false);
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|
+-------------------+------+-------------------+------+---------+--------+

Note that for the MergeOnRead table, the read-optimized query and the snapshot query return different results.

Read-optimized query:

+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211022538859  |GOOG  |2018-08-31 10:29:00|3391  |1230.1899|1230.085|   <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+

Snapshot query:

+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts                 |volume|open     |close   |
+-------------------+------+-------------------+------+---------+--------+
|20220211022538859  |GOOG  |2018-08-31 09:59:00|6330  |1230.5   |1230.02 |
|20220211064632375  |GOOG  |2018-08-31 10:59:00|9021  |1227.1993|1227.215|    <<<<<<<<<<<<<<<<<<
+-------------------+------+-------------------+------+---------+--------+

This shows the difference between the two query types: the read-optimized query reads only the compacted columnar base files, so its latest record is still the 10:29 tick from the first batch, while the snapshot query also merges the uncompacted log files and therefore sees the 10:59 tick from the second batch.


