查询历史记录系统表参考

重要

此系统表为公共预览版

本文包含有关查询历史记录系统表的信息,包括表架构的大纲。

表路径:此系统表位于 system.query.history.

使用查询历史记录表

查询历史记录表包含使用 SQL 仓库或无服务器计算在笔记本作业中运行的查询记录。 该表包括来自访问表的同一区域中所有工作区的帐户范围记录。

默认情况下,只有管理员有权访问系统表。 如果要与用户或组共享表的数据,Databricks 建议为每个用户或组创建动态视图。 请参阅创建动态视图

查询历史记录系统表架构

查询历史记录表使用以下架构:

列名称 数据类型 说明 示例
account_id 字符串 帐户的 ID。 11e22ba4-87b9-4cc2
-9770-d10b894b7118
workspace_id 字符串 运行查询的工作区的 ID。 1234567890123456
statement_id 字符串 唯一标识语句执行的 ID。 可以使用此 ID 在“查询历史记录”UI 中查找语句执行。 7a99b43c-b46c-432b
-b0a7-814217701909
session_id 字符串 Spark 会话 ID。 01234567-cr06-a2mp
-t0nd-a14ecfb5a9c2
execution_status 字符串 语句终止状态。 可能的值为:
- FINISHED:执行成功
- FAILED:执行失败,并显示随附错误消息中所述失败的原因
- CANCELED:执行已取消
FINISHED
compute 结构体 一个结构,表示用于运行语句的计算资源的类型,以及资源的 ID(如适用)。 type 值将是 WAREHOUSESERVERLESS_COMPUTE {
type: WAREHOUSE,
cluster_id: NULL,
warehouse_id: ec58ee3772e8d305
}
executed_by_user_id 字符串 运行语句的用户的 ID。 2967555311742259
executed_by 字符串 运行该语句的用户的电子邮件地址或用户名。 example@databricks.com
statement_text 字符串 SQL 语句的文本。 如果已配置客户管理的密钥,则 statement_text 为空。 由于存储限制,较长的语句文本值会进行压缩。 即使使用压缩,也可能达到字符限制。 SELECT 1
statement_type 字符串 语句类型。 例如:ALTERCOPYINSERT SELECT
error_message 字符串 描述错误条件的消息。 如果已配置客户管理的密钥,则 error_message 为空。 [INSUFFICIENT_PERMISSIONS]
Insufficient privileges:
User does not have
permission SELECT on table
'default.nyctaxi_trips'.
client_application 字符串 运行语句的客户端应用程序。 例如:Databricks SQL 编辑器、Tableau 和 Power BI。 此字段派生自客户端应用程序提供的信息。 虽然值预计在一段时间内保持静态,但无法保证这一点。 Databricks SQL Editor
client_driver 字符串 用于运行语句时连接到 Databricks 的连接器。 例如:Databricks SQL Driver for Go、Databricks ODBC 驱动程序、Databricks JDBC 驱动程序。 Databricks JDBC Driver
total_duration_ms bigint 语句的总执行时间(以毫秒为单位,不包括结果提取时间)。 1
waiting_for_compute_duration_ms bigint 等待预配计算资源的时间(以毫秒为单位)。 1
waiting_at_capacity_duration_ms bigint 等待队列中可用计算容量的时间(以毫秒为单位)。 1
execution_duration_ms bigint 执行查询所花费的时间(以毫秒为单位)。 1
compilation_duration_ms bigint 加载元数据和优化语句所花费的时间(以毫秒为单位)。 1
total_task_duration_ms bigint 所有任务工期的总和(以毫秒为单位)。 这个时间表示跨所有节点的所有核心运行查询所花费的总时间。 如果并行执行多个任务,则持续时间可能明显长于挂钟持续时间。 如果任务等待可用节点,则持续时间可能比挂钟持续时间要短。 1
result_fetch_duration_ms bigint 执行完成后提取查询结果所花费的时间(以毫秒为单位)。 1
start_time 时间戳 Databricks 收到请求的时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 2022-12-05T00:00:00.000+0000
end_time 时间戳 语句执行结束的时间,不包括结果提取时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 2022-12-05T00:00:00.000+00:00
update_time 时间戳 上次收到进度更新的语句的时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 2022-12-05T00:00:00.000+00:00
read_partitions bigint 修剪后读取的分区数。 1
pruned_files bigint 已修剪的文件数。 1
read_files bigint 修剪后读取的文件数。 1
read_rows bigint 语句读取的总行数。 1
produced_rows bigint 语句返回的总行数。 1
read_bytes bigint 语句读取的数据的总大小(以字节为单位)。 1
read_io_cache_percent 整数 (int) 从 IO 缓存中读取的永久性数据的字节百分比。 50
from_result_cache 布尔 TRUE 指示语句结果是从缓存中提取的。 TRUE
spilled_local_bytes bigint 执行语句时临时写入磁盘的数据大小(以字节为单位)。 1
written_bytes bigint 写入云对象存储的持久性数据的大小(以字节为单位)。 1
shuffle_read_bytes bigint 通过网络发送的数据总量(以字节为单位)。 1
query_source 结构体 包含表示执行此语句所涉及的 Databricks 实体(例如作业、笔记本或仪表板)的键值对的结构。 此字段仅记录 Databricks 实体。 {
alert_id: 81191d77-184f-4c4e-9998-b6a4b5f4cef1,
sql_query_id: null,
dashboard_id: null,
notebook_id: null,
job_info: {
job_id: 12781233243479,
job_run_id: null,
job_task_run_id: 110373910199121
},
legacy_dashboard_id: null,
genie_space_id: null
}
executed_as 字符串 使用其特权运行语句的用户或服务主体的名称。 example@databricks.com
executed_as_user_id 字符串 使用其特权运行语句的用户或服务主体的 ID。 2967555311742259

