次の方法で共有


スナップショットからの自動CDC作成フロー

Von Bedeutung

この機能はパブリック プレビュー段階です。

create_auto_cdc_from_snapshot_flow関数は、Lakeflow 宣言パイプライン変更データ キャプチャ (CDC) 機能を使用して、データベース スナップショットからのソース データを処理するフローを作成します。 AUTO CDC FROM SNAPSHOT API を使用して CDC を実装する方法を参照してください。

この関数は、前の関数の apply_changes_from_snapshot()を置き換えます。 2 つの関数のシグネチャは同じです。 Databricks では、新しい名前を使用するように更新することをお勧めします。

Von Bedeutung

この操作にはターゲット ストリーミング テーブルが必要です。

必要なターゲット テーブルを作成するには、 create_streaming_table() 関数を使用します。

構文

import dlt

dlt.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

AUTO CDC FROM SNAPSHOT を処理する場合、既定の動作では、ターゲットに同じキーを持つ一致するレコードが存在しないときに、新しい行を挿入します。 一致するレコードが存在する場合は、行の値のいずれかが変更された場合にのみ更新されます。 ターゲットに存在するが、ソースには存在しなくなったキーを持つ行は削除されます。

スナップショットを使用した CDC 処理の詳細については、「 AUTO CDC API: Lakeflow 宣言パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。 create_auto_cdc_from_snapshot_flow() 関数の使用例については、定期的なスナップショット インジェスト履歴スナップショット インジェストの例を参照してください。

パラメーター

パラメーター タイプ 説明
target str 必須。 更新するテーブルの名前。 create_streaming_table() 関数を使用して、create_auto_cdc_from_snapshot_flow() 関数を実行する前にターゲット テーブルを作成できます。
source str または lambda function 必須。 定期的にスナップショットを作成するテーブルまたはビューの名前、または処理されるスナップショット DataFrame とスナップショット バージョンを返す Python ラムダ関数。 source引数の実装を参照してください。
keys list 必須。 ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 次のいずれかを指定できます。
  • 文字列の一覧: ["userId", "orderId"]
  • Spark SQL col() 関数の一覧: [col("userId"), col("orderId"]
    col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。
stored_as_scd_type str または int レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。 既定値は SCD タイプ 1 です。
track_history_column_list または track_history_except_column_list list ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。 track_history_except_column_listを使用して、追跡から除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。 既定では、track_history_column_list 引数または track_history_except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。

source 引数を実装する

create_auto_cdc_from_snapshot_flow() 関数には source 引数が含まれます。 履歴スナップショットを処理する場合、source 引数は、処理されるスナップショット データが含まれる Python DataFrame と、スナップショット バージョンの 2 つの値を create_auto_cdc_from_snapshot_flow() 関数に返す、Python ラムダ関数であることが期待されます。

次に、ラムダ関数のシグネチャを示します。

lambda Any => Optional[(DataFrame, Any)]
  • ラムダ関数に対する引数は、直近で処理されたスナップショット バージョンです。
  • ラムダ関数の戻り値は、None または 2 つの値のタプルです。タプルの最初の値は、処理されるスナップショットが含まれる DataFrame です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。

ラムダ関数を実装して呼び出す例:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Lakeflow 宣言型パイプライン ランタイムは、 create_auto_cdc_from_snapshot_flow() 関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。

  1. next_snapshot_and_version 関数を実行して、次のスナップショット DataFrame と対応するスナップショット バージョンを読み込みます。
  2. DataFrame が返されない場合、その実行は終了し、パイプラインの更新は完了としてマークされます。
  3. 新しいスナップショットの変更を検出し、それらをターゲット テーブルに増分的に適用します。
  4. ステップ #1に戻り、次のスナップショットとそのバージョンを読み込みます。