
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

The Basics of AQE

Spark Adaptive Query Execution (AQE) is a query re-optimization that occurs during query execution.

In terms of technical architecture, AQE is a framework for dynamic planning and replanning of queries based on runtime statistics, which supports a variety of optimizations such as:

  • Dynamically Switch Join Strategies

  • Dynamically Coalesce Shuffle Partitions

  • Dynamically Handle Skew Joins

In Kyuubi, we strongly recommend that you turn on all capabilities of AQE by default for Kyuubi engines, no matter what platform you run Kyuubi and Spark on.
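As a minimal sketch (Spark 3.x), enabling AQE itself is a single setting in the engine-side spark-defaults.conf; the full set of recommended configurations is shown later on this page.

spark.sql.adaptive.enabled=true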

Dynamically Switch Join Strategies

Spark supports several join strategies, among which BroadcastHash Join is usually the most performant when any join side fits well in memory. And for this reason, Spark plans a BroadcastHash Join if the estimated size of a join relation is less than the spark.sql.autoBroadcastJoinThreshold.

spark.sql.autoBroadcastJoinThreshold=10M

Without AQE, the estimated size of join relations comes from the statistics of the original tables, which can go wrong in many real-world cases. For example, the join relation may be a convergent but composite operation rather than a single table scan. In this case, Spark might not be able to switch the join strategy to BroadcastHash Join. With AQE, however, we can calculate the size of the composite operation accurately at runtime. Spark can then replan the join strategy unmistakably if the size fits under spark.sql.autoBroadcastJoinThreshold.
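For illustration, consider a query where one join side is an aggregated subquery (the table and column names below are hypothetical). Before execution, Spark only sees the statistics of the base tables, but at runtime AQE can observe the actual post-aggregation size and replan the join as a broadcast join if it fits under the threshold:

SET spark.sql.autoBroadcastJoinThreshold=10m;
-- the aggregated side may shrink well below 10MB at runtime even if user_events is huge
SELECT o.order_id, d.region
FROM orders o
JOIN (SELECT user_id, max(region) AS region FROM user_events GROUP BY user_id) d
  ON o.user_id = d.user_id;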

[Figure: dynamically switching SortMerge Join to BroadcastHash Join]

[2] From Databricks Blog

What’s more, when spark.sql.adaptive.localShuffleReader.enabled=true, after converting SortMerge Join to BroadcastHash Join, Spark also applies a further optimization to reduce network traffic by converting a regular shuffle to a localized shuffle.
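A minimal sketch of the relevant settings (note that spark.sql.adaptive.enabled is on by default only since Spark 3.2):

spark.sql.adaptive.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true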

[Figure: local shuffle reader]

As shown in the figure above, the local shuffle reader can read all necessary shuffle files from its local storage, effectively without performing the shuffle across the network.

The local shuffle reader optimization consists of avoiding the shuffle when the SortMerge Join is transformed into a BroadcastHash Join after applying the AQE rules.

Dynamically Coalesce Shuffle Partitions

Without this feature, Spark itself can sometimes be a maker of small files, especially in a pure SQL way as Kyuubi is used, for example:

  1. When spark.sql.shuffle.partitions is set too large compared to the total output size, very small or empty files are produced after a shuffle stage.

  2. When Spark performs a series of optimized BroadcastHash Join and Union together, the final output size for each partition might be reduced by the join conditions. However, the total number of final output files can explode.

  3. Some pipeline jobs apply selective filters to produce temporary data.

  4. etc.

Reading small files leads to very small partitions or tasks. Spark tasks will have worse I/O throughput and tend to suffer more from scheduling overhead and task setup overhead.

[Figure: small shuffle partitions before coalescing]

[2] From Databricks Blog

Combining small partitions saves resources and improves cluster throughput. Spark provides several ways to handle small-file issues, for example, adding an extra shuffle operation on the partition columns with the distribute by clause or using hints [5]. In most scenarios, you need to have a good grasp of your data, Spark jobs, and configurations to apply these solutions case by case. Moreover, the commonly used config spark.sql.shuffle.partitions is data-dependent and cannot vary within a single Spark SQL query. For real-life Spark jobs with multiple stages, it is impossible to use it as one size that fits all.
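As an illustration of these manual workarounds (the table and column names below are hypothetical), one can either add a repartition hint or redistribute by the output partition columns before writing:

-- repartition hint: explicitly ask for 32 output partitions
INSERT OVERWRITE TABLE target PARTITION (dt)
SELECT /*+ REPARTITION(32) */ * FROM source WHERE dt = '2021-01-01';

-- distribute by: add an extra shuffle on the partition column
INSERT OVERWRITE TABLE target PARTITION (dt)
SELECT * FROM source WHERE dt = '2021-01-01' DISTRIBUTE BY dt;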

But with AQE, things become much easier for you, as Spark will do the partition coalescing automatically.

[Figure: dynamically coalescing shuffle partitions]

[2] From Databricks Blog

It can simplify the tuning of shuffle partition numbers when running Spark SQL queries. You do not need to set a proper shuffle partition number to fit your dataset.

To enable this feature, we need to set the below two configs to true.

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true

Other Tips for Best Practices

For further tuning our Spark jobs with this feature, we also need to be aware of these configs.

spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200
How to set spark.sql.adaptive.advisoryPartitionSizeInBytes?

It stands for the advisory size in bytes of a shuffle partition during adaptive query execution, which takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions. The default value of spark.sql.adaptive.advisoryPartitionSizeInBytes is 64MB. Typically, if we are reading and writing data with HDFS, matching it with the block size of HDFS should be the best choice, i.e. 128MB or 256MB.

Consequently, all blocks or partitions in Spark and files in HDFS are chopped into 128MB/256MB chunks. And think about it: now all tasks for scans, sinks, and intermediate shuffle maps deal with mostly even-sized data partitions. This makes it much easier to set up executor resources, perhaps even with one size that fits all.
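For example, on a cluster whose HDFS block size is 256MB, a reasonable setting (an illustrative value, adjust it to your own block size) would be:

spark.sql.adaptive.advisoryPartitionSizeInBytes=256m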

How to set spark.sql.adaptive.coalescePartitions.minPartitionNum?

It stands for the suggested (not guaranteed) minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark application. The default parallelism is defined by spark.default.parallelism or, failing that, the total count of registered cores. I guess the motivation behind this behavior of the Spark community is to maximize the use of the application's resources and concurrency.

But there are always exceptions. Relating these two seemingly unrelated parameters can be somewhat tricky for users. This config is optional by default, which means users may not touch it in most real-world cases. But spark.default.parallelism has a long history and is well known. If users unexpectedly set the default parallelism to an unreasonably high value, it could block AQE from coalescing partitions to a fair number. Another scenario that requires special attention is writing data. Usually, coalescing partitions to avoid small-file issues is more critical than task concurrency for final output stages, and a better data layout can benefit plenty of downstream jobs. I suggest that we set spark.sql.adaptive.coalescePartitions.minPartitionNum to 1 in this case, as Spark will try its best, though it is not guaranteed, to coalesce partitions for output.
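A minimal sketch of this recommendation for write-heavy workloads:

spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1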

How to set spark.sql.adaptive.coalescePartitions.initialPartitionNum?

It stands for the initial number of shuffle partitions before coalescing. By default, it equals spark.sql.shuffle.partitions (200). Firstly, it is better to set it explicitly rather than falling back to spark.sql.shuffle.partitions. The Spark community suggests setting it to a large number, as Spark will dynamically coalesce shuffle partitions, and I cannot agree more.
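For instance, a sketch that sets it explicitly to a large value and lets AQE coalesce downwards (8192 is only an illustrative value, also used in the default configuration later on this page):

spark.sql.adaptive.coalescePartitions.initialPartitionNum=8192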

Dynamically Handle Skew Joins

Without AQE, data skew is very likely to occur in the shuffle phase of map-reduce style computing models. Data skew can cause Spark jobs to have one or more long-tail tasks, severely degrading a query's performance. This feature dynamically handles skew in SortMerge Join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. For example, the optimization will split oversized partitions into subpartitions and join them to the other join side's corresponding partition.

[Figure: dynamically handling a skewed join]

[2] From Databricks Blog

To enable this feature, we need to set the below two configs to true.

spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true

Other Tips for Best Practices

For further tuning our Spark jobs with this feature, we also need to be aware of these configs.

spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256M
spark.sql.adaptive.advisoryPartitionSizeInBytes=64M
How to set spark.sql.adaptive.skewJoin.skewedPartitionFactor and skewedPartitionThresholdInBytes?

Spark uses these two configs and the median (not average) partition size to detect whether a partition is skewed or not:

partition size > skewedPartitionFactor * the median partition size && \
partition size > skewedPartitionThresholdInBytes
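
As an illustrative example with the values shown above (factor 5, threshold 256MB): if the median partition size of a shuffle stage is 100MB, a partition is treated as skewed only when it exceeds both 5 * 100MB = 500MB and 256MB, i.e. anything above 500MB.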

As Spark splits skewed partitions targeting spark.sql.adaptive.advisoryPartitionSizeInBytes, ideally skewedPartitionThresholdInBytes should be larger than advisoryPartitionSizeInBytes. In this case, anytime you increase advisoryPartitionSizeInBytes, you should also increase skewedPartitionThresholdInBytes if you intend to keep the feature enabled.

Hidden Features

DemoteBroadcastHashJoin

Internally, Spark has an optimization rule that detects a join child with a high ratio of empty partitions and adds a no-broadcast-hash-join hint to avoid broadcasting it.

spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2

By default, if less than 20% of a dataset's partitions contain data, Spark will not broadcast that dataset.

EliminateJoinToEmptyRelation

This optimization rule detects and converts a Join to an empty LocalRelation.

Disabling the Hidden Features

We can exclude some of the additional AQE rules if a performance regression or bug occurs. For example,

SET spark.sql.adaptive.optimizer.excludedRules=org.apache.spark.sql.execution.adaptive.DemoteBroadcastHashJoin
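
Multiple rules can be excluded at once with a comma-separated list. Note that the fully qualified rule names can differ across Spark versions; the names below assume Spark 3.0/3.1, where both hidden rules live in the adaptive package:

SET spark.sql.adaptive.optimizer.excludedRules=org.apache.spark.sql.execution.adaptive.DemoteBroadcastHashJoin,org.apache.spark.sql.execution.adaptive.EliminateJoinToEmptyRelation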

Best Practices for Applying AQE to Kyuubi

Kyuubi is a long-running service that makes it easier for end-users to use Spark SQL without needing much of Spark's basic knowledge. It is essential to have a basic configuration that works for most scenarios on the server side.

Setting Default Configurations

Configuring by spark-defaults.conf at the engine side is the best way to set up Kyuubi with AQE. All engines will be instantiated with AQE enabled.

Here is a config setting that we use in our platform when deploying Kyuubi.

spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=8192
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=400m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.adaptive.optimizer.excludedRules
spark.sql.autoBroadcastJoinThreshold=-1

Tips

Turning on AQE by default can significantly improve the user experience, and the other sub-features are all enabled as well. advisoryPartitionSizeInBytes targets the HDFS block size. minPartitionNum is set to 1 so that coalescing takes priority. initialPartitionNum is given a high value. Since AQE requires at least one shuffle, ideally we need to set autoBroadcastJoinThreshold to -1 so that every user query with joins involves a SortMerge Join with a shuffle. But then the Dynamically Switch Join Strategies feature seems unable to be applied later in this case. It appears to be a limitation of Spark AQE so far.

Dynamically Setting

All AQE-related configurations are changeable at runtime, which means you can still modify specific configs with SET syntax for each SQL query, giving more precise control on the client side.
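For example, a client connected to Kyuubi (e.g. via beeline or JDBC) might override a setting for a single heavy write in its own session; the query below is purely illustrative:

SET spark.sql.adaptive.advisoryPartitionSizeInBytes=128m;
SET spark.sql.adaptive.coalescePartitions.minPartitionNum=1;
INSERT OVERWRITE TABLE target SELECT * FROM source WHERE dt = '2021-01-01';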

Spark Known Issues

SPARK-33933: Broadcast timeout happened unexpectedly in AQE

For Spark versions (<3.1), we need to increase spark.sql.broadcastTimeout (default 300s) even when the broadcast relation is tiny.
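For instance (600 seconds is only an illustrative value):

spark.sql.broadcastTimeout=600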

For other potential problems that may be found in the AQE features of Spark, you may refer to SPARK-33828: SQL Adaptive Query Execution QA.

References

  1. Adaptive Query Execution

  2. Adaptive Query Execution: Speeding Up Spark SQL at Runtime

  3. SPARK-31412: New Adaptive Query Execution in Spark SQL

  4. SPARK-28560: Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

  5. Coalesce and Repartition Hint for SQL Queries