查看记录的查询配置文件

若要根据查询历史表中的记录导航到查询配置文件,请执行以下操作:

  1. 确定感兴趣的记录,然后复制记录的statement_id
  2. 请参考记录的 workspace_id 以确保您登录的工作区与记录相同。
  3. 单击“历史记录”图标。工作区边栏中的查询历史记录
  4. 在“语句 ID”字段中,将 statement_id 粘贴到记录上。
  5. 单击查询的名称。 此时将显示查询指标的概述。
  6. 单击“查看查询配置文件”

了解查询来源列

query_source 列包含语句执行中涉及的 Azure Databricks 实体的一组唯一标识符。

query_source如果该列包含多个 ID,则表示语句执行是由多个实体触发的。 例如,作业结果可能会触发调用 SQL 查询的警报。 在此示例中,将填充所有三个 ID 到 query_source。 此列的值不按执行顺序排序。

可能的查询源包括:

query_source的有效组合

以下示例显示如何根据查询的运行方式填充query_source列:

  • 在作业运行期间执行的查询包括填充 job_info 的结构体:

    {
    alert_id: null,
    sql_query_id: null,
    dashboard_id: null,
    notebook_id: null,
    job_info: {
    job_id: 64361233243479,
    job_run_id: null,
    job_task_run_id: 110378410199121
    },
    legacy_dashboard_id: null,
    genie_space_id: null
    }

  • 来自旧仪表板的查询包括一个sql_query_idlegacy_dashboard_id

    {
    alert_id: null,
    sql_query_id: 7336ab80-1a3d-46d4-9c79-e27c45ce9a15,
    dashboard_id: null,
    notebook_id: null,
    job_info: null,
    legacy_dashboard_id: 1a735c96-4e9c-4370-8cd7-5814295d534c,
    genie_space_id: null
    }

  • 来自警报的查询包括sql_query_idalert_id

    {
    alert_id: e906c0c6-2bcc-473a-a5d7-f18b2aee6e34,
    sql_query_id: 7336ab80-1a3d-46d4-9c79-e27c45ce9a15,
    dashboard_id: null,
    notebook_id: null,
    job_info: null,
    legacy_dashboard_id: null,
    genie_space_id: null
    }

  • 来自仪表板的查询包括一个 dashboard_id,但不包含 job_info

    {
    alert_id: null,
    sql_query_id: null,
    dashboard_id: 887406461287882,
    notebook_id: null,
    job_info: null,
    legacy_dashboard_id: null,
    genie_space_id: null
    }

具体化元存储中的查询历史记录

以下代码可用于创建每小时、每天或每周运行的作业,以具体化元存储中的查询历史记录。 相应地调整 HISTORY_TABLE_PATHLOOKUP_PERIOD_DAYS 变量。

from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

HISTORY_TABLE_PATH = "jacek.default.history"
# Adjust the lookup period according to your job schedule
LOOKUP_PERIOD_DAYS = 1

def table_exists(table_name):
    try:
        spark.sql(f"describe table {table_name}")
        return True
    except Exception:
        return False

def save_as_table(table_path, df, schema, pk_columns):
    deltaTable = (
        DeltaTable.createIfNotExists(spark)
        .tableName(table_path)
        .addColumns(schema)
        .execute()
    )

    merge_statement = " AND ".join([f"logs.{col}=newLogs.{col}" for col in pk_columns])

    result = (
        deltaTable.alias("logs")
        .merge(
            df.alias("newLogs"),
            f"{merge_statement}",
        )
        .whenNotMatchedInsertAll()
        .whenMatchedUpdateAll()
        .execute()
    )
    result.show()

def main():
    df = spark.read.table("system.query.history")
    if table_exists(HISTORY_TABLE_PATH):
        df = df.filter(f"update_time >= CURRENT_DATE() - INTERVAL {LOOKUP_PERIOD_DAYS} days")
    else:
        print(f"Table {HISTORY_TABLE_PATH} does not exist. Proceeding to copy the whole source table.")

    save_as_table(
        HISTORY_TABLE_PATH,
        df,
        df.schema,
        ["workspace_id", "statement_id"]
    )

main()