CREATE FLOW
ステートメントを使用して、Lakeflow 宣言パイプライン テーブルのフローまたはバックフィルを作成します。
構文
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
パラメーター
flow_name
作成するフローの名前。
コメント
フローの説明 (省略可能)。
-
フローを定義する
AUTO CDC ... INTO
ステートメントであり、create_auto_cdc_flow_spec
。AUTO CDC ... INTO
ステートメントまたはINSERT INTO
ステートメントを含める必要があります。 ソース クエリで変更データ セマンティクスを使用する場合は、AUTO CDC ... INTO
を使用します。詳細については、「 AUTO CDC INTO (Lakeflow 宣言型パイプライン)」を参照してください。
ターゲット_テーブル
更新が必要なテーブル。 これはストリーミング テーブルである必要があります。
INSERT を に変換する
ターゲット テーブルに挿入されるテーブル クエリを定義します。
ONCE
オプションを指定しない場合、クエリはストリーミング クエリである必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python とSkipChangeCommits
オプションを使用してエラーを処理できます。INSERT INTO
は、AUTO CDC ... INTO
と相互に排他的です。 ソース データに変更データ キャプチャ (CDC) 機能が含まれている場合は、AUTO CDC ... INTO
を使用します。 ソースが使用しない場合は、INSERT INTO
を使用します。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
ワンス
必要に応じて、バックフィルなどの 1 回限りのフローとしてフローを定義します。
ONCE
を使用すると、次の 2 つの方法でフローが変更されます。- ソース
query
またはcreate_auto_cdc_flow_spec
はストリーミング テーブルではありません。 - フローは既定で 1 回実行されます。 パイプラインが完全な更新で更新された場合、
ONCE
フローが再度実行され、データが再作成されます。
- ソース
例示
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;