可以在启动流式处理查询之前,通过在 SparkSession 中设置以下配置来启用基于 RocksDB 的状态管理。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
可以在 Lakeflow 声明性管道上启用 RocksDB。 请参阅针对有状态处理优化管道配置。
启用更改日志检查点
在 Databricks Runtime 13.3 LTS 及更高版本中,可以启用更改日志检查点功能,以降低结构化流工作负载的检查点持续时间和端到端延迟。 Databricks 建议为所有“结构化流式处理”有状态查询启用更改日志检查点。
以前,RocksDB 状态存储会在检查点操作期间对数据文件拍摄快照和进行上传。 为了避免这种成本,更改日志检查点功能会仅将自上一个检查点以来发生了更改的记录写入持久存储。”
默认情况下,更改日志检查点功能处于禁用状态。 可以使用以下语法在 SparkSession 级别启用更改日志检查点:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
可以在现有流上启用更改日志检查点,并维护存储在检查点中的状态信息。
重要
已启用更改日志检查点功能的查询只能在 Databricks Runtime 13.3 LTS 及更高版本上运行。 你可以禁用更改日志检查点功能以还原旧有的检查点行为,但必须继续在 Databricks Runtime 13.3 LTS 或更高版本上运行这些查询。 必须重启作业才能完成这些更改。
RocksDB 状态存储指标
每个状态运算符收集有关在其 RocksDB 实例上执行的状态管理操作的指标,这些指标可用于观测状态存储,并可能有助于调试作业速度缓慢问题。
在 Databricks Runtime 16.4 LTS 及更高版本中,特定状态存储实例的指标标有其分区 ID 和存储名称,确保它们保持独立。 所有其他指标都报告为每个状态操作符在其运行的所有任务中的汇总总和。
这些指标是 customMetrics
中 stateOperators
字段内的 StreamingQueryProgress
映射的一部分。 下面是采用 JSON 格式的 StreamingQueryProgress
的示例(使用 StreamingQueryProgress.json()
获取)。
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
指标的详细说明如下:
指标名称 | 说明 |
---|---|
rocksdb提交写入批次延迟 (rocksdbCommitWriteBatchLatency) | 将内存中结构中的分阶段写入 (WriteBatch) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。 |
rocksdb提交刷新延迟 | 将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。 |
rocksdbCommitCompactLatency | 在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。 |
rocksdbCommitPauseLatency | 在检查点提交过程中停止后台工作线程(以实现压缩等目的)所花费的时间(以毫秒为单位)。 |
rocksdbCommitCheckpointLatency | 创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。 |
rocksdb提交文件同步延迟毫秒 | 将本机 RocksDB 快照相关的文件同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。 |
rocksdb获取延迟 | 每个基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。 |
rocksdbPutCount(RocksDB写入计数) | 每个基础本机 RocksDB::Put 调用平均花费的时间(以纳秒为单位)。 |
rocksdbGetCount | 本机 RocksDB::Get 调用的数量(不包括从 WriteBatch 执行的 Gets - 用于暂存写入的内存中批处理)。 |
rocksdbPutCount(RocksDB写入计数) | 本机 RocksDB::Put 调用数量(不包括对 WriteBatch 执行的 Puts - 用于暂存写入的内存中批处理)。 |
rocksdb每次Get读取的总字节数 (rocksdbTotalBytesReadByGet) | 通过本机 RocksDB::Get 调用读取的未压缩字节数。 |
RocksDB通过Put命令写入的总字节数 | 通过本机 RocksDB::Put 调用写入的未压缩字节数。 |
rocksdb读取块缓存命中计数 | 使用本机 RocksDB 块缓存避免从本地磁盘读取数据的次数。 |
rocksdb读取块缓存未命中计数 | 从本地磁盘读取数据所需的未命中本机 RocksDB 块缓存次数。 |
rocksdb压缩读取的总字节数 | 本机 RocksDB 压缩进程从本地磁盘读取的字节数。 |
rocksdb压缩后写入的总字节数 | 本机 RocksDB 压缩进程写入本地磁盘的字节数。 |
rocksdb总压缩延迟毫秒 | RocksDB 压缩(后台压缩以及在提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。 |
rocksdb写入器暂停延迟毫秒 | 由于后台压缩或将内存表刷新到磁盘而导致写入器停滞的时间(以毫秒为单位)。 |
rocksdb迭代器总字节读取量 | 某些有状态操作(例如 flatMapGroupsWithState 中的超时处理或窗口聚合中的水印处理)需要通过迭代器读取 DB 中的整个数据。 使用迭代器读取的未压缩数据的总大小。 |