自适应查询执行(AQE)是在查询执行期间发生的查询重新优化。
运行时重新优化的动机是,Azure Databricks 在随机交换和广播交换(称为 AQE 中的查询阶段)结束时具有最 up-to日期准确的统计信息。 因此,Azure Databricks 可以选择更好的物理策略,选择最佳的随机分区大小和数字,或者执行需要提示的优化,例如倾斜联接处理。
当统计信息收集未打开或统计信息过时时,这非常有用。 在由静态派生的统计信息不准确的地方(例如在复杂查询过程中或者发生数据偏斜之后)也很有用。
能力
默认情况下,AQE 处于启用状态。 它有 4 个主要功能:
- 将排序合并联接动态更改为广播哈希联接。
- 随机交换后,动态合并分区(将小分区合并为合理大小的分区)。 非常小的任务的 I/O 吞吐量更差,并且往往受到计划开销和任务设置开销的影响。 合并小型任务可节省资源并提高群集吞吐量。
- 动态处理排序合并联接和随机哈希联接中的倾斜问题,方法是将倾斜的任务分拆为大致均匀大小的任务,并在需要时进行复制。
- 动态检测并传播空关系。
应用程序
AQE 适用于以下所有查询:
- 非流式
- 包含至少一个交换(通常当有联接、聚合或窗口)、一个子查询或两者。
并非所有应用 AQE 的查询都必须重新优化。 重新优化可能会生成与静态编译不同的查询计划,也可能不会。 若要确定查询的计划是否已由 AQE 更改,请参阅以下部分 “查询计划”。
查询计划
本部分讨论如何以不同方式检查查询计划。
本节内容:
Spark用户界面
AdaptiveSparkPlan
节点
AQE 应用的查询包含一个或多个 AdaptiveSparkPlan
节点,通常作为每个主查询或子查询的根节点。
在查询运行或运行查询之前, isFinalPlan
相应的 AdaptiveSparkPlan
节点的标志将显示为 false
;在查询执行完成后, isFinalPlan
标志将更改为 true.
不断发展的计划
查询计划关系图随着执行进度而演变,并反映正在执行的最新计划。 已执行的节点(指标可用)不会发生变化,而未执行的节点由于重新优化,可以随时间发生变化。
下面是查询计划图示例:
DataFrame.explain()
AdaptiveSparkPlan
节点
AQE 应用的查询包含一个或多个 AdaptiveSparkPlan
节点,通常作为每个主查询或子查询的根节点。 在查询运行或运行查询之前, isFinalPlan
相应的 AdaptiveSparkPlan
节点的标志将显示为 false
;在查询执行完成后, isFinalPlan
标志将更改为 true
。
当前和初始计划
在每个 AdaptiveSparkPlan
节点下,将有初始计划(应用任何 AQE 优化之前的计划)和当前计划或最终计划,具体取决于执行是否已完成。 随着执行进度,当前计划将不断发展。
运行时统计信息
每个洗牌和广播过程都包含数据统计信息。
在阶段开始前或运行时,统计信息是编译时估计值,标志 isRuntime
是 false
,例如:Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
阶段执行完成后,统计信息是在运行时收集的,标志 isRuntime
将变为 true
,例如:Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
下面是一个 DataFrame.explain
示例:
执行前
在执行期间
执行后
SQL EXPLAIN
AdaptiveSparkPlan
节点
AQE 应用的查询包含一个或多个 AdaptiveSparkPlan 节点,通常作为每个主查询或子查询的根节点。
无当前计划
由于 SQL EXPLAIN
不执行查询,当前计划始终与初始计划相同,并不反映 AQE 最终执行的内容。
下面是一个 SQL 说明示例:
有效性
如果一个或多个 AQE 优化生效,查询计划将更改。 这些 AQE 优化的效果通过当前计划和最终计划与初始计划和最终计划中的特定计划节点之间的差异来演示。
将“排序合并联接”动态转换为“广播哈希联接”:当前计划/最终计划与初始计划之间不同的物理联接节点
动态合并分区:节点
CustomShuffleReader
和属性Coalesced
动态处理节点
SortMergeJoin
的倾斜联接,字段isSkew
为 true。动态检测和传播空关系:计划的一部分或全部被节点LocalTableScan替换,其中关系字段为空。
配置
本节内容:
启用和禁用自适应查询执行
资产 |
---|
spark.databricks.optimizer.adaptive.enabled 类型: Boolean 是否启用或禁用自适应查询执行。 默认值: true |
启用自动优化的洗牌
资产 |
---|
spark.sql.shuffle.partitions 类型: Integer 用于联接或聚合时进行数据重排的默认分区数。 设置该值 auto 可启用自动优化的洗牌机制,该机制会根据查询计划和查询输入数据大小自动确定此数字。注意:对于结构化流式处理,从同一检查点位置重启查询时,此配置无法更改。 默认值:200 |
动态地将排序合并联接更改为广播哈希联接
资产 |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold 类型: Byte String 用于在运行时触发切换到广播联接的阈值。 默认值: 30MB |
动态合并分区
资产 |
---|
spark.sql.adaptive.coalescePartitions.enabled 类型: Boolean 是否启用或禁用分区合并。 默认值: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes 类型: Byte String 合并后的目标大小。 合并的分区大小将接近,但不超过此目标大小。 默认值: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize 类型: Byte String 合并后分区的最小大小。 合并的分区大小不会小于此大小。 默认值: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum 类型: Integer 合并后的最小分区数。 不推荐,因为设置会显式覆盖 spark.sql.adaptive.coalescePartitions.minPartitionSize 。默认值:集群核心数的2倍 |
动态处理倾斜联接
资产 |
---|
spark.sql.adaptive.skewJoin.enabled 类型: Boolean 是否启用或禁用倾斜联接功能。 默认值: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor 类型: Integer 一种因子(因素),当与中值分区大小相乘时,有助于确定分区是否存在偏斜。 默认值: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 类型: Byte String 一个阈值,该阈值有助于确定分区是否偏斜。 默认值: 256MB |
当 (partition size > skewedPartitionFactor * median partition size)
和 (partition size > skewedPartitionThresholdInBytes)
都是 true
时,分区被视为偏斜。
动态检测和传播空关系
资产 |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled 类型: Boolean 是否启用或禁用动态空关系传播。 默认值: true |
常见问题 (FAQ)
本节内容:
- 为什么 AQE 没有广播一个小联接表?
- 我是否还需要在启用 AQE 的情况下使用广播连接策略提示?
- 倾斜联接提示与 AQE 倾斜联接优化之间的区别是什么? 我该使用哪种方法?
- 为什么 AQE 不会自动调整我的加入顺序?
- 为什么 AQE 未检测到我的数据倾斜?
为什么 AQE 没有广播一个小联接表?
如果预期要广播的关系大小低于此阈值,但仍不广播:
- 检查联接类型。 某些连接类型不支持广播,例如,
LEFT OUTER JOIN
的左关系无法进行广播。 - 也可能是关系包含大量空分区,这样的话,大多数任务可以通过排序合并连接快速完成,或者可以通过倾斜连接处理进行优化。 如果非空分区的百分比低于
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
,AQE 可以避免将此类排序合并联接更改为广播哈希联接。
启用 AQE 后,我是否仍应使用广播连接策略提示?
是的。 静态计划的广播联接通常比 AQE 动态计划的广播联接性能更好,因为 AQE 在执行联接双方的混排之后(此时才能获得实际的关系大小),可能才切换到广播联接。 因此,如果你对查询非常熟悉,那么使用广播提示仍然是一个不错的选择。 AQE 将遵循与静态优化相同的查询提示,但仍可以应用不受提示影响的动态优化。
倾斜联接提示与 AQE 倾斜联接优化之间的区别是什么? 我该使用哪种方法?
建议依赖于 AQE 倾斜联接处理,而不是使用倾斜联接提示,因为 AQE 倾斜联接是完全自动的,并且通常性能优于提示对应项。
为什么 AQE 不会自动调整我的加入顺序?
动态联接重新排序不是 AQE 的一部分。
为什么 AQE 未检测到我的数据倾斜?
AQE 必须满足两个大小条件才能将分区检测为倾斜分区:
- 分区大小大于
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(默认 256MB) - 分区大小大于所有分区的中值大小乘以偏斜分区因子
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(默认值为5)
此外,某些联接类型的倾斜处理支持受到限制,例如, LEFT OUTER JOIN
仅可以优化左侧的倾斜。
遗产
自 Spark 1.6 以来,“自适应执行”一词已存在,但 Spark 3.0 中的新 AQE 基本不同。 就功能而言,Spark 1.6 仅执行“动态合并分区”部分。 就技术体系结构而言,新的 AQE 是基于运行时统计信息的动态规划和重新规划查询的框架,它支持各种优化,例如本文中所述的优化,并且可以进行扩展以实现更多潜在的优化。