Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。

2025/1/3 21:04:04

本文主要是介绍Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

处理大规模数据集的高效处理是大数据处理中至关重要的任务,但这并不罕见遇到性能瓶颈。本文详细介绍了我们如何优化一个Spark作业,该作业输入大约1000亿条记录,在运行了4个多小时后因查询执行效率低下而失败。通过识别并解决EXPAND操作的使用问题,我们将运行时间缩短到了仅仅40分钟。

看来看这个问题

我们的Spark作业涉及聚合用户活动数据进行分析。输入大约包含100亿条记录,查询被设计为计算包含COUNTCOUNT DISTINCT操作的多个指标:COUNTCOUNT DISTINCT分别是计数和计算唯一值的SQL函数。

    COUNT(CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN random_id_2 END) AS random_metric_1,  
    COUNT(DISTINCT CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN random_id_2 END) AS random_metric_2,  
    COUNT(CASE WHEN random_id IN (2,4,5) THEN random_id_2 END) AS random_metric_3,  
    COUNT(DISTINCT CASE WHEN random_id IN (2,4,5) THEN random_id_2 END) AS random_metric_4  
    FROM random.table  
    GROUP BY 1,2,3,4,5,6,7,8,9;

这个查询根据多个列对记录进行了分组,并使用条件 COUNTCOUNT DISTINCT 计算了多个度量。然而,Spark 任务运行得非常慢,并在运行了超过 4 小时之后最终失败了。当我们分析 Spark UI 时,发现问题是由于一个叫 EXPAND 的操作导致的。

Spark UI 分析

当我们查看Spark UI中的SQL选项卡时,我们发现:

  1. 扩展操作:Spark 的执行计划引入了一个扩展命令(EXPAND),这大大增加了处理的记录数量。虽然 GROUP BY 操作的输入是 730 亿条记录,但 EXPAND 命令生成了 3000 亿条记录。
  2. 主要原因:使用 COUNT DISTINCT 表达式迫使 Spark 对记录进行类似于笛卡尔积的扩展来计算不同的计数。这种不必要的数据膨胀是造成运行缓慢的主要原因。
Spark 扩展命令

EXPAND命令具体是什么,Spark在什么情况下会使用它?

EXPAND——是 Spark 中一个相对不那么广为人知的操作,在特定情况下使用。为了更好地理解它的功能,我们深入研究了 Spark 的源代码,以更好地理解它的功能。

扩展: 此操作会把所有的组表达式用在每一行输入上,从而从单行输入生成多行输出。

当查询涉及 GROUPING SETSCUBEROLLUP ,或者在 GROUP BY 子句中使用多个 COUNT DISTINCT 聚合函数时,Spark 会触发 EXPAND 命令。

解决

为了优化查询,我们重构了逻辑,尽可能地移除了 COUNT DISTINCT。相反,我们使用条件表达式预先汇总了不同的值。例如:

优化前的状态:
    COUNT(DISTINCT CASE WHEN random_id IN (1,3) AND random_flag= 'true' THEN random_id_2 END) AS random_metric

此 SQL 代码片段用于计算满足特定条件的唯一 random_id_2 值的数量。具体来说,它会选择 random_id 为 1 或 3 并且 random_flag 为 'true' 的记录,并统计这些记录中 random_id_2 的唯一值。

优化后:

计算当 random_id 为 1 或 3 且 random_flag 为 'true' 时的最大值,结果为 1 或者 0,最后命名为 lcp_unique_visitor

MAX(CASE WHEN random_id IN (1,3) AND random_flag = 'true' THEN 1 ELSE 0 END) AS lcp_unique_visitor

这种方法确保在聚合前,不同值已在记录级别上解决。通过将此逻辑应用于所有 COUNT DISTINCT 表达式,我们简化了统计,避免了不必要的展开操作。

结果如下
  1. 执行计划更改:优化后,Spark的执行计划不再包含EXPAND算子。
  2. 运行时改进:之前耗时超过4小时未能完成的Spark作业现在仅用40分钟就成功完成了。
  3. 资源利用率:在移除了EXPAND瓶颈后,集群的资源使用效率显著提升,减少了内存和计算资源的消耗。
关键要点如下
  1. 避免在聚合函数中使用 COUNT DISTINCT: 尽可能用条件聚合如 MAXSUM 替换 COUNT DISTINCT 以简化执行计划的复杂性。
  2. 理解 Spark 的执行计划 :通过 Spark UI 分析物理计划,并识别可能引起效率低下的操作,例如 EXPAND。
  3. 预聚合数据 :如果条件允许,预聚合数据以减少 GROUP BY 操作中的计算负担。
  4. 迭代和测试 :逐步进行优化,并重新运行较小的查询来验证执行计划和性能的变化。
结论

通过分析Spark UI界面并重构我们的查询逻辑,我们将一个失败的任务成功改造成高效的。取消了EXPAND操作并用预先聚合的条件逻辑替换COUNT DISTINCT被证明是最关键的优化方法。这些策略对任何大型的Spark任务都适用,特别是在性能因为类似的问题而受影响的地方。

高效地处理Spark不仅需要理解执行计划,还需要编写查询。当你处理大规模数据集时,时刻关注Spark UI中的潜在瓶颈——不要害怕调整你的方法!



这篇关于Spark聚合优化:我们如何通过解决GroupBy性能瓶颈和避免使用spark EXPAND命令,将运行时间从4小时缩减至40分钟。的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程