次の方法で共有


CREATE FLOW (Lakeflow 宣言パイプライン)

CREATE FLOW ステートメントを使用して、Lakeflow 宣言パイプライン テーブルのフローまたはバックフィルを作成します。

構文

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

パラメーター

  • flow_name

    作成するフローの名前。

  • コメント

    フローの説明 (省略可能)。

  • 自動 CDC INTO

    フローを定義するAUTO CDC ... INTOステートメントであり、create_auto_cdc_flow_specAUTO CDC ... INTO ステートメントまたは INSERT INTO ステートメントを含める必要があります。 ソース クエリで変更データ セマンティクスを使用する場合は、 AUTO CDC ... INTO を使用します。

    詳細については、「 AUTO CDC INTO (Lakeflow 宣言型パイプライン)」を参照してください。

  • ターゲット_テーブル

    更新が必要なテーブル。 これはストリーミング テーブルである必要があります。

  • INSERT を に変換する

    ターゲット テーブルに挿入されるテーブル クエリを定義します。 ONCE オプションを指定しない場合、クエリはストリーミング クエリである必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。

    INSERT INTO は、 AUTO CDC ... INTOと相互に排他的です。 ソース データに変更データ キャプチャ (CDC) 機能が含まれている場合は、 AUTO CDC ... INTO を使用します。 ソースが使用しない場合は、 INSERT INTO を使用します。

    ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。

  • ワンス

    必要に応じて、バックフィルなどの 1 回限りのフローとしてフローを定義します。 ONCEを使用すると、次の 2 つの方法でフローが変更されます。

    • ソース query または create_auto_cdc_flow_spec はストリーミング テーブルではありません。
    • フローは既定で 1 回実行されます。 パイプラインが完全な更新で更新された場合、 ONCE フローが再度実行され、データが再作成されます。

例示

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;