AUTO CDC INTO (Lakeflow 声明性管道)

使用 AUTO CDC ... INTO 语句创建一个流,该流利用 Lakeflow 声明性管道的更改数据捕获(CDC)功能。 此语句读取来自 CDC 的源的更改,并将其应用于流式处理目标。

语法

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

使用与其他 Lakeflow 声明性管道查询相同的 CONSTRAINT 子句定义目标的数据质量约束。 请参阅通过管道预期管理数据质量

INSERTUPDATE 事件的默认行为是从源中更新或插入 CDC 事件:更新目标表中与指定键匹配的行,或在目标表中不存在匹配记录时插入新行。 可以使用 DELETE 条件指定对 APPLY AS DELETE WHEN 事件的处理。

重要

您必须声明一个目标流式处理表,以将更改应用于该表。 可以选择为目标表指定架构。 对于 SCD 类型 2 表,指定目标表的架构时,还必须包含数据类型与 __START_AT 字段相同的 __END_ATsequence_by 列。

请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获

参数

  • flow_name

    要创建的流的名称。

  • source

    数据的源。 源必须是 流媒体 源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    有关流数据的详细信息,请参阅 使用管道转换数据

  • KEYS

    用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。

    若要定义列的组合,请使用以逗号分隔的列列表。

    此条款是必需的。

  • IGNORE NULL UPDATES

    允许引入包含目标列子集的更新。 当 CDC 事件与现有行匹配并指定 IGNORE NULL UPDATES 时,具有 null 值的列将保留其目标中的现有值。 这也适用于具有 null 值的嵌套列。

    此子句是可选的。

    默认设置是用 null 值覆盖现有列。

  • APPLY AS DELETE WHEN

    指定何时应将 CDC 事件视为 DELETE 而不是更新插入。

    对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用pipelines.cdc.tombstoneGCThresholdInSeconds配置保留间隔。

    此子句是可选的。

  • APPLY AS TRUNCATE WHEN

    指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

    仅支持为 SCD 类型 1 使用 APPLY AS TRUNCATE WHEN 子句。 SCD 类型 2 不支持截断操作。

    此子句是可选的。

  • SEQUENCE BY

    用于指定源数据中 CDC 事件的逻辑顺序的列名。 Lakeflow 声明性管道使用这种排序来处理按无序顺序到达的变更事件。

    如果需要多个列进行排序处理,请使用表达式 STRUCT:它将先按第一个结构字段进行排序,然后如果有相同的值,再按第二个字段排序,依此类推。

    指定的列必须是可排序的数据类型。

    该语句是必需的。

  • COLUMNS

    指定要包含在目标表中的列子集。 您可以选择:

    • 指定要包含的完整列列表:COLUMNS (userId, name, city)
    • 指定要排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。

    当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。

  • STORED AS

    将记录存储为 SCD 类型 1 还是 SCD 类型 2。

    此子句是可选的。

    默认值为 SCD 类型 1。

  • TRACK HISTORY ON

    指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 您可以选择:

    • 指定要跟踪的列的完整列表: COLUMNS (userId, name, city)
    • 指定要从跟踪中排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *

例子

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);