AUTO CDC ... INTO
ステートメントを使用して、Lakeflow 宣言パイプライン変更データ キャプチャ (CDC) 機能を使用するフローを作成します。 このステートメントは、CDC ソースから変更を読み取り、ストリーミング ターゲットに適用します。
- CDC の詳細については、「 変更データ キャプチャ (CDC) とは」を参照してください。
-
AUTO CDC
の使用の詳細については、「AUTO CDC API: Lakeflow 宣言型パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。 -
CREATE FLOW
の詳細については、「CREATE FLOW (Lakeflow 宣言パイプライン)」を参照してください。
構文
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
ターゲットのデータ品質制約は、他の Lakeflow 宣言パイプライン クエリと同じ CONSTRAINT
句を使用して定義します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
INSERT
イベントと UPDATE
イベントの既定の動作では、ソースから CDC イベントを upsert します。指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。
DELETE
イベントの処理は、APPLY AS DELETE WHEN
条件で指定できます。
重要
変更を適用するには、ターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 SCD タイプ 2 テーブルの場合、ターゲット テーブルのスキーマを指定する場合は、__START_AT
フィールドと同じデータ型の__END_AT
列とsequence_by
列も含める必要があります。
「AUTO CDC API: Lakeflow 宣言パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
パラメーター
flow_name
作成するフローの名前。
source
データのソース。 ソースは ストリーミング ソースである必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python と
SkipChangeCommits
オプションを使用してエラーを処理できます。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
KEYS
ソース データ内の行を一意に識別する列または列の組み合わせ。 これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。
列の組み合わせを定義するには、列のコンマ区切りのリストを使用します。
この条項は必須です。
IGNORE NULL UPDATES
ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、
null
値を持つ列はターゲット内の既存の値を保持します。 これは、null
値を持つ入れ子になった列にも適用されます。この句は省略可能です。
既定では、既存の列を
null
値で上書きします。APPLY AS DELETE WHEN
CDC イベントをアップサートではなく
DELETE
として処理する必要がある場合に指定します。SCD タイプ 2 のソースでは、順序が正しく指定されていないデータを処理するために、削除された行は基になる Delta テーブルの廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、
pipelines.cdc.tombstoneGCThresholdInSeconds
table プロパティを使用して構成できます。この句は省略可能です。
APPLY AS TRUNCATE WHEN
CDC イベントを完全なテーブル
TRUNCATE
として扱う必要がある場合に指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。APPLY AS TRUNCATE WHEN
の句は SCD タイプ 1 のみサポートしています。 SCD タイプ 2 では、切り捨て操作はサポートされていません。この句は省略可能です。
SEQUENCE BY
ソース データ内の CDC イベントの論理順序を指定する列名。 Lakeflow 宣言パイプラインでは、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。
シーケンス処理に複数の列が必要な場合は、
STRUCT
式を使用します。最初に最初の構造体フィールドで並べ替えを行い、同点の場合は2番目のフィールドで並べ替えます。指定された列は、並べ替え可能なデータ型である必要があります。
この句は必須です。
COLUMNS
ターゲット テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。
- 含める列の完全な一覧を指定します:
COLUMNS (userId, name, city)
。 - 除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。
既定では、
COLUMNS
句が指定されていない場合、ターゲット テーブルのすべての列が含まれます。- 含める列の完全な一覧を指定します:
STORED AS
レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。
この句は省略可能です。
既定値は SCD タイプ 1 です。
TRACK HISTORY ON
指定された列に変更があった場合に履歴レコードを生成する出力列のサブセットが指定されます。 次のいずれかを実行できます。
- 追跡する列の完全な一覧を指定します:
COLUMNS (userId, name, city)
。 - 追跡から除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。 既定で、
TRACK HISTORY ON *
と同等の変更がある場合、すべての出力列の履歴が追跡されます。- 追跡する列の完全な一覧を指定します:
例示
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);