次の方法で共有


カスタム ステートフル アプリケーションを構築する

Von Bedeutung

この機能は、Databricks Runtime 16.2 以降の パブリック プレビュー 段階にあります。

カスタムステートフル演算子を使用してストリーミング アプリケーションを構築し、任意のステートフル ロジックを使用する低待機時間およびほぼリアルタイムのソリューションを実装できます。 カスタムステートフル演算子は、従来の構造化ストリーミング処理を通じて使用できない新しい運用ユース ケースとパターンのロックを解除します。

Databricks では、集計、重複除去、ストリーミング結合など、サポートされているステートフル操作に組み込みの構造化ストリーミング機能を使用することをお勧めします。 「ステートフル ストリーミングとは」を参照してください。

Databricks では、任意の状態変換に従来の演算子よりも transformWithState を使用することをお勧めします。 従来の flatMapGroupsWithState 演算子と mapGroupsWithState 演算子のドキュメントについては、「 従来の任意のステートフル演算子」を参照してください。

要求事項

transformWithState 演算子と関連する API とクラスには、次の要件があります。

  • Databricks Runtime 16.2 以降で使用できます。
  • コンピューティングでは、専用または分離なしのアクセス モードを使用する必要があります。
  • RocksDB ステート ストア プロバイダーを使用する必要があります。 Databricks では、コンピューティング構成の一部として RocksDB を有効にすることをお勧めします。
  • transformWithStateInPandas は、Databricks Runtime 16.3 以降で標準アクセス モードをサポートしています。

現在のセッションに対して RocksDB ステート ストア プロバイダーを有効にするには、次を実行します。

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

transformWithState とは

transformWithState演算子は、構造化ストリーミング クエリにカスタム ステートフル プロセッサを適用します。 transformWithStateを使用するには、カスタム ステートフル プロセッサを実装する必要があります。 構造化ストリーミングには、Python、Scala、または Java を使用してステートフル プロセッサを構築するための API が含まれています。

transformWithStateを使用して、構造化ストリーミングで増分処理されたレコードのグループ化キーにカスタム ロジックを適用します。 次に、大まかな設計について説明します。

  • 1 つ以上の状態変数を定義します。
  • 状態情報はグループ化キーごとに保持され、ユーザー定義ロジックに従って状態変数ごとにアクセスできます。
  • 処理されたマイクロ バッチごとに、キーのすべてのレコードを反復子として使用できます。
  • 組み込みハンドルを使用して、タイマーとユーザー定義の条件に基づいてレコードを出力するタイミングと方法を制御します。
  • 状態値は、個々の有効期間 (TTL) 定義をサポートするため、状態の有効期限と状態サイズを柔軟に管理できます。

transformWithStateは状態ストアでのスキーマの進化をサポートしているため、過去の状態情報を失ったり、レコードを再処理したりする必要なく、運用アプリケーションを反復処理および更新できるため、開発の柔軟性とメンテナンスの容易さを実現できます。 状態ストアでのスキーマの進化を参照してください。

Von Bedeutung

PySpark では、transformWithStateInPandasの代わりに演算子transformWithStateが使用されます。 Azure Databricks のドキュメントでは、 transformWithState を使用して、Python と Scala の両方の実装の機能について説明します。

transformWithStateおよび関連 API の Scala と Python の実装は、言語の仕様によって異なりますが、同じ機能を提供します。 お好みのプログラミング言語については、言語固有の例と API のドキュメントを参照してください。

組み込みの処理ハンドル

組み込みのハンドルを使用してハンドラーを実装することで、カスタム ステートフル アプリケーション コア ロジックを実装 します

  • ハンドルは、状態の値とタイマーを操作し、受信レコードを処理し、レコードを出力するメソッドを提供します。
  • ハンドラーは、カスタム イベント ドリブン ロジックを定義します。

各状態の種類のハンドルは、基になるデータ構造に基づいて実装されますが、それぞれにレコードを取得、配置、更新、および削除する機能が含まれています。

ハンドラーは、次のセマンティクスを使用して、入力レコードまたはタイマーで観察されたイベントに基づいて実装されます。

  • handleInputRows メソッドを使用してハンドラーを定義し、データの処理方法、状態の更新、およびグループ化キーに対して処理されたレコードのマイクロ バッチごとにレコードが出力されるように制御します。 「 入力行の処理」を参照してください。
  • handleExpiredTimer メソッドを使用してハンドラーを定義し、時間ベースのしきい値を使用して、グループ化キーに対して追加のレコードが処理されるかどうかにかかわらず、ロジックを実行します。 「プログラムのタイミングイベント」を参照してください。

