处理交互式工作流中的大型查询

交互式数据工作流面临的挑战是处理大型查询。 这包括生成过多输出行、提取许多外部分区或针对极大型数据集进行计算的查询。 这些查询可能非常缓慢、使计算资源饱和,并使其他人难以共享相同的计算。

查询监视器是一个过程,它通过检查大型查询的最常见原因和终止通过阈值的查询来防止查询垄断计算资源。 本文介绍如何启用和配置查询监视器。

重要

查询监视器已为使用 UI 创建的所有通用计算实例启用。

破坏性查询的示例

分析师正在实时数据仓库中执行一些临时查询。 分析师使用共享自动缩放计算,使多个用户能够同时轻松使用单个计算。 假设有两个表,每个表有一百万行。

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

这些表大小在 Apache Spark 中可管理。 不过,它们每个都包含一个join_key列,每一行中都是空字符串。 如果数据不完全干净,或者某些键比其他键更为普遍,则可能会出现这种情况。 这些空联接键比任何其他值都更为普遍。

在以下代码中,分析师在其键上联接这两个表,这将生成 一万亿个结果的输出,所有这些结果在单个执行程序(获取 " " 密钥的执行程序)上生成:

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

此查询似乎正在运行。 但是,在不了解数据的情况下,分析人员发现,在整个作业执行过程中“只”剩下了一项任务。 查询从未完成,这让分析师对为什么它不起作用感到沮丧和困惑。

在这种情况下,只有一个有问题的联接密钥。 其他时候可能还有更多。

启用和配置查询监视器

若要启用和配置查询监视器,需要执行以下步骤。

  • 使用 spark.databricks.queryWatchdog.enabled. 启用 Watchdog
  • 使用 spark.databricks.queryWatchdog.minTimeSecs. 配置任务运行时。
  • 使用 spark.databricks.queryWatchdog.minOutputRows 显示输出。
  • 使用 spark.databricks.queryWatchdog.outputRatioThreshold. 配置输出比率。

若要防止查询为输入行数创建过多的输出行,可以启用查询监视程序,并将输出行的最大数目配置为输入行数的倍数。 在此示例中,我们使用的比率为 1000(默认值)。

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

后一种配置声明,任何给定任务不应生成输入行数超过 1000 倍。

小窍门

输出比率是完全可自定义的。 我们建议开始较低,并查看哪些阈值适用于你和你的团队。 1,000 到 10,000 的范围是一个很好的起点。

查询监视程序不仅阻止用户垄断因无法完成的查询而浪费计算资源,还通过迅速终止不可能完成的查询来节省时间。 例如,以下查询将在几分钟后失败,因为它超出了比率。

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

下面是你将看到的内容:

查询监视器

它通常足以启用查询监视器并设置输出/输入阈值比率,但还可以选择设置两个附加属性: spark.databricks.queryWatchdog.minTimeSecsspark.databricks.queryWatchdog.minOutputRows。 这些属性指定查询中给定任务在取消它之前必须运行的最短时间,以及该查询中某个任务的最小输出行数。

例如,如果希望为每个任务生成大量行,则可以将 minTimeSecs 设置为更高的值。 同样,如果要在查询中的任务生成 1000 万行之后才停止查询,则可以将 spark.databricks.queryWatchdog.minOutputRows 设置为 1000 万。 如果条件较低,查询将成功,即使输出/输入比率已超出也是如此。

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

小窍门

如果在笔记本中配置查询监视器,则配置不会在计算重启时保留。 如果要为计算的所有用户配置 Query Watchdog,建议使用 计算配置

检测对超大型数据集的查询

另一个典型的大型查询可能会扫描来自大型表/数据集的大量数据。 扫描操作可能持续很长时间,并占用计算资源(即使是读取大型 Hive 表的元数据也可能需要较长时间)。 可以设置maxHivePartitions以防止从大型Hive表中提取过多分区。 同样,还可以设置为 maxQueryTasks 限制对极大型数据集的查询。

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

何时应启用查询监视程序?

应为即席分析计算启用查询监视程序,其中 SQL 分析师和数据科学家正在共享给定的计算,管理员需要确保查询“很好地”相互配合。

何时应禁用查询监视器?

一般情况下,我们不建议急切地取消 ETL 方案中使用的查询,因为循环中通常没有人工来更正错误。 建议禁用查询监视机制,仅在即席分析计算时启用。