可以使用 DataFrame 操作或 SQL 表值函数来查询结构化流式处理状态数据和元数据。 使用这些函数可观察结构化流式处理有状态查询的状态信息,这对于监视和调试非常有用。
必须对流式处理查询的检查点路径具有读取访问权限才能查询状态数据或元数据。 本文中所述的函数提供对状态数据和元数据的只读访问权限。 只能使用批量读取语义来查询状态信息。
注意
无法查询 Lakeflow 声明性管道、流式处理表或具体化视图的状态信息。 不能使用无服务器计算或配置了标准访问模式的计算来查询状态信息。
要求
- 使用以下计算配置之一:
- Databricks Runtime 16.3 及更高版本,用在配置了标准访问模式的计算上。
- Databricks Runtime 14.3 LTS 及更高版本可在配置为专用或无隔离访问模式的计算上运行。
- 对流式处理查询所使用的检查点路径的读取访问权限。
读取结构化流式处理状态存储
可以读取在任何受支持的 Databricks Runtime 中执行的结构化流式处理查询的状态存储信息。 使用以下语法:
Python语言
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
状态读取器 API 参数
状态读取器 API 支持以下可选配置:
选项 | 类型 | 默认值 | 说明 |
---|---|---|---|
batchId |
长整型 | 最新批处理 ID | 表示要从中读取的目标批。 指定此选项来查询查询早期状态的状态信息。 该批必须提交,但尚未清理。 |
operatorId |
长整型 | 0 | 表示要从中读取的目标运算符。 当查询使用多个有状态运算符时,使用此选项。 |
storeName |
字符串 | 默认 | 表示要从中读取的目标状态存储名称。 当有状态运算符使用多个状态存储实例时,将使用此选项。 必须为流之间的联接指定 storeName 或 joinSide ,但不能同时指定两者。 |
joinSide |
字符串(“left”或“right”) | 表示要从中读取的目标端。 当用户想要从流联接中读取状态时,将使用此选项。 | |
stateVarName |
字符串 | 没有 | 要在此查询进行过程中读取的状态变量名称。 在init 运算符使用的StatefulProcessor 函数中,状态变量名称是为每个transformWithState 变量提供的唯一名称。 如果使用运算符, transformWithState 则此选项是必需的选项。 此选项仅适用于 transformWithState 运算符,并忽略其他运算符。 在 Databricks Runtime 16.2 及更高版本中可用。 |
readRegisteredTimers |
布尔型 | 假 | 设置为 true 可读取 transformWithState 运算符中使用的已注册计时器。 此选项仅适用于 transformWithState 运算符,并忽略其他运算符。 在 Databricks Runtime 16.2 及更高版本中可用。 |
flattenCollectionTypes |
布尔型 | 是 | 如果为 true ,则平展为 map 和 list 状态变量返回的记录。 如果为 false ,则使用 Spark SQL Array 或 Map 返回记录。 此选项仅适用于 transformWithState 运算符,并忽略其他运算符。 在 Databricks Runtime 16.2 及更高版本中可用。 |
返回的数据具有以下架构:
列 | 类型 | 说明 |
---|---|---|
key |
结构体(从状态键派生出的后续类型) | 状态检查点中有状态运算符记录的键。 |
value |
结构体(从状态值派生出的后续类型) | 状态检查点中有状态运算符记录的值。 |
partition_id |
整数 | 包含有状态运算符记录的状态检查点的分区。 |
请参阅 read_statestore
表值函数。
读取结构化流式处理状态元数据
重要
必须在 Databricks Runtime 14.2 或更高版本上运行流式处理查询才能记录状态元数据。 状态元数据文件不会破坏向后兼容性。 如果选择在 Databricks Runtime 14.1 或更低版本上运行流式处理查询,则会忽略现有状态元数据文件,并且不会写入新的状态元数据文件。
可以读取 Databricks Runtime 14.2 或更高版本上运行的结构化流式处理查询的状态元数据信息。 使用以下语法:
Python语言
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
返回的数据具有以下架构:
列 | 类型 | 说明 |
---|---|---|
operatorId |
整数 | 有状态流式处理运算符的整数 ID。 |
operatorName |
整数 | 有状态流式处理运算符的名称。 |
stateStoreName |
字符串 | 运算符的状态存储的名称。 |
numPartitions |
整数 | 状态存储的分区数。 |
minBatchId |
长整型 | 可用于查询状态的最小批 ID。 |
maxBatchId |
长整型 | 可用于查询状态的最大批 ID。 |
注意
minBatchId
和 maxBatchId
提供的批处理 ID 值在写入检查点时反映状态。 系统会使用微批处理执行自动清理旧的批处理,因此不能保证此处提供的值仍可用。
示例:查询流间联接的一侧
使用以下语法查询流间联接的左侧:
Python语言
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
示例:查询具有多个有状态运算符的流的状态存储
此示例使用状态元数据读取器收集具有多个有状态运算符的流式处理查询的元数据详细信息,然后使用元数据结果作为状态读取器的选项。
状态元数据读取器将检查点路径作为唯一选项,如以下语法示例所示:
Python语言
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
下表表示状态存储元数据的示例输出:
operatorId | 操作员名称 | 状态存储名称 | 分区数量 | minBatchId | maxBatchId |
---|---|---|---|---|---|
0 | 状态存储保存 | 默认 | 200 | 0 | 13 |
1 | dedupeWithinWatermark | 默认 | 200 | 0 | 13 |
若要获取运算符的结果 dedupeWithinWatermark
,请使用该选项查询状态读取器 operatorId
,如以下示例所示:
Python语言
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);