次の表に、これらのハンドラーでサポートされている機能動作の比較を示します。

行動 handleInputRows handleExpiredTimer
状態値を取得、配置、更新、またはクリアする イエス イエス
タイマーを作成または削除する イエス イエス
レコードを出力する イエス イエス
現在のマイクロ バッチ内のレコードを反復処理する イエス いいえ
経過時間に基づいてロジックをトリガーする いいえ イエス

handleInputRowshandleExpiredTimerを組み合わせて、必要に応じて複雑なロジックを実装できます。

たとえば、 handleInputRows を使用して各マイクロ バッチの状態値を更新し、タイマーを 10 秒後に設定するアプリケーションを実装できます。 追加のレコードが処理されない場合は、 handleExpiredTimer を使用して状態ストアの現在の値を出力できます。 グループ化キーに対して新しいレコードが処理される場合は、既存のタイマーをクリアして新しいタイマーを設定できます。

カスタム状態の種類

1 つのステートフル演算子に複数の状態オブジェクトを実装できます。 各状態オブジェクトに付ける名前は状態ストアに保持され、状態ストア リーダーでアクセスできます。 状態オブジェクトで StructTypeを使用する場合は、スキーマを渡しながら構造体内の各フィールドの名前を指定します。 これらの名前は、ステートストアを読み取るときにも表示されます。 「構造化ストリーミング状態情報の読み取り」をご覧ください。

組み込みのクラスと演算子によって提供される機能は、柔軟性と拡張性を提供することを目的としており、実装の選択は、アプリケーションで実行する必要がある完全なロジックによって通知する必要があります。 たとえば、フィールドValueStateおよびuser_idでグループ化されたsession_id、またはMapStateのキーであるuser_idでグループ化されたsession_idMapStateを使用して、ほぼ同じロジックを実装できます。 この例では、ロジックが複数のMapState間で条件を評価する必要がある場合は、session_idが推奨される実装である可能性があります。

次のセクションでは、 transformWithStateでサポートされる状態の種類について説明します。

ValueState

グループ化キーごとに、関連付けられた値があります。

値の状態には、構造体やタプルなどの複合型を含めることができます。 ValueStateを更新するときは、値全体を置き換えるロジックを実装します。 値の状態の TTL は、値が更新されるとリセットされますが、格納されているValueStateを更新せずにValueStateに一致するソース キーが処理された場合はリセットされません。

ListState

グループ化キーごとに、関連付けられたリストがあります。

リスト状態は値のコレクションであり、それぞれに複合型を含めることができます。 リスト内の各値には、独自の TTL があります。 リストに項目を追加するには、個々の項目を追加するか、項目のリストを追加するか、リスト全体を putで上書きします。 PUT 操作のみが TTL をリセットするための更新と見なされます。

MapState

グループ化キーごとに、関連付けられたマップがあります。 マップは、Python ディクテーションと同等の Apache Spark 機能です。

Von Bedeutung

グループ化キーは、構造化ストリーミング クエリの GROUP BY 句で指定されたフィールドを表します。 マップ状態には、グループ化キーの任意の数のキーと値のペアが含まれます。

たとえば、 user_id でグループ化し、各 session_idのマップを定義する場合、グループ化キーは user_id され、マップ内のキーは session_id

マップ状態は、複合型を含めることができる値にそれぞれマップされる個別のキーのコレクションです。 マップ内の各キーと値のペアには、独自の TTL があります。 特定のキーの値を更新することも、キーとその値を削除することもできます。 個々の値は、そのキーを使用して返したり、すべてのキーを一覧表示したり、すべての値を一覧表示したり、マップ内のキーと値のペアの完全なセットを操作するための反復子を返したりすることができます。

カスタム状態変数を初期化する

StatefulProcessorを初期化するときは、カスタム ロジックで状態オブジェクトを操作できる状態オブジェクトごとにローカル変数を作成します。 状態変数は、init クラスの組み込みのStatefulProcessor メソッドをオーバーライドすることによって定義および初期化されます。

getValueStateの初期化中に、getListStategetMapState、およびStatefulProcessorメソッドを使用して、任意の量の状態オブジェクトを定義します。

