次の方法で共有


自動CDCフローを作成

create_auto_cdc_flow()関数は、Lakeflow 宣言パイプライン変更データ キャプチャ (CDC) 機能を使用して、変更データ フィード (CDF) からのソース データを処理するフローを作成します。

この関数は、前の関数の apply_changes()を置き換えます。 2 つの関数のシグネチャは同じです。 Databricks では、新しい名前を使用するように更新することをお勧めします。

重要

変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 create_auto_cdc_flow() ターゲット テーブルのスキーマを指定する場合は、__START_AT フィールドと同じデータ型で __END_AT および sequence_by 列を含める必要があります。

必要なターゲット テーブルを作成するには、Lakeflow 宣言型パイプライン Python インターフェイスで create_streaming_table() 関数を使用します。

構文

import dlt

dlt.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

create_auto_cdc_flow を処理する場合、INSERT イベントと UPDATE イベントの既定の動作では、ソースから CDC イベントを "アップサート" します。つまり、指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETE イベントの処理は、apply_as_deletes パラメーターを使用して指定できます。

変更フィードを使用した CDC 処理の詳細については、「 AUTO CDC API: Lakeflow 宣言型パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。 create_auto_cdc_flow() 関数の使用例については、「例: CDF ソース データを使用した SCD タイプ 1 と SCD タイプ 2 の処理」を参照してください。

パラメーター

パラメーター タイプ 説明
target str 必須。 更新するテーブルの名前。 create_streaming_table() 関数を使用して、create_auto_cdc_flow() 関数を実行する前にターゲット テーブルを作成できます。
source str 必須。 CDC レコードを含むデータ ソース。
keys list 必須。 ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 次のいずれかを指定できます。
  • 文字列の一覧: ["userId", "orderId"]
  • Spark SQL col() 関数の一覧: [col("userId"), col("orderId")]col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。
sequence_by strcol()、または struct() 必須。 ソース データ内の CDC イベントの論理順序を指定する列名。 Lakeflow 宣言パイプラインでは、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。 指定された列は、並べ替え可能なデータ型である必要があります。 次のいずれかを指定できます。
  • 文字列: "sequenceNum"
  • Spark SQL col() 関数: col("sequenceNum")col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。
  • struct() 複数の列を組み合わせて同順位を解消する: struct("timestamp_col", "id_col")、まず最初の構造体フィールドで順序を付け、同順位がある場合は次に2番目のフィールドで並べ替えを行います。
ignore_null_updates bool ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、ignore_null_updatesTrue の場合、null を持つ列はターゲット内の既存の値を保持します。 これは、null の値を持つ入れ子になった列にも適用されます。 ignore_null_updatesFalse の場合、既存の値は null 値で上書きされます。
既定値は Falseです。
apply_as_deletes str または expr() CDC イベントをアップサート処理ではなく DELETE として扱う必要がある場合を指定します。 次のいずれかを指定できます。
  • 文字列: "Operation = 'DELETE'"
  • Spark SQL expr() 関数: expr("Operation = 'DELETE'")

順序の誤ったデータを処理するために、削除された行は基になる Delta テーブルの廃棄標識として一時的に保持され、これらの廃棄標識をフィルターで除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。
apply_as_truncates str または expr() CDC イベントを完全なテーブル TRUNCATE として扱う必要がある場合に指定します。 次のいずれかを指定できます。
  • 文字列: "Operation = 'TRUNCATE'"
  • Spark SQL expr() 関数: expr("Operation = 'TRUNCATE'")

この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。 この apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。
column_list または except_column_list list ターゲット テーブルに含める列のサブセット。 column_list を使用して、含める列の完全な一覧を指定します。 except_column_list を使用して、除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。 既定では、column_list 引数または except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。
stored_as_scd_type str または int レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。 既定値は SCD タイプ 1 です。
track_history_column_list または track_history_except_column_list list ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。 track_history_except_column_listを使用して、追跡から除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。 既定では、track_history_column_list 引数または track_history_except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。