状態に保持されているデータを効果的に管理するには、集計、結合、重複除去など、Lakeflow 宣言パイプラインでステートフル ストリーム処理を実行するときにウォーターマークを使用します。 この記事では、Lakeflow 宣言型パイプライン クエリでウォーターマークを使用する方法について説明し、推奨される操作の例を示します。
注
集計を実行するクエリが増分処理され、更新ごとに完全に再計算されないようにするには、透かしを使用する必要があります。
ウォーターマークとは?
ストリーム処理では、 ウォーターマーク は、集計などのステートフル操作を実行するときにデータを処理するための時間ベースのしきい値を定義できる Apache Spark 機能です。 到着したデータは、しきい値に達するまで処理され、その時点でしきい値によって定義された時間枠が閉じられます。 ウォーターマークを使用すると、クエリの処理中に、主に大規模なデータセットの処理や実行時間の長い処理を行う場合の問題を回避できます。 このような問題には、結果が出るまでの待機時間が長いことや、状態に保持されるデータ量が多いためにメモリ不足 (OOM) エラーが発生することが含まれることもあります。 ストリーミング データは本質的に順序付けされないため、ウォーターマークは時間枠の集計などの正しい計算操作もサポートします。
ストリーム処理でウォーターマークを使用する方法の詳細については、「Apache Spark Structured Streaming でのウォーターマーク」および「データ処理のしきい値を制御するためのウォーターマークの適用」を参照してください。
ウォーターマークを定義する方法は?
基準値を定義するには、タイムスタンプ フィールドと、 到着遅延データ の時間しきい値を表す値を指定します。 定義された時間しきい値の後に到着した場合、データは遅延と見なされます。 たとえば、しきい値が 10 分として定義されている場合、10 分のしきい値の後に到着したレコードは削除される場合があります。
定義されたしきい値の後に到着するレコードは削除される可能性があるため、待機時間と正確性の要件を満たすしきい値を選択することが重要です。 しきい値を小さくすると、レコードの出力が早くなりますが、遅延レコードが削除される可能性が高くなります。 しきい値を大きくすると、待機時間は長くなりますが、データの完全性が高くなる可能性があります。 しきい値を大きくすると、状態サイズが大きいため、追加のコンピューティング リソースが必要になる場合もあります。 しきい値はデータと処理の要件に依存するため、最適なしきい値を決定するには、処理のテストと監視が重要です。
Python で withWatermark()
関数を使用して、透かしを定義します。 SQL では、WATERMARK
句を使用してウォーターマークを定義します。
Python(プログラミング言語)
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
ストリーム間の結合でウォーターマークを使用する
ストリーム間の結合の場合は、結合の両側にウォーターマークと時間間隔句を定義する必要があります。 各結合ソースにはデータの不完全なビューがあるため、それ以上一致できない場合にストリーミング エンジンに通知するには、時間間隔句が必要です。 時間間隔句では、ウォーターマークの定義に使用するのと同じフィールドを使用する必要があります。
各ストリームでウォーターマークに異なるしきい値が必要になる場合があるため、ストリームに同じしきい値を設定する必要はありません。 データの欠落を回避するために、ストリーミングエンジンは、最も遅いストリームに基づいて1つの共通基準点を維持します。
次の例では、広告インプレッションのストリームと、ユーザーが広告をクリックしたストリームを結合します。 この例では、クリックはインプレッションから 3 分以内に発生する必要があります。 3 分の時間間隔が経過すると、一致しなくなった状態の行が削除されます。
Python(プログラミング言語)
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
ウォーターマークを使用してウィンドウ集計を実行する
ストリーミング データに対する一般的なステートフル操作は、ウィンドウ集計です。 ウィンドウ集計はグループ化された集計に似ていますが、定義されたウィンドウの一部である行のセットに対して集計値が返される点が異なります。
ウィンドウは特定の長さとして定義でき、そのウィンドウの一部であるすべての行に対して集計操作を実行できます。 Spark ストリーミングでは、次の 3 種類のウィンドウがサポートされています。
- タンブリング (固定) ウィンドウ: 固定サイズで重複しない一連の連続する時間間隔です。 入力レコードは 1 つのウィンドウにのみ属します。
- スライディング ウィンドウ: タンブリング ウィンドウと同様に、スライディング ウィンドウは固定サイズですが、ウィンドウが重複してレコードが複数のウィンドウに分類される場合があります。
データがウィンドウの末尾を越えて透かしの長さを超えて到着すると、ウィンドウに対して新しいデータは受け入れられなかったり、集計の結果が出力されたり、ウィンドウの状態が削除されたりします。
次の例では、固定ウィンドウを使用して 5 分ごとにインプレッションの合計を計算します。 この例では、select 句はエイリアス impressions_window
を使用し、ウィンドウ自体は GROUP BY
句の一部として定義されています。 ウィンドウは、透かしと同じタイムスタンプ列 (この例の clickTimestamp
列) に基づいている必要があります。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python の同様の例では、時間単位の固定ウィンドウで利益を計算します。
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
ストリーミング レコードの重複除去
構造化ストリーミングでは、1 回だけ処理が保証されますが、データ ソースからレコードが自動的に重複除去されることはありません。 たとえば、多くのメッセージ キューには少なくとも 1 回の保証があるため、これらのメッセージ キューの 1 つから読み取るときに重複するレコードが想定されるはずです。 dropDuplicatesWithinWatermark()
関数を使用すると、指定したフィールドのレコードを重複除去できます。これにより、一部のフィールドが異なる場合 (イベント時刻や到着時刻など) でも、ストリームから重複を削除できます。 この dropDuplicatesWithinWatermark()
関数を使用するには、ウォーターマークを指定する必要があります。 基準値で指定された時間内に到着した重複データはすべて削除されます。
順序付けされたデータは重要です。なぜなら、順序が乱れたデータが原因で、ウォーターマーク値が不正確に進んでしまうためです。 その後、より古いデータが到着すると、遅延と見なされ、削除されます。 withEventTimeOrder
オプションを使用して、ウォーターマークで指定されたタイムスタンプに基づいて初期スナップショットを順番に処理します。 withEventTimeOrder
オプションは、データセットを定義するコードまたは を使用してspark.databricks.delta.withEventTimeOrder.enabled
で宣言できます。 例えば次が挙げられます。
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
注
withEventTimeOrder
オプションは Python でのみサポートされています。
次の例では、データは clickTimestamp
順に処理され、重複する userId
と clickAdId
列を含む互いに 5 秒以内に到着したレコードが削除されます。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
ステートフル処理用にパイプライン構成を最適化する
運用環境の問題や過剰な待機時間を防ぐために、Databricks では、特に処理で大量の中間状態を保存する必要がある場合に、ステートフル ストリーム処理に対して RocksDB ベースの状態管理を有効にすることをお勧めします。
サーバーレス パイプラインでは、状態ストアの構成が自動的に管理されます。
パイプラインをデプロイする前に次の構成を設定することで、RocksDB ベースの状態管理を有効にすることができます。
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。