各状態オブジェクトには、次のものが必要です。

  • ユニークな名前
  • 指定されたスキーマ
    • Python では、スキーマは明示的に指定されます。
    • Scala で、状態スキーマを指定する Encoder を渡します。

また、オプションの Time to Live (TTL) 期間をミリ秒単位で指定することもできます。 マップ状態を実装する場合は、マップ キーと値に別のスキーマ定義を指定する必要があります。

状態情報のクエリ、更新、出力方法のロジックは、個別に処理されます。 「状態変数を使用する」を参照してください。

状態保持型アプリケーションの例

サポートされている各型の状態変数の例など、 transformWithStateを使用してカスタム ステートフル プロセッサを定義および使用するための基本的な構文を次に示します。 その他の例については、「 ステートフル アプリケーションの例」を参照してください。

Python では、状態値とのすべての対話にタプルが使用されます。 つまり、Python コードは、 putupdate などの操作を使用するときにタプルを使用して値を渡す必要があり、 getを使用するときにタプルを処理することを想定しています。

たとえば、値の状態のスキーマが単一の整数である場合は、次のようなコードを実装します。

current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0]  # Extracts the first item in the tuple
new_value = current_value + 1           # Calculate a new value
value_state.update((new_value,))        # Pass the new value formatted as a tuple

これは、 ListState 内の項目や MapState内の値にも当てはまります。

Python(プログラミング言語)

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    value_state_schema = StructType([StructField("count", IntegerType(), True)])
    list_state_schema = StructType([StructField("count", IntegerType(), True)])
    self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
    self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
    # Schema can also be defined using strings and SQL DDL syntax
    self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    count = 0
    for pdf in rows:
      list_state_rows = [(120,), (20,)] # A list of tuples
      self.list_state.put(list_state_rows)
      self.list_state.appendValue((111,))
      self.list_state.appendList(list_state_rows)
      pdf_count = pdf.count()
      count += pdf_count.get("value")
    self.value_state.update((count,)) # Count is passed as a tuple
    iter = self.list_state.get()
    list_state_value = next(iter1)[0]
    value = count
    user_key = ("user_key",)
    if self.map_state.exists():
      if self.map_state.containsKey(user_key):
        value += self.map_state.getValue(user_key)[0]
    self.map_state.updateValue(user_key, (value,)) # Value is a tuple
    yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=SimpleCounterProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
)

スカラ (プログラミング言語)

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaLong, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
        .readStream
        .format("delta")
        .load("$srcDeltaTableDir")
        .as[(String, String)]
        .groupByKey(x => x._1)
        .transformWithState(
            new SimpleCounterProcessor(),
            TimeMode.None(),
            OutputMode.Update(),
        )
        .writeStream...

StatefulProcessorHandle

PySpark には、ユーザー定義の Python コードと状態情報の対話方法を制御する関数へのアクセスを提供する StatefulProcessorHandle クラスが含まれています。 StatefulProcessorHandleを初期化するときは、常にhandleをインポートしてStatefulProcessor変数に渡す必要があります。

handle変数は、Python クラスのローカル変数を状態変数に結び付けます。

Scala では、 getHandle メソッドを使用します。

初期状態を指定する

必要に応じて、最初のマイクロバッチで使用する初期状態を指定できます。 これは、既存のワークフローを新しいカスタム アプリケーションに移行する場合、ステートフル 演算子をアップグレードしてスキーマまたはロジックを変更する場合、または自動的に修復できず手動による介入が必要なエラーを修復する場合に便利です。

状態ストア リーダーを使用して、既存のチェックポイントから状態情報を照会します。 「構造化ストリーミング状態情報の読み取り」をご覧ください。

既存の Delta テーブルをステートフル アプリケーションに変換する場合は、 spark.read.table("table_name") を使用してテーブルを読み取り、結果の DataFrame を渡します。 必要に応じて、新しいステートフル アプリケーションに準拠するようにフィールドを選択または変更できます。

入力行と同じグループ化キー スキーマを持つ DataFrame を使用して初期状態を指定します。

Python では、 handleInitialState を使用して、 StatefulProcessorの定義中に初期状態を指定します。 Scala では、個別のクラス StatefulProcessorWithInitialStateが使用されます。

状態変数を使用する

