ストリーミング クエリを開始する前に、SparkSession で次の構成を設定することにより、RocksDB ベースの状態管理を有効にすることができます。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Lakeflow 宣言パイプラインで RocksDB を有効にすることができます。 ステートフル処理については、 パイプライン構成の最適化に関するを参照してください。
変更ログのチェックポイント処理を有効にする
Databricks Runtime 13.3 LTS 以降では、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮するために、変更ログのチェックポイント処理を有効にすることができます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログのチェックポイント処理を有効にすることをお勧めします。
従来、RocksDB 状態ストアはチェックポイント処理中にスナップショットを作成し、データ ファイルをアップロードします。 このコストを回避するために、変更ログのチェックポイント処理では、最後のチェックポイント以降に変更されたレコードのみが永続ストレージに書き込まれます。
変更ログのチェックポイント処理は、既定で無効になっています。 SparkSession レベルで変更ログのチェックポイント処理を有効にするには、次の構文を使用します。
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
既存のストリームで変更ログのチェックポイント処理を有効にし、チェックポイントに格納されている状態情報を維持することができます。
重要
変更ログのチェックポイント処理を有効にしたクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイント処理を無効にして従来のチェックポイント処理の動作に戻すことができますが、これらのクエリは引き続き Databricks Runtime 13.3 LTS 以降で実行する必要があります。 これらの変更を行うには、ジョブを再起動する必要があります。
RocksDB 状態ストア メトリクス
各状態操作は、状態ストアを観察するために RocksDB インスタンスで実行された状態管理操作に関連するメトリックを収集します。これは、デバッグ ジョブの低速化につながる可能性があります。
Databricks Runtime 16.4 LTS 以降では、特定の状態ストア インスタンスのメトリックにパーティション ID とストア名のラベルが付けられます。そのため、個別の状態が維持されます。 その他のすべてのメトリックは、状態演算子が実行されているすべてのタスクの各状態演算子の集計合計として報告されます。
これらのメトリックは、customMetrics
の stateOperators
フィールド内の StreamingQueryProgress
マップの一部です。 JSON 形式の StreamingQueryProgress
の例を次に示します (StreamingQueryProgress.json()
を使用して取得)。
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
メトリックの詳細な説明は次のとおりです。
測定項目名 | 説明 |
---|---|
rocksdbコミットライトバッチ遅延 | インメモリ構造 (WriteBatch) でのステージングされた書き込みをネイティブ RocksDB に適用するためにかかった時間 (ミリ秒単位)。 |
rocksdbCommitFlushLatency (コミットフラッシュ待ち時間) | ローカル ディスクへの RocksDB のメモリ内の変更をフラッシュするのにかかった時間 (ミリ秒単位)。 |
rocksdb コミットコンパクトレイテンシー | チェックポイントのコミット中に圧縮に要した時間 (省略可能)(ミリ秒単位)。 |
rocksdbコミット一時停止レイテンシー | チェックポイントのコミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) の停止に要した時間 (ミリ秒単位)。 |
ロックスDBコミットチェックポイントレイテンシー | ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込むのにかかった時間 (ミリ秒単位)。 |
rocksdbコミットファイル同期遅延時間(ミリ秒) | ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期するためにかかった時間 (ミリ秒単位)。 |
rocksdbGetLatency | 基になるネイティブ RocksDB::Get 呼び出しごとにかかった平均時間 (ナノ秒単位)。 |
rocksdbPutCount | 基になるネイティブ RocksDB::Put 呼び出しごとにかかった平均時間 (ナノ秒単位)。 |
rocksdbGetCount | ネイティブ RocksDB::Get 呼び出しの数 (WriteBatch からの Gets を含まない - ステージング書き込みに使用されるメモリ バッチ内)。 |
rocksdbPutCount | ネイティブ RocksDB::Put 呼び出しの数 (WriteBatch への Puts を含まない - ステージング書き込みに使用されるメモリ バッチ内)。 |
rocksdbのGetによって読み取られた総バイト数 | ネイティブ RocksDB::Get 呼び出しによって読み取られた非圧縮バイト数。 |
rocksdbTotalBytesWrittenByPut | ネイティブ RocksDB::Put 呼び出しによって書き込まれた非圧縮バイト数。 |
If a translation is warranted, it could be something like ロックスDB読み取りブロックキャッシュヒット数. | ローカル ディスクからのデータの読み取りを避けるために、native RocksDB block キャッシュを使用する回数。 |
rocksdb 読み取りブロックキャッシュミスカウント | Native RocksDB ブロックキャッシュがミスしてローカルディスクからデータを読み取る必要があった回数。 |
RocksDBコンパクションによる総合バイト読み込み量 | Native RocksDB の圧縮プロセスによってローカル ディスクから読み取られたバイト数。 |
rocksdbコンパクションにより書き込まれたバイトの合計 | Native RocksDB 圧縮プロセスによってローカル ディスクに書き込まれたバイト数。 |
rocksdbTotalCompactionLatencyMs | RocksDB 圧縮 (コミット中に開始されるバックグラウンドとオプションの圧縮の両方) にかかった時間 (ミリ秒単位)。 |
rocksdbWriterStallLatencyMs | バックグラウンドでの圧縮、または memtables からディスクへのフラッシュによってライターが停止した時間 (ミリ秒単位)。 |
rocksdbイテレータ経由で読み取られた総バイト数 | ステートフルな操作 (flatMapGroupsWithState のタイムアウト処理やウィンドウ内の集計でのウォーターマークなど) の一部では、反復子を使用して DB 内のデータ全体を読み取る必要があります。 反復子を使用して読み取られた非圧縮データの合計サイズ。 |