重要
此系统表为公共预览版。
本文包含有关查询历史记录系统表的信息,包括表架构的大纲。
表路径:此系统表位于 system.query.history
.
使用查询历史记录表
查询历史记录表包含使用 SQL 仓库或无服务器计算在笔记本和作业中运行的查询记录。 该表包括来自访问表的同一区域中所有工作区的帐户范围记录。
默认情况下,只有管理员有权访问系统表。 如果要与用户或组共享表的数据,Databricks 建议为每个用户或组创建动态视图。 请参阅创建动态视图。
查询历史记录系统表架构
查询历史记录表使用以下架构:
列名称 | 数据类型 | 说明 | 示例 |
---|---|---|---|
account_id |
字符串 | 帐户的 ID。 | 11e22ba4-87b9-4cc2 -9770-d10b894b7118 |
workspace_id |
字符串 | 运行查询的工作区的 ID。 | 1234567890123456 |
statement_id |
字符串 | 唯一标识语句执行的 ID。 可以使用此 ID 在“查询历史记录”UI 中查找语句执行。 | 7a99b43c-b46c-432b -b0a7-814217701909 |
session_id |
字符串 | Spark 会话 ID。 | 01234567-cr06-a2mp -t0nd-a14ecfb5a9c2 |
execution_status |
字符串 | 语句终止状态。 可能的值为: - FINISHED :执行成功- FAILED :执行失败,并显示随附错误消息中所述失败的原因- CANCELED :执行已取消 |
FINISHED |
compute |
结构体 | 一个结构,表示用于运行语句的计算资源的类型,以及资源的 ID(如适用)。
type 值将是 WAREHOUSE 或 SERVERLESS_COMPUTE 。 |
{ type: WAREHOUSE, cluster_id: NULL, warehouse_id: ec58ee3772e8d305 } |
executed_by_user_id |
字符串 | 运行语句的用户的 ID。 | 2967555311742259 |
executed_by |
字符串 | 运行该语句的用户的电子邮件地址或用户名。 | example@databricks.com |
statement_text |
字符串 | SQL 语句的文本。 如果已配置客户管理的密钥,则 statement_text 为空。 由于存储限制,较长的语句文本值会进行压缩。 即使使用压缩,也可能达到字符限制。 |
SELECT 1 |
statement_type |
字符串 | 语句类型。 例如:ALTER 、COPY 和 INSERT 。 |
SELECT |
error_message |
字符串 | 描述错误条件的消息。 如果已配置客户管理的密钥,则 error_message 为空。 |
[INSUFFICIENT_PERMISSIONS] Insufficient privileges: User does not have permission SELECT on table 'default.nyctaxi_trips'. |
client_application |
字符串 | 运行语句的客户端应用程序。 例如:Databricks SQL 编辑器、Tableau 和 Power BI。 此字段派生自客户端应用程序提供的信息。 虽然值预计在一段时间内保持静态,但无法保证这一点。 | Databricks SQL Editor |
client_driver |
字符串 | 用于运行语句时连接到 Databricks 的连接器。 例如:Databricks SQL Driver for Go、Databricks ODBC 驱动程序、Databricks JDBC 驱动程序。 | Databricks JDBC Driver |
total_duration_ms |
bigint | 语句的总执行时间(以毫秒为单位,不包括结果提取时间)。 | 1 |
waiting_for_compute_duration_ms |
bigint | 等待预配计算资源的时间(以毫秒为单位)。 | 1 |
waiting_at_capacity_duration_ms |
bigint | 等待队列中可用计算容量的时间(以毫秒为单位)。 | 1 |
execution_duration_ms |
bigint | 执行查询所花费的时间(以毫秒为单位)。 | 1 |
compilation_duration_ms |
bigint | 加载元数据和优化语句所花费的时间(以毫秒为单位)。 | 1 |
total_task_duration_ms |
bigint | 所有任务工期的总和(以毫秒为单位)。 这个时间表示跨所有节点的所有核心运行查询所花费的总时间。 如果并行执行多个任务,则持续时间可能明显长于挂钟持续时间。 如果任务等待可用节点,则持续时间可能比挂钟持续时间要短。 | 1 |
result_fetch_duration_ms |
bigint | 执行完成后提取查询结果所花费的时间(以毫秒为单位)。 | 1 |
start_time |
时间戳 | Databricks 收到请求的时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 |
2022-12-05T00:00:00.000+0000 |
end_time |
时间戳 | 语句执行结束的时间,不包括结果提取时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 |
2022-12-05T00:00:00.000+00:00 |
update_time |
时间戳 | 上次收到进度更新的语句的时间。 时区信息记录在值的末尾,其中 +00:00 表示 UTC。 |
2022-12-05T00:00:00.000+00:00 |
read_partitions |
bigint | 修剪后读取的分区数。 | 1 |
pruned_files |
bigint | 已修剪的文件数。 | 1 |
read_files |
bigint | 修剪后读取的文件数。 | 1 |
read_rows |
bigint | 语句读取的总行数。 | 1 |
produced_rows |
bigint | 语句返回的总行数。 | 1 |
read_bytes |
bigint | 语句读取的数据的总大小(以字节为单位)。 | 1 |
read_io_cache_percent |
整数 (int) | 从 IO 缓存中读取的永久性数据的字节百分比。 | 50 |
from_result_cache |
布尔 |
TRUE 指示语句结果是从缓存中提取的。 |
TRUE |
spilled_local_bytes |
bigint | 执行语句时临时写入磁盘的数据大小(以字节为单位)。 | 1 |
written_bytes |
bigint | 写入云对象存储的持久性数据的大小(以字节为单位)。 | 1 |
shuffle_read_bytes |
bigint | 通过网络发送的数据总量(以字节为单位)。 | 1 |
query_source |
结构体 | 包含表示执行此语句所涉及的 Databricks 实体(例如作业、笔记本或仪表板)的键值对的结构。 此字段仅记录 Databricks 实体。 | { alert_id: 81191d77-184f-4c4e-9998-b6a4b5f4cef1, sql_query_id: null, dashboard_id: null, notebook_id: null, job_info: { job_id: 12781233243479, job_run_id: null, job_task_run_id: 110373910199121 }, legacy_dashboard_id: null, genie_space_id: null } |
executed_as |
字符串 | 使用其特权运行语句的用户或服务主体的名称。 | example@databricks.com |
executed_as_user_id |
字符串 | 使用其特权运行语句的用户或服务主体的 ID。 | 2967555311742259 |
查看记录的查询配置文件
若要根据查询历史表中的记录导航到查询配置文件,请执行以下操作:
- 确定感兴趣的记录,然后复制记录的
statement_id
。 - 请参考记录的
workspace_id
以确保您登录的工作区与记录相同。 - 单击
工作区边栏中的查询历史记录。
- 在“语句 ID”字段中,将
statement_id
粘贴到记录上。 - 单击查询的名称。 此时将显示查询指标的概述。
- 单击“查看查询配置文件”。
了解查询来源列
该 query_source
列包含语句执行中涉及的 Azure Databricks 实体的一组唯一标识符。
query_source
如果该列包含多个 ID,则表示语句执行是由多个实体触发的。 例如,作业结果可能会触发调用 SQL 查询的警报。 在此示例中,将填充所有三个 ID 到 query_source
。 此列的值不按执行顺序排序。
可能的查询源包括:
- alert_id:从警报触发的语句
- sql_query_id:在此 SQL 编辑器 会话中执行的语句
- dashboard_id:从仪表板执行的语句
- legacy_dashboard_id:在旧仪表板中执行的语句
- genie_space_id:从 Genie 空间执行的语句
- notebook_id:从笔记本执行的语句
- job_info.job_id:在作业中执行的语句
- job_info.job_run_id:从作业运行执行的语句
- job_info.job_task_run_id:在作业任务运行中执行的语句
query_source的有效组合
以下示例显示如何根据查询的运行方式填充query_source
列:
在作业运行期间执行的查询包括填充
job_info
的结构体:{
alert_id: null,
sql_query_id: null,
dashboard_id: null,
notebook_id: null,
job_info: {
job_id: 64361233243479,
job_run_id: null,
job_task_run_id: 110378410199121
},
legacy_dashboard_id: null,
genie_space_id: null
}
来自旧仪表板的查询包括一个
sql_query_id
和legacy_dashboard_id
:{
alert_id: null,
sql_query_id: 7336ab80-1a3d-46d4-9c79-e27c45ce9a15,
dashboard_id: null,
notebook_id: null,
job_info: null,
legacy_dashboard_id: 1a735c96-4e9c-4370-8cd7-5814295d534c,
genie_space_id: null
}
来自警报的查询包括
sql_query_id
和alert_id
:{
alert_id: e906c0c6-2bcc-473a-a5d7-f18b2aee6e34,
sql_query_id: 7336ab80-1a3d-46d4-9c79-e27c45ce9a15,
dashboard_id: null,
notebook_id: null,
job_info: null,
legacy_dashboard_id: null,
genie_space_id: null
}
来自仪表板的查询包括一个
dashboard_id
,但不包含job_info
:{
alert_id: null,
sql_query_id: null,
dashboard_id: 887406461287882,
notebook_id: null,
job_info: null,
legacy_dashboard_id: null,
genie_space_id: null
}
具体化元存储中的查询历史记录
以下代码可用于创建每小时、每天或每周运行的作业,以具体化元存储中的查询历史记录。 相应地调整 HISTORY_TABLE_PATH
和 LOOKUP_PERIOD_DAYS
变量。
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
HISTORY_TABLE_PATH = "jacek.default.history"
# Adjust the lookup period according to your job schedule
LOOKUP_PERIOD_DAYS = 1
def table_exists(table_name):
try:
spark.sql(f"describe table {table_name}")
return True
except Exception:
return False
def save_as_table(table_path, df, schema, pk_columns):
deltaTable = (
DeltaTable.createIfNotExists(spark)
.tableName(table_path)
.addColumns(schema)
.execute()
)
merge_statement = " AND ".join([f"logs.{col}=newLogs.{col}" for col in pk_columns])
result = (
deltaTable.alias("logs")
.merge(
df.alias("newLogs"),
f"{merge_statement}",
)
.whenNotMatchedInsertAll()
.whenMatchedUpdateAll()
.execute()
)
result.show()
def main():
df = spark.read.table("system.query.history")
if table_exists(HISTORY_TABLE_PATH):
df = df.filter(f"update_time >= CURRENT_DATE() - INTERVAL {LOOKUP_PERIOD_DAYS} days")
else:
print(f"Table {HISTORY_TABLE_PATH} does not exist. Proceeding to copy the whole source table.")
save_as_table(
HISTORY_TABLE_PATH,
df,
df.schema,
["workspace_id", "statement_id"]
)
main()