サポートされている状態オブジェクトは、状態の取得、既存の状態情報の更新、または現在の状態のクリアを行うメソッドを提供します。 サポートされている各状態の型には、実装されるデータ構造に対応するメソッドの一意の実装があります。

観察された各グループ化キーには、専用の状態情報があります。

  • レコードは、実装するロジックに基づいて生成され、指定した出力スキーマを使用します。 レコードの生成について参照してください。
  • statestore リーダーを使用して、状態ストアの値にアクセスできます。 このリーダーにはバッチ機能があり、待ち時間の短いワークロードを対象としていません。 「構造化ストリーミング状態情報の読み取り」をご覧ください。
  • handleInputRowsを使用して指定されたロジックは、キーのレコードがマイクロバッチ内に存在する場合にのみ発生します。 「 入力行の処理」を参照してください。
  • handleExpiredTimerを使用して、レコードの観測に依存せずに発火する、時間ベースのロジックを実装します。 「プログラムのタイミングイベント」を参照してください。

状態オブジェクトは、次の意味でキーをグループ化することによって分離されます。

  • 状態値は、別のグループ化キーに関連付けられているレコードの影響を受けません。
  • 値の比較またはグループ化キー間の状態の更新に依存するロジックを実装 することはできません

グループ化キー内の値を比較できます。 MapStateを使用して、カスタム ロジックで使用できる 2 番目のキーを持つロジックを実装します。 たとえば、user_idでグループ化し、IPアドレスを用いてMapStateでキー設定を行うことで、同時ユーザーセッションを追跡するロジックを実装できます。

状態を操作するための高度な考慮事項

状態変数に書き込むと、RocksDB への書き込みがトリガーされます。 パフォーマンスを最適化するために、Databricks では、特定のキーの反復子内のすべての値を処理し、可能な限り 1 回の書き込みで更新をコミットすることをお勧めします。

状態の更新は耐障害性があります。 マイクロバッチの処理が完了する前にタスクがクラッシュした場合、最後に成功したマイクロバッチの値が再試行時に使用されます。

状態の値には、組み込みの既定値はありません。 ロジックで既存の状態情報を読み取る必要がある場合は、ロジックの実装中に exists メソッドを使用します。

MapState 変数には、個々のキーを確認したり、null 状態のロジックを実装するためにすべてのキーを一覧表示したりする追加機能があります。

レコードを出力する

ユーザー定義ロジックは、 transformWithState がレコードを出力する方法を制御します。 レコードはグループ化キーごとに出力されます。

カスタム ステートフル アプリケーションでは、レコードを出力する方法を決定するときに状態情報がどのように使用されるかを想定しません。また、特定の条件に対して返されるレコードの数は、none、1、または Many のいずれかになります。

handleInputRowsまたはhandleExpiredTimerを使用してレコードを出力するロジックを実装します。 「入力行の処理」および「プログラムの時間指定イベント」を参照してください。

複数の状態値を実装し、レコードを出力するための複数の条件を定義できますが、出力されるすべてのレコードで同じスキーマを使用する必要があります。

Python(プログラミング言語)

Python では、outputStructTypeの呼び出し中に、transformWithStateInPandas キーワードを使用して出力スキーマを定義します。

pandas DataFrame オブジェクトと yieldを使用してレコードを出力します。

必要に応じて、空の DataFrame を yield できます。 update出力モードと組み合わせると、空の DataFrame を出力すると、グループ化キーの値が null に更新されます。

スカラ (プログラミング言語)

Scala では、 Iterator オブジェクトを使用してレコードを出力します。 出力のスキーマは、出力されたレコードから派生します。

必要に応じて、空の Iteratorを出力できます。 update出力モードと組み合わせると、空のIteratorを出力すると、グループ化キーの値が null に更新されます。

入力行の処理

handleInputRows メソッドを使用して、ストリーミング クエリで観察されたレコードが状態値と対話して更新する方法のロジックを定義します。 handleInputRows メソッドで定義したハンドラーは、Structured Streaming クエリによってレコードが処理されるたびに実行されます。

transformWithStateで実装されているほとんどのステートフル アプリケーションでは、コア ロジックは handleInputRows を使用して定義されます。

処理されたマイクロバッチ更新ごとに、特定のグループ化キーのマイクロバッチ内のすべてのレコードを反復子を使用して使用できます。 ユーザー定義ロジックは、現在のマイクロバッチのすべてのレコードとステートストア内の値と対話できます。

プログラムのタイムドイベント

