次の方法で共有


ストリーミング テーブル

ストリーミング テーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Delta テーブルです。 ストリーミング テーブルは、ETL パイプライン内の 1 つ以上のフローの対象にすることができます。

ストリーミング テーブルは、次の理由からデータ インジェストに適しています。

  • 各入力行は 1 回だけ処理されます。これは、インジェスト ワークロードの大部分をモデル化します (つまり、行をテーブルに追加またはアップサートすることによって)。
  • 追記可能な大量のデータを処理できます。

ストリーミング テーブルは、次の理由から、待機時間の短いストリーミング変換にも適しています。

  • 行と時間枠に関する理由
  • 大量のデータを処理する
  • 待機時間が短い

次の図は、ストリーミング テーブルのしくみを示しています。

ストリーミング テーブルのしくみを示す図

各更新で、ストリーミング テーブルに関連付けられているフローは、ストリーミング ソース内の変更された情報を読み取り、そのテーブルに新しい情報を追加します。

ストリーミング テーブルは、1 つのパイプラインによって定義および更新されます。 パイプラインのソース コードでストリーミング テーブルを明示的に定義します。 パイプラインによって定義されたテーブルは、他のパイプラインでは変更または更新できません。 1 つのストリーミング テーブルに追加する複数のフローを定義できます。

Databricks SQL でパイプラインの外部にストリーミング テーブルを作成すると、Databricks によって、このテーブルの更新に使用される非表示のパイプラインが作成されます。

フローの詳細については、「 Lakeflow 宣言型パイプライン フローを使用したデータの増分読み込みと処理」を参照してください。

インジェスト用のストリーミング テーブル

ストリーミング テーブルは、追加専用のデータ ソース用に設計され、入力を 1 回だけ処理します。

次の例は、ストリーミング テーブルを使用してクラウド ストレージから新しいファイルを取り込む方法を示しています。

Python(プログラミング言語)

import dlt

# create a streaming table
@dlt.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

データセット定義で spark.readStream 関数を使用すると、Lakeflow 宣言パイプラインによってデータセットがストリームとして扱われ、作成されたテーブルがストリーミング テーブルになります。

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

ストリーミング テーブルへのデータの読み込みの詳細については、「 Lakeflow 宣言型パイプラインを使用したデータの読み込み」を参照してください。

次の図は、追加専用ストリーミング テーブルのしくみを示しています。

追加専用ストリーミング テーブルのしくみを示す図

ストリーミング テーブルに既に追加されている行は、パイプラインに対する後の更新では再クエリされません。 クエリを変更する場合 (たとえば、 SELECT LOWER (name) から SELECT UPPER (name))、既存の行は大文字に更新されませんが、新しい行は大文字になります。 完全更新をトリガーして、ソース テーブルから以前のすべてのデータを再クエリして、ストリーミング テーブル内のすべての行を更新できます。

ストリーミング テーブルと低遅延ストリーミング

ストリーミング テーブルは、境界付き状態での待機時間の短いストリーミング用に設計されています。 ストリーミング テーブルではチェックポイント管理が使用されるため、待機時間の短いストリーミングに適しています。 ただし、そのために、ストリームが自然に区切られていること、またはウォーターマークによって区切られていることが想定されています。

自然に境界付けられたストリームは、明確に定義された開始と終了を持つストリーミング データ ソースによって生成されます。 自然境界ストリームの例として、ファイルの最初のバッチが配置された後に新しいファイルが追加されないファイルのディレクトリからデータを読み取る方法があります。 ストリームは、ファイルの数が有限であるため、境界付けされたと見なされ、すべてのファイルが処理された後にストリームが終了します。

ウォーターマークを使用してストリームをバインドすることもできます。 Spark の構造化ストリーミングでのウォーターマークは、システムが時間枠を完了と見なすまで遅延イベントを待機する期間を指定することで遅れたデータを処理するのに役立つメカニズムです。 ウォーターマークがない無制限のストリームは、メモリ圧力によりパイプラインが失敗する可能性があります。

ステートフル ストリーム処理の詳細については、「ウォーターマークを使用して Lakeflow 宣言パイプラインでステートフル処理を最適化する」を参照してください。

ストリーム スナップショット結合

ストリーム スナップショット結合は、ストリームと、ストリームの開始時にスナップショットが作成されるディメンションの間の結合です。 ディメンション テーブルはスナップショットとして扱われるため、ストリームの開始後にディメンションが変更された場合、これらの結合は再計算されません。また、ディメンション テーブルを再読み込みまたは更新しない限り、ストリームの開始後のディメンション テーブルへの変更は反映されません。 これは、結合で小さな不一致を受け入れる場合に妥当な動作です。 たとえば、トランザクションの数が顧客数よりも桁違いに多い場合は、おおよその結合が許容されます。

次のコード例では、ディメンション テーブル (顧客) と、増え続けるデータセットを含む 2 行のトランザクションを結合します。 sales_reportと呼ばれるテーブル内のこれら 2 つのデータセット間の結合を具体化します。 外部プロセスが新しい行 (customer_id=3, name=Zoya) を追加して顧客テーブルを更新した場合、静的ディメンション テーブルはストリームの開始時にスナップショット化されたため、この新しい行は結合に存在しないことに注意してください。

import dlt

@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
  return spark.read.table("customers")

@dlt.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return (
    facts.join(dims, on="customer_id", how="inner"
  )

ストリーミング テーブルの制限事項

ストリーミング テーブルには、次の制限があります。

  • 限られた進化: データセット全体を再計算せずにクエリを変更できます。 ストリーミング テーブルには 1 行しか表示されないため、異なる行で異なるクエリを操作できます。 つまり、データセットで実行されているすべての以前のバージョンのクエリに注意する必要があります。 既に処理されているストリーミング テーブルのデータを更新するには、完全な更新が必要です。
  • ステート管理: ストリーミングテーブルは低遅延のため、操作するストリームが自然に境界付けされているか、または透かしを使用して境界付けされているかを確認する必要があります。 詳細については、「ウォーターマークを使用して Lakeflow 宣言パイプラインのステートフル処理を最適化する」を参照してください。
  • 結合は再計算されません。 ストリーミング テーブルの結合は、ディメンションが変更されたときに再計算されません。 この特性は、"高速だが間違った" シナリオに適している可能性があります。 ビューを常に正しくする場合は、具体化されたビューを使用できます。 具体化されたビューは、ディメンションが変更されたときに自動的に結合を再計算するため、常に正しいです。 詳細については、「 具体化されたビュー」を参照してください。