次の方法で共有


ステートフル ストリーミングとは

"ステートフルな" 構造化ストリーミング クエリでは、中間状態情報の増分更新が必要です。一方、"ステートレスな" 構造化ストリーミング クエリでは、ソースからシンクに対して処理された行に関する情報のみが追跡されます。

ステートフル操作には、ストリーミング集計、ストリーミングdropDuplicates、ストリームストリーム結合、カスタムステートフルアプリケーションが含まれます。

ステートフルな構造化ストリーミング クエリに必要な中間状態情報は、誤って構成された場合に予期しない待機時間と運用環境の問題につながる可能性があります。

Databricks Runtime 13.2 LTS 以降では、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮するために、RockDB の変更ログのチェックポイント処理を有効にすることができます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログのチェックポイント処理を有効にすることをお勧めします。 変更ログのチェックポイント処理を有効にするを参照してください。

ステートフルな構造化ストリーミング クエリを最適化する

ステートフルな構造化ストリーミング クエリの中間状態の情報を管理することにより、予期せぬ待機時間や運用環境の問題を防ぐことができます。

Databricks では次を推奨しています。

  • コンピューティングに最適化されたインスタンスをワーカーとして使用します。
  • シャッフル パーティションの数を、クラスター内のコア数の 1 ~ 2 倍に設定します。
  • SparkSession の spark.sql.streaming.noDataMicroBatches.enabled 構成を false に設定します。 これにより、ストリーミング マイクロバッチ エンジンは、データを含まないマイクロバッチを処理できなくなります。 また、この構成を false に設定すると、ウォーターマークまたは処理時間のタイムアウトを使用するステートフルな操作が発生し、すぐにではなく新しいデータが到着するまでデータ出力が取得されないことにも注意してください。

Databricks では、ステートフル ストリームの状態を管理するために、RocksDB と変更ログのチェックポイント処理を使用することをお勧めします。 「Azure Databricks で RocksDB 状態ストアを構成する」をご覧ください。

状態管理スキームは、クエリの再起動間で変更できません。 既定の管理でクエリが開始されている場合は、状態ストアを変更するには、新しいチェックポイントの場所でクエリを最初から再起動する必要があります。

構造化ストリーミングで複数のステートフル演算子を操作する

Databricks Runtime 13.3 LTS 以降では、Azure Databricks により、構造化ストリーミング ワークロードのステートフル演算子に高度なサポートが提供されます。 複数のステートフル演算子を連結できるようになりました。つまり、ウィンドウ集計などの操作の出力を、結合などの別のステートフル操作にフィードできます。

Databricks Runtime 16.2 以降では、複数のステートフル演算子を使用するワークロードで transformWithState を使用できます。 カスタム ステートフル アプリケーション の構築を参照してください。

次の例では、使用できるいくつかのパターンを示します。

重要

複数のステートフル演算子を操作する場合、次の制限があります。

  • 従来のカスタム ステートフル演算子 (FlatMapGroupWithStateapplyInPandasWithState はサポートされていません。
  • 追加出力モードのみがサポートされています。

チェーンされた時間枠の集計

Python(プログラミング言語)

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

スカラ (プログラミング言語)

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

2 つの異なるストリームの時間枠の集計の後にストリーム同士の枠の結合が続く

Python(プログラミング言語)

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

スカラ (プログラミング言語)

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

ストリーム同士の時間間隔結合の後に時間枠の集計が続く

Python(プログラミング言語)

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

スカラ (プログラミング言語)

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

構造化ストリーミングの状態の再調整

状態の再調整は、Lakeflow 宣言パイプラインのすべてのストリーミング ワークロードに対して既定で有効になっています。 Databricks Runtime 11.3 LTS 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状態を再調整すると、クラスターのサイズ変更イベントが発生するステートフルな構造化ストリーミング パイプラインにメリットがあります。 クラスター サイズの変化に関わらず、ステートレス ストリーミング操作には、メリットがありません。

コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロードの自動スケーリングが強化された Lakeflow 宣言型パイプラインを使用することをお勧めします。 自動スケーリングを使用した Lakeflow 宣言パイプラインのクラスター使用率の最適化に関するページを参照してください。

クラスターのサイズ変更イベントは、状態の再調整をトリガーします。 マイクロバッチでは、状態がクラウド ストレージから新しい Executor に読み込まれると、イベントの再調整中の待機時間が長くなる可能性があります。