create_auto_cdc_flow()
関数は、Lakeflow 宣言パイプライン変更データ キャプチャ (CDC) 機能を使用して、変更データ フィード (CDF) からのソース データを処理するフローを作成します。
注
この関数は、前の関数の apply_changes()
を置き換えます。 2 つの関数のシグネチャは同じです。 Databricks では、新しい名前を使用するように更新することをお勧めします。
重要
変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。
create_auto_cdc_flow()
ターゲット テーブルのスキーマを指定する場合は、__START_AT
フィールドと同じデータ型で __END_AT
および sequence_by
列を含める必要があります。
必要なターゲット テーブルを作成するには、Lakeflow 宣言型パイプライン Python インターフェイスで create_streaming_table() 関数を使用します。
構文
import dlt
dlt.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
create_auto_cdc_flow
を処理する場合、INSERT
イベントと UPDATE
イベントの既定の動作では、ソースから CDC イベントを "アップサート" します。つまり、指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。
DELETE
イベントの処理は、apply_as_deletes
パラメーターを使用して指定できます。
変更フィードを使用した CDC 処理の詳細については、「 AUTO CDC API: Lakeflow 宣言型パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
create_auto_cdc_flow()
関数の使用例については、「例: CDF ソース データを使用した SCD タイプ 1 と SCD タイプ 2 の処理」を参照してください。
パラメーター
パラメーター | タイプ | 説明 |
---|---|---|
target |
str |
必須。 更新するテーブルの名前。
create_streaming_table() 関数を使用して、create_auto_cdc_flow() 関数を実行する前にターゲット テーブルを作成できます。 |
source |
str |
必須。 CDC レコードを含むデータ ソース。 |
keys |
list |
必須。 ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 次のいずれかを指定できます。
|
sequence_by |
str 、col() 、または struct() |
必須。 ソース データ内の CDC イベントの論理順序を指定する列名。 Lakeflow 宣言パイプラインでは、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。 指定された列は、並べ替え可能なデータ型である必要があります。 次のいずれかを指定できます。
|
ignore_null_updates |
bool |
ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、ignore_null_updates が True の場合、null を持つ列はターゲット内の既存の値を保持します。 これは、null の値を持つ入れ子になった列にも適用されます。
ignore_null_updates が False の場合、既存の値は null 値で上書きされます。既定値は False です。 |
apply_as_deletes |
str または expr() |
CDC イベントをアップサート処理ではなく DELETE として扱う必要がある場合を指定します。 次のいずれかを指定できます。
順序の誤ったデータを処理するために、削除された行は基になる Delta テーブルの廃棄標識として一時的に保持され、これらの廃棄標識をフィルターで除外するビューがメタストアに作成されます。 保持間隔は、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。 |
apply_as_truncates |
str または expr() |
CDC イベントを完全なテーブル TRUNCATE として扱う必要がある場合に指定します。 次のいずれかを指定できます。
この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。 この apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。 |
column_list または except_column_list |
list |
ターゲット テーブルに含める列のサブセット。
column_list を使用して、含める列の完全な一覧を指定します。
except_column_list を使用して、除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。 既定では、column_list 引数または except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。 |
stored_as_scd_type |
str または int |
レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。 既定値は SCD タイプ 1 です。 |
track_history_column_list または track_history_except_column_list |
list |
ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。
track_history_column_list を使用して、追跡する列の完全なリストを指定します。
track_history_except_column_list を使用して、追跡から除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。
col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。 既定では、track_history_column_list 引数または track_history_except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。 |