Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。
2025/1/3 21:04:04
本文主要是介绍Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
处理大规模数据集的高效处理是大数据处理中至关重要的任务,但这并不罕见遇到性能瓶颈。本文详细介绍了我们如何优化一个Spark作业,该作业输入大约1000亿条记录,在运行了4个多小时后因查询执行效率低下而失败。通过识别并解决EXPAND操作的使用问题,我们将运行时间缩短到了仅仅40分钟。
我们的Spark作业涉及聚合用户活动数据进行分析。输入大约包含100亿条记录,查询被设计为计算包含COUNT
和COUNT DISTINCT
操作的多个指标:COUNT
和COUNT DISTINCT
分别是计数和计算唯一值的SQL函数。
COUNT(CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN random_id_2 END) AS random_metric_1, COUNT(DISTINCT CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN random_id_2 END) AS random_metric_2, COUNT(CASE WHEN random_id IN (2,4,5) THEN random_id_2 END) AS random_metric_3, COUNT(DISTINCT CASE WHEN random_id IN (2,4,5) THEN random_id_2 END) AS random_metric_4 FROM random.table GROUP BY 1,2,3,4,5,6,7,8,9;
这个查询根据多个列对记录进行了分组,并使用条件 COUNT
和 COUNT DISTINCT
计算了多个度量。然而,Spark 任务运行得非常慢,并在运行了超过 4 小时之后最终失败了。当我们分析 Spark UI 时,发现问题是由于一个叫 EXPAND 的操作导致的。
当我们查看Spark UI中的SQL选项卡时,我们发现:
- 扩展操作:Spark 的执行计划引入了一个扩展命令(EXPAND),这大大增加了处理的记录数量。虽然
GROUP BY
操作的输入是 730 亿条记录,但 EXPAND 命令生成了 3000 亿条记录。 - 主要原因:使用
COUNT DISTINCT
表达式迫使 Spark 对记录进行类似于笛卡尔积的扩展来计算不同的计数。这种不必要的数据膨胀是造成运行缓慢的主要原因。
EXPAND命令具体是什么,Spark在什么情况下会使用它?
EXPAND——是 Spark 中一个相对不那么广为人知的操作,在特定情况下使用。为了更好地理解它的功能,我们深入研究了 Spark 的源代码,以更好地理解它的功能。
扩展: 此操作会把所有的组表达式用在每一行输入上,从而从单行输入生成多行输出。
当查询涉及 GROUPING SETS 、CUBE 或 ROLLUP ,或者在 GROUP BY 子句中使用多个 COUNT DISTINCT 聚合函数时,Spark 会触发 EXPAND 命令。
为了优化查询,我们重构了逻辑,尽可能地移除了 COUNT DISTINCT
。相反,我们使用条件表达式预先汇总了不同的值。例如:
COUNT(DISTINCT CASE WHEN random_id IN (1,3) AND random_flag= 'true' THEN random_id_2 END) AS random_metric
此 SQL 代码片段用于计算满足特定条件的唯一 random_id_2
值的数量。具体来说,它会选择 random_id
为 1 或 3 并且 random_flag
为 'true' 的记录,并统计这些记录中 random_id_2
的唯一值。
计算当 random_id
为 1 或 3 且 random_flag
为 'true' 时的最大值,结果为 1 或者 0,最后命名为 lcp_unique_visitor
MAX(CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN 1 ELSE 0 END) AS lcp_unique_visitor
这种方法确保在聚合前,不同值已在记录级别上解决。通过将此逻辑应用于所有 COUNT DISTINCT
表达式,我们简化了统计,避免了不必要的展开操作。
- 执行计划更改:优化后,Spark的执行计划不再包含EXPAND算子。
- 运行时改进:之前耗时超过4小时未能完成的Spark作业现在仅用40分钟就成功完成了。
- 资源利用率:在移除了EXPAND瓶颈后,集群的资源使用效率显著提升,减少了内存和计算资源的消耗。
- 避免在聚合函数中使用 COUNT DISTINCT: 尽可能用条件聚合如
MAX
或SUM
替换COUNT DISTINCT
以简化执行计划的复杂性。 - 理解 Spark 的执行计划 :通过 Spark UI 分析物理计划,并识别可能引起效率低下的操作,例如 EXPAND。
- 预聚合数据 :如果条件允许,预聚合数据以减少
GROUP BY
操作中的计算负担。 - 迭代和测试 :逐步进行优化,并重新运行较小的查询来验证执行计划和性能的变化。
通过分析Spark UI界面并重构我们的查询逻辑,我们将一个失败的任务成功改造成高效的。取消了EXPAND操作并用预先聚合的条件逻辑替换COUNT DISTINCT
被证明是最关键的优化方法。这些策略对任何大型的Spark任务都适用,特别是在性能因为类似的问题而受影响的地方。
高效地处理Spark不仅需要理解执行计划,还需要编写查询。当你处理大规模数据集时,时刻关注Spark UI中的潜在瓶颈——不要害怕调整你的方法!
这篇关于Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-04从 LB Ingress 到 ZTM:集群服务暴露新思路
- 2025-01-03从入门到精通:AWS认证攻略指南
- 2025-01-03DevOps和平台工程,哪个更适合你?
- 2025-01-03彼得·德西斯在AWS reinvent 2024大会上的精彩演讲要点
- 2025-01-03开源商业化 Sealos 如何做到月入 160万
- 2025-01-03数据仓库、数据湖与湖仓架构:一站式数据处理进化论
- 2025-01-02Fluss 与数据湖的深度解析(二)
- 2025-01-02阿里云部署方案项目实战:新手入门教程
- 2025-01-02阿里云RDS项目实战:新手入门教程
- 2025-01-02阿里云部署方案资料详解:新手入门指南