タイマーを使用すると、指定した条件からの経過時間に基づいてカスタム ロジックを実装できます。

タイマーを操作する場合は、 handleExpiredTimer メソッドを実装します。

グループ化キー内では、タイマーはタイムスタンプによって一意に識別されます。

タイマーの有効期限が切れると、結果はアプリケーションに実装されているロジックによって決まります。 一般的なパターンは次のとおりです。

  • 状態変数に格納されている情報を出力します。
  • 格納されている状態情報の削除。
  • 新しいタイマーの作成。

期限が切れたタイマーは、関連付けられているキーのレコードがマイクロバッチで処理されない場合でもトリガーされます。

タイム モデルを指定する

StatefulProcessortransformWithStateに渡す場合は、タイム モデルを指定する必要があります。 次のオプションがサポートされています。

  • ProcessingTime
  • EventTime
  • NoTime または TimeMode.None()

NoTimeを指定すると、プロセッサでタイマーがサポートされないことを意味します。

組み込みのタイマー値

Databricks では、カスタム ステートフル アプリケーションでシステム クロックを呼び出すのは推奨しません。これにより、タスクの失敗時に再試行が信頼性に欠ける可能性があります。 処理時間または基準値にアクセスする必要がある場合は、 TimerValues クラスのメソッドを使用します。

TimerValues 説明
getCurrentProcessingTimeInMs エポック以降の現在のバッチの処理時間のタイムスタンプをミリ秒単位で返します。
getCurrentWatermarkInMs エポック以降のミリ秒単位で現在のバッチのウォーターマークのタイムスタンプを返します。

処理時間は、マイクロバッチが Apache Spark によって処理される時間を表します。 Kafka などの多くのストリーミング ソースには、システム処理時間も含まれます。

ストリーミング クエリの透かしは、多くの場合、イベント時間またはストリーミング ソースの処理時間に対して定義されます。 「透かしを適用してデータ処理のしきい値を制御する」を参照してください。

透かしとウィンドウの両方を transformWithStateと組み合わせて使用できます。 TTL、タイマー、 MapState または ListState 機能を利用して、カスタム ステートフル アプリケーションに同様の機能を実装できます。

状態の有効期間 (TTL) とは何ですか?

transformWithState で使用される状態値は、オプションの Time to Live (TTL) 仕様をサポートします。 TTL の有効期限が切れると、値は状態ストアから削除されます。 TTL は状態ストア内の値とのみ対話します。つまり、状態情報を削除するロジックを実装できますが、TTL によって状態値が削除されるため、ロジックを直接トリガーすることはできません。

Von Bedeutung

TTL を実装しない場合は、無限の状態の増加を回避するために、他のロジックを使用して状態の削除を処理する必要があります。

TTL は状態値ごとに適用され、状態の種類ごとに異なる規則が適用されます。

  • 状態変数のスコープは、キーのグループ化に設定されます。
  • ValueState オブジェクトの場合、グループ化キーごとに格納される値は 1 つだけです。 TTL はこの値に適用されます。
  • ListStateオブジェクトの場合、リストには多数の値を含めることができます。 TTL は、リスト内の各値に個別に適用されます。
  • MapState オブジェクトの場合、各マップ キーには関連付けられた状態値があります。 TTL は、マップ内の各キーと値のペアに個別に適用されます。

すべての状態の種類について、状態情報が更新されると TTL がリセットされます。

TTL のスコープは ListState内の個々の値ですが、リスト内の値を更新する唯一の方法は、 put メソッドを使用して ListState 変数の内容全体を上書きすることです。

タイマーと TTL の違いは何ですか?

状態変数にはタイマーと Time to Live (TTL) の間に重複がいくつかありますが、タイマーは TTL よりも広範な機能セットを提供します。

TTL は、ユーザーが指定した期間更新されていない状態情報を削除します。 これにより、ユーザーはチェックされていない状態の増加を防ぎ、古い状態エントリを削除できます。 マップとリストは値ごとに TTL を実装するため、TTL を設定することで最近更新された状態値のみを考慮する関数を実装できます。

タイマーを使用すると、レコードの出力など、状態の削除以外のカスタム ロジックを定義できます。 必要に応じて、タイマーを使用して特定の状態値の状態情報をクリアできます。さらに、値を出力したり、タイマーに基づいて他の条件付きロジックをトリガーしたりする柔軟性が向上します。