更改数据捕获(CDC)是一种数据集成模式,用于捕获对源系统中数据所做的更改,例如插入、更新和删除。 这些变更(表示为列表)通常称为“CDC 源”。 如果对 CDC 源进行操作,而不是读取整个源数据集,则可以更快地处理数据。 事务型数据库(例如 SQL Server、MySQL 和 Oracle)生成 CDC 数据源。 Delta 表生成自己的 CDC 源,称为变更数据源 (CDF)。
下图显示,当源表中包含员工数据的行被更新时,它会在 CDC 数据流中生成一组新行,仅包含这些更改。 CDC 源的每一行通常包含附加的元数据,包括操作(例如 UPDATE
)以及一个可以用来确定性地对 CDC 源中每一行进行排序的列,以便处理无序更新。 例如,下图中 sequenceNum
列用于确定 CDC 数据流中的行顺序。
处理数据变更流:仅保留最新数据还是保留历史数据版本
处理变更的数据源称为慢变维度 (SCD)。 在处理CDC源数据时,您需要做出一个选择:
- 是否仅保留最新数据(即覆盖现有数据) ? 这称为 SCD 类型 1。
- 或者,是否保留数据更改的历史记录? 这称为 SCD 类型 2。
SCD 类型 1 处理涉及在发生更改时用新数据覆盖旧数据。 这意味着不会保留任何更改历史记录。 只有最新版本的数据可用。 这是一种简单的方法,通常在更改历史记录不重要(例如更正错误或更新客户电子邮件地址等非关键字段)时使用。
SCD 类型 2 处理通过创建其他记录来捕获不同版本的数据,从而维护数据更改的历史记录。 每个版本的数据都带有时间戳或标记有元数据,允许用户在发生更改时进行跟踪。 当跟踪数据演变(如跟踪客户地址随时间变化进行分析)非常重要时,这非常有用。
使用 Lakeflow 声明性管道的 SCD 类型 1 和类型 2 处理示例
本部分中的示例演示如何使用 SCD 类型 1 和类型 2。
步骤 1:准备示例数据
在此实例中,你将生成一个示例 CDC 数据流。 首先,创建笔记本并将以下代码粘贴到其中。 将代码块开头的变量更新为有权创建表和视图的目录和架构。
此代码创建一个新的 Delta 表,其中包含多个更改记录。 架构如下所示:
-
id
- 此员工的唯一标识符的整数 -
name
- 字符串,员工名称 -
age
- 整数,员工年龄 -
operation
- 更改类型(例如,INSERT
或UPDATE
DELETE
) -
sequenceNum
- 整数,标识源数据中 CDC 事件的逻辑顺序。 Lakeflow 声明性管道使用这种排序来处理按无序顺序到达的变更事件。
# update these to the catalog and schema where you have permissions
# to create tables and views.
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
def write_employees_cdf_to_delta():
data = [
(1, "Alex", "chef", "FR", "INSERT", 1),
(2, "Jessica", "owner", "US", "INSERT", 2),
(3, "Mikhail", "security", "UK", "INSERT", 3),
(4, "Gary", "cleaner", "UK", "INSERT", 4),
(5, "Chris", "owner", "NL", "INSERT", 6),
# out of order update, this should be dropped from SCD Type 1
(5, "Chris", "manager", "NL", "UPDATE", 5)
(6, "Pat", "mechanic", "NL", "DELETE", 8),
(6, "Pat", "mechanic", "NL", "INSERT", 7)
]
columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")
write_employees_cdf_to_delta()
可以使用以下 SQL 命令预览此数据:
SELECT *
FROM mycatalog.myschema.employees_cdf
步骤 2:使用 SCD 类型 1 仅保留最新数据
建议在 Lakeflow 声明性管道中使用 AUTO CDC
API 来处理 SCD 类型 1 表中的更改数据馈送。
- 创建新的笔记本。
- 将以下代码粘贴到其中。
- 创建并连接到管道。
该 employees_cdf
函数将读取我们前面刚刚创建的表作为流,因为 create_auto_cdc_flow
API 将用于更改数据捕获处理,因此需要将更改流作为输入。 使用 decorator @dlt.view
包装它,因为不想将此流具体化为表。
然后,使用 dlt.create_target_table
创建一个包含处理此更改数据馈送结果的流式处理表。
最后,使用 dlt.create_auto_cdc_flow
处理变更数据馈源。 让我们看看每个论点:
-
target
- 之前定义的目标流式处理表。 -
source
- 查看您之前定义的更改记录流。 -
keys
- 标识更改源中的唯一行。 因为您正在使用id
作为唯一标识符,只需提供id
作为唯一标识列即可。 -
sequence_by
- 指定源数据中 CDC 事件的逻辑顺序的列名称。 你需要此顺序来处理不按顺序到达的变更事件。 提供sequenceNum
作为排序列。 -
apply_as_deletes
- 由于示例数据包含删除操作,因此请使用apply_as_deletes
指示应将 CDC 事件视为DELETE
而不是更新插入的情况。 -
except_column_list
- 包含不希望包含在目标表中的列的列表。 在此示例中,你将使用此参数排除sequenceNum
和operation
。 -
stored_as_scd_type
- 指示要使用的 SCD 类型。
import dlt
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType
catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"
@dlt.view
def employees_cdf():
return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")
dlt.create_target_table(f"{catalog}.{schema}.{employees_table_current}")
dlt.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_current}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
单击“ 开始”运行此管道。
然后,在 SQL 编辑器中运行以下查询,验证是否已正确处理更改记录:
SELECT *
FROM mycatalog.myschema.employees_current
注释
员工 Chris 的无序更新已正确删除,因为他们的角色仍设置为“所有者”而不是“经理”。
步骤 3:使用 SCD 类型 2 来保留历史数据
在此示例中,将创建第二个目标表,该 employees_historical
表包含对员工记录的更改的完整历史记录。
将此代码添加到管道。 此处的唯一区别是 stored_as_scd_type
设置为 2 而不是 1。
dlt.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")
dlt.create_auto_cdc_flow(
target=f"{catalog}.{schema}.{employees_table_historical}",
source=employees_cdf_table,
keys=["id"],
sequence_by=col("sequenceNum"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 2
)
单击“ 开始”运行此管道。
然后,在 SQL 编辑器中运行以下查询,验证是否已正确处理更改记录:
SELECT *
FROM mycatalog.myschema.employees_historical
你将看到对员工的所有更改,包括已删除的员工,如 Pat。
步骤 4:清理资源
完成后,请按照以下步骤清理资源:
删除流水线:
注释
删除管道时,它会自动删除
employees
和employees_historical
表。- 单击 “作业和管道”,然后查找要删除的管道的名称。
- 单击
在管道名称所在的同一行中,然后单击“ 删除”。
删除笔记本。
删除包含变更数据流的表:
- 单击“ 新建 > 查询”。
- 粘贴并运行以下 SQL 代码,根据需要调整目录和架构:
DROP TABLE mycatalog.myschema.employees_cdf
使用 MERGE INTO
和 foreachBatch
用于更改数据捕获的缺点
Databricks 提供了一个 MERGE INTO
SQL 命令,可用于 foreachBatch
API 将行向上插入 Delta 表。 本部分探讨如何将此方法用于简单的用例,但此方法在应用于实际方案时变得越来越复杂和脆弱。
在此示例中,你将使用与前面示例中相同的样本数据更改馈送。
使用 MERGE INTO
和 foreachBatch
的 Naive 实现
创建笔记本并将以下代码复制到其中。 根据需要更改catalog
、schema
和employees_table
变量。 应将变量 catalog
和 schema
设置为 Unity Catalog 中可以创建表的位置。
运行笔记本时,它会执行以下操作:
- 在
create_table
中创建目标表。 与自动处理此步骤不同的create_auto_cdc_flow
是,必须指定架构。 - 以流的形式读取变更数据源。 使用
upsertToDelta
方法处理每个微批次,该方法运行MERGE INTO
命令。
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"
def upsertToDelta(microBatchDF, batchId):
microBatchDF.createOrReplaceTempView("updates")
microBatchDF.sparkSession.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING)
""")
create_table()
cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")
cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()
若要查看结果,请运行以下 SQL 查询:
SELECT *
FROM mycatalog.myschema.employees_merge
遗憾的是,结果不正确,如下所示:
同一个微批中对同一密钥的多个更新
第一个问题是,代码不会处理同一微包中同一键的多个更新。 例如,您使用 INSERT
插入员工 Chris,然后将他们的角色从所有者更新为经理。 这结果应该是一行,但实际上是两行。
当微包中有多个对同一密钥的更新时,哪个更改会获胜?
逻辑变得更加复杂。 下面的代码示例通过 sequenceNum
检索最新行,并将该数据仅合并到目标表中,如图所示:
- 按主键分组。
id
- 获取具有该键批处理中最大
sequenceNum
的行的所有列。 - 将行重新展开。
按照如下所示更新upsertToDelta
方法,然后运行代码。
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
查询目标表时,可以看到名为 Chris 的员工具有正确的角色,但仍存在其他问题要解决,因为你仍然在目标表中显示已删除的记录。
跨微批的无序更新
本部分探讨跨微批次出现无序更新的问题。 下图说明了这个问题:如果 Chris 的行在第一个微批处理中有一个UPDATE操作,然后在后续的微批处理中有一个INSERT操作,该怎么办? 代码无法正确处理。
当多个微批处理中对同一键进行无序更新时,哪些更改会获胜?
若要解决此问题,请展开代码以在每个行中存储版本,如下所示:
- 存储上次更新行时的
sequenceNum
。 - 对于每个新行,请检查时间戳是否大于存储的时间戳,然后应用以下逻辑:
- 如果大于,请使用目标中的新数据。
- 否则,请将数据保留在源中。
首先,更新 createTable
方法以存储 sequenceNum
,因为你将使用它为每行设定版本:
def create_table():
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
(id INT, name STRING, age INT, country STRING, sequenceNum INT)
""")
接下来,更新 upsertToDelta
以处理行版本。
UPDATE SET
的 MERGE INTO
子句需要单独处理每一列。
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
处理删除
遗憾的是,代码仍存在问题。 它不处理 DELETE
操作,因为员工 Pat 仍在目标表中的事实证明了这一点。
假设删除到达同一个微包。 若要处理它们,请在更改数据记录指示删除时再次更新 upsertToDelta
该方法以删除该行,如下所示:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
处理删除后无序到达的更新
遗憾的是,上述代码仍然不太正确,因为它不会处理在 DELETE
microbatche 中后跟无序 UPDATE
的情况。
处理这一案例的算法需要记录删除操作,以便能处理后续的无序更新。 为此,请按以下步骤操作:
- 不要立即删除行,而是通过使用时间戳或
sequenceNum
进行软删除。 软删除的行会被逻辑删除。 - 将所有用户重定向到筛掉逻辑删除的视图。
- 生成一个清理作业,该作业会随时间推移删除墓碑。
使用以下代码:
def upsertToDelta(microBatchDF, batchId):
microBatchDF = microBatchDF.groupBy("id").agg(
max_by(struct("*"), "sequenceNum").alias("row")
).select("row.*").createOrReplaceTempView("updates")
spark.sql(f"""
MERGE INTO {catalog}.{schema}.{employees_table} t
USING updates s
ON s.id = t.id
WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
WHEN MATCHED THEN UPDATE SET
name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
WHEN NOT MATCHED THEN INSERT *
""")
用户不能直接使用目标表,因此请创建一个视图,以便他们可以查询:
CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL
最后,创建定期移除逻辑删除的行的清理作业:
DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY