次の方法で共有


Azure Databricks で RocksDB 状態ストアを構成する

ストリーミング クエリを開始する前に、SparkSession で次の構成を設定することにより、RocksDB ベースの状態管理を有効にすることができます。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Lakeflow 宣言パイプラインで RocksDB を有効にすることができます。 ステートフル処理については、 パイプライン構成の最適化に関するを参照してください。

変更ログのチェックポイント処理を有効にする

Databricks Runtime 13.3 LTS 以降では、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮するために、変更ログのチェックポイント処理を有効にすることができます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログのチェックポイント処理を有効にすることをお勧めします。

従来、RocksDB 状態ストアはチェックポイント処理中にスナップショットを作成し、データ ファイルをアップロードします。 このコストを回避するために、変更ログのチェックポイント処理では、最後のチェックポイント以降に変更されたレコードのみが永続ストレージに書き込まれます。

変更ログのチェックポイント処理は、既定で無効になっています。 SparkSession レベルで変更ログのチェックポイント処理を有効にするには、次の構文を使用します。

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

既存のストリームで変更ログのチェックポイント処理を有効にし、チェックポイントに格納されている状態情報を維持することができます。

重要

変更ログのチェックポイント処理を有効にしたクエリは、Databricks Runtime 13.3 LTS 以降でのみ実行できます。 変更ログのチェックポイント処理を無効にして従来のチェックポイント処理の動作に戻すことができますが、これらのクエリは引き続き Databricks Runtime 13.3 LTS 以降で実行する必要があります。 これらの変更を行うには、ジョブを再起動する必要があります。

RocksDB 状態ストア メトリクス

各状態操作は、状態ストアを観察するために RocksDB インスタンスで実行された状態管理操作に関連するメトリックを収集します。これは、デバッグ ジョブの低速化につながる可能性があります。

Databricks Runtime 16.4 LTS 以降では、特定の状態ストア インスタンスのメトリックにパーティション ID とストア名のラベルが付けられます。そのため、個別の状態が維持されます。 その他のすべてのメトリックは、状態演算子が実行されているすべてのタスクの各状態演算子の集計合計として報告されます。

これらのメトリックは、customMetricsstateOperators フィールド内の StreamingQueryProgress マップの一部です。 JSON 形式の StreamingQueryProgress の例を次に示します (StreamingQueryProgress.json() を使用して取得)。

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

メトリックの詳細な説明は次のとおりです。

測定項目名 説明
rocksdbコミットライトバッチ遅延 インメモリ構造 (WriteBatch) でのステージングされた書き込みをネイティブ RocksDB に適用するためにかかった時間 (ミリ秒単位)。
rocksdbCommitFlushLatency (コミットフラッシュ待ち時間) ローカル ディスクへの RocksDB のメモリ内の変更をフラッシュするのにかかった時間 (ミリ秒単位)。
rocksdb コミットコンパクトレイテンシー チェックポイントのコミット中に圧縮に要した時間 (省略可能)(ミリ秒単位)。
rocksdbコミット一時停止レイテンシー チェックポイントのコミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) の停止に要した時間 (ミリ秒単位)。
ロックスDBコミットチェックポイントレイテンシー ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込むのにかかった時間 (ミリ秒単位)。
rocksdbコミットファイル同期遅延時間(ミリ秒) ネイティブの RocksDB スナップショット関連ファイルを外部ストレージ (チェックポイントの場所) に同期するためにかかった時間 (ミリ秒単位)。
rocksdbGetLatency 基になるネイティブ RocksDB::Get 呼び出しごとにかかった平均時間 (ナノ秒単位)。
rocksdbPutCount 基になるネイティブ RocksDB::Put 呼び出しごとにかかった平均時間 (ナノ秒単位)。
rocksdbGetCount ネイティブ RocksDB::Get 呼び出しの数 (WriteBatch からの Gets を含まない - ステージング書き込みに使用されるメモリ バッチ内)。
rocksdbPutCount ネイティブ RocksDB::Put 呼び出しの数 (WriteBatch への Puts を含まない - ステージング書き込みに使用されるメモリ バッチ内)。
rocksdbのGetによって読み取られた総バイト数 ネイティブ RocksDB::Get 呼び出しによって読み取られた非圧縮バイト数。
rocksdbTotalBytesWrittenByPut ネイティブ RocksDB::Put 呼び出しによって書き込まれた非圧縮バイト数。
If a translation is warranted, it could be something like ロックスDB読み取りブロックキャッシュヒット数. ローカル ディスクからのデータの読み取りを避けるために、native RocksDB block キャッシュを使用する回数。
rocksdb 読み取りブロックキャッシュミスカウント Native RocksDB ブロックキャッシュがミスしてローカルディスクからデータを読み取る必要があった回数。
RocksDBコンパクションによる総合バイト読み込み量 Native RocksDB の圧縮プロセスによってローカル ディスクから読み取られたバイト数。
rocksdbコンパクションにより書き込まれたバイトの合計 Native RocksDB 圧縮プロセスによってローカル ディスクに書き込まれたバイト数。
rocksdbTotalCompactionLatencyMs RocksDB 圧縮 (コミット中に開始されるバックグラウンドとオプションの圧縮の両方) にかかった時間 (ミリ秒単位)。
rocksdbWriterStallLatencyMs バックグラウンドでの圧縮、または memtables からディスクへのフラッシュによってライターが停止した時間 (ミリ秒単位)。
rocksdbイテレータ経由で読み取られた総バイト数 ステートフルな操作 (flatMapGroupsWithState のタイムアウト処理やウィンドウ内の集計でのウォーターマークなど) の一部では、反復子を使用して DB 内のデータ全体を読み取る必要があります。 反復子を使用して読み取られた非圧縮データの合計サイズ。