了解如何使用 Lakeflow 声明性管道为数据业务流程和自动加载程序创建和部署包含变更数据捕获的 ETL(提取、转换和加载)管道。 ETL 管道实现从源系统读取数据、根据要求转换数据(如数据质量检查和记录重复数据)以及将数据写入目标系统(如数据仓库或数据湖)的步骤。
在本教程中,你将使用 MySQL 数据库中的数据表customers
来执行以下操作:
- 使用 Debezium 或任何其他工具从事务数据库中提取更改,并将其保存在云对象存储(S3 文件夹、ADLS、GCS) 中。 你将跳过设置外部 CDC 系统以简化本教程。
- 使用自动加载程序以增量方式从云对象存储加载消息,并将原始消息
customers_cdc
存储在表中。 自动加载程序将推断架构并处理架构演变。 - 添加一个视图
customers_cdc_clean
以使用期望来检查数据质量。 例如,id
不应该是null
,因为您需要使用它来执行插入更新操作。 - 对清理的CDC数据执行更新插入,以将更改应用到最终
AUTO CDC ... INTO
表 - 显示 Lakeflow 声明式流水线如何创建类型 2 缓慢变化维度(SCD2),以记录所有更改。
目标是近乎实时地引入原始数据,并为分析师团队构建一个表,同时确保数据质量。
本教程使用 Medallion Lakehouse 体系结构,在该体系结构中,它通过铜层引入原始数据,使用银层清理和验证数据,并使用黄金层应用维度建模和聚合。 有关详细信息,请参阅什么是奖牌湖屋体系结构?
您将要实现的流程如下:
有关 Lakeflow 声明性管道、自动加载器和 CDC 的详细信息,请参阅 Lakeflow 声明性管道、 什么是自动加载程序?以及 什么是变更数据捕获(CDC)?
要求
若要完成此教程,必须满足以下要求:
- 登录到 Azure Databricks 工作区。
- 为工作区启用 Unity 目录 。
- 为帐户启用 无服务器计算 。 某些工作区区域不支持 Serverless Lakeflow Declarative Pipelines。 有关具有有限区域可用性的功能,请参阅可用的区域。
- 有权 创建计算资源 或 访问计算资源。
- 有权 在目录中创建新架构。 所需的权限为
ALL PRIVILEGES
或USE CATALOG
和CREATE SCHEMA
。 - 有权在现有架构中创建新卷。 所需的权限为
ALL PRIVILEGES
或USE SCHEMA
和CREATE VOLUME
。
在 ETL 管道中更改数据捕获
更改数据捕获(CDC)是捕获对事务数据库(例如 MySQL 或 PostgreSQL)或数据仓库的记录所做的更改的过程。 CDC 捕获数据的删除、添加和更新等操作,通常以流的形式在外部系统中重新构建表。 CDC 支持增量加载,同时无需批量加载更新。
注释
若要简化本教程,请跳过设置外部 CDC 系统。 您可以认为系统已上线运行,并将 CDC 数据保存为对象存储(如 S3、ADLS、GCS)中的 JSON 文件。
捕获 CDC
各种 CDC 工具可以使用。 开源领导者解决方案之一是 Debezium,但简化数据源的其他实现存在,例如 Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate 和 AWS DMS。
在本教程中,你将使用来自外部系统(如 Debezium 或 DMS)的 CDC 数据。 Debezium 捕获每个已更改的数据行。 它通常向 Kafka 日志发送数据更改的历史记录,或将它们保存为文件。
必须从 customers
表(JSON 格式)引入 CDC 信息,检查它是否正确,然后在 Lakehouse 中具体化客户表。
Debezium 的 CDC 输入
对于每次更改,你将收到一条 JSON 消息,其中包含正在更新的行的所有字段(id
、、、firstname
lastname
email
、 address
。 此外,还将提供额外的元数据信息,包括:
-
operation
:一个操作码,通常(DELETE
、APPEND
、UPDATE
)。 -
operation_date
:每个操作记录的日期和时间戳。
Debezium 等工具可以生成更高级的输出,例如更改前的行值,但本教程会省略它们,以便于简单。
步骤 0:设置教程数据
首先,必须创建新的笔记本,并将本教程中使用的演示文件安装到工作区中。
单击左上角的“ 新建 ”。
单击 “笔记本”。
将笔记本的标题从 无标题笔记本 <日期和时间> 更改为 Pipelines 教程设置。
在笔记本顶部的标题旁边,将笔记本的默认语言设置为 Python。
若要生成本教程中使用的数据集,请在第一个单元格中输入以下代码,然后按下Shift + Enter来运行代码。
# You can change the catalog, schema, dbName, and db. If you do so, you must also # change the names in the rest of the tutorial. catalog = "main" schema = dbName = db = "dbdemos_dlt_cdc" volume_name = "raw_data" spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`') spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`') volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exists, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
若要预览本教程中使用的数据,请在下一个单元格中输入代码并键入 Shift + Enter 以运行代码:
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
步骤 1:创建管道
首先,将在 Lakeflow 声明性管道中创建 ETL 管道。 Lakeflow 声明性管道通过使用 Lakeflow 声明性管道语法解析笔记本或文件中定义的依赖项(称为 源代码)来创建管道。 每个源代码文件只能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。 若要了解详细信息,请参阅 Lakeflow 声明式管道
重要
将 源代码 字段留空,以便自动创建和配置用于源代码创作的笔记本。
本教程使用无服务器计算和 Unity 目录。 对于未指定的所有配置选项,请使用默认设置。 如果工作区中未启用或支持无服务器计算,则可以使用默认计算设置完成本教程。 如果使用默认计算设置,则必须在创建管道 UI 的目标部分,手动在存储选项下选择Unity Catalog。
若要在 Lakeflow 声明性管道中创建新的 ETL 管道,请执行以下步骤:
- 在边栏中,单击“ 管道”。
- 单击“创建管道”和“ETL 管道”。
- 在 管道名称中,键入唯一的管道名称。
- 选中“无服务器”复选框。
- 在管道模式下选择“触发”。 这将使用 AvailableNow 触发器运行数据流,该触发器处理所有现有数据,然后结束数据流。
- 在 目标中,若要配置在其中发布表的 Unity 目录位置,请选择现有 目录 并在 架构 中编写一个新名称,以在目录中创建新架构。
- 单击 “创建” 。
此时会显示新管道的管道 UI。
将为管道自动创建和配置空白源代码笔记本。 笔记本在用户目录下的新文件夹中创建。 新目录和文件的名称与管道的名称匹配。 例如,/Users/someone@example.com/my_pipeline/my_pipeline
。
- 访问此笔记本的链接位于“管道详细信息”面板中的“源代码”字段下。 单击链接以打开笔记本,然后继续执行下一步。
- 单击右上角的“连接”以打开计算配置菜单。
- 将鼠标悬停在步骤 1 中创建的管道的名称上。
- 单击连接。
- 在顶部笔记本的标题旁边,选择笔记本的默认语言(Python 或 SQL)。
重要
笔记本只能包含单个编程语言。 不要在管道源代码笔记本中混合 Python 和 SQL 代码。
开发 Lakeflow 声明性管道时,可以选择 Python 或 SQL。 本教程包含两种语言的示例。 根据语言选择,检查是否选择默认笔记本语言。
若要详细了解对 Lakeflow 声明性管道代码开发的笔记本支持,请参阅 在 Lakeflow 声明性管道中使用笔记本开发和调试 ETL 管道。
步骤 2:使用自动加载程序以增量方式引入数据
第一步是将原始数据从云存储引入铜层。
出于多种原因,这很有挑战性,因为必须:
- 大规模运行,可能引入数百万个小文件。
- 推断架构和 JSON 类型。
- 使用不正确的 JSON 架构处理错误的记录。
- 负责架构演变(例如,客户表中的新列)。
自动加载程序简化了此引入,包括架构推理和架构演变,同时扩展到数百万个传入文件。 自动加载程序可通过 cloudFiles
在 Python 中使用,通过 SELECT * FROM STREAM read_files(...)
在 SQL 中使用,并支持多种格式(JSON、CSV、Apache Avro 等):
将表设置为流式表将保证仅处理新的传入数据。 如果不将其定义为流数据表,将扫描并引入所有现有数据。 有关详细信息,请参阅 流式处理表 。
若要使用自动加载程序引入传入数据,请将以下代码复制并粘贴到笔记本中的第一个单元格中。 可以使用 Python 或 SQL,具体取决于在上一步中选择的笔记本的默认语言。
Python语言
from dlt import * from pyspark.sql.functions import * # Create the target bronze table dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers", format => "json", inferColumnTypes => "true" )
单击“ 开始” 以启动连接的管道的更新。
步骤 3:清理并设定数据质量标准以进行跟踪
定义铜层后,通过添加预期来控制数据质量,通过检查以下条件来创建银层:
- ID 不得为
null
. - CDC操作类型必须有效。
-
json
必须已被自动加载程序充分读取。
如果未满足其中一个条件,将删除该行。
有关详细信息,请参阅 通过管道预期管理数据质量。
单击 编辑 和 在下方插入单元格 以插入新的空单元格。
要通过使用已清理的表格创建银层并添加约束条件,请将以下代码复制并粘贴到笔记本的新单元格中。
Python语言
dlt.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( dlt.read_stream("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;
单击“ 开始” 以启动连接的管道的更新。
步骤 4:使用 AUTO CDC 流具体化客户表
该 customers
表将包含最 up-to日期视图,并且是原始表的副本。
手动实现这一点并不容易。 必须考虑重复数据删除等事项才能保留最新的行。
但是,Lakeflow 声明性管道使用 AUTO CDC
操作解决了这些难题。
单击 编辑 和 在下方插入单元格 以插入新的空单元格。
若要使用
AUTO CDC
Lakeflow 声明性管道处理 CDC 数据,请将以下代码复制并粘贴到笔记本中的新单元格中。Python语言
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers") dlt.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )
SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;
单击“ 开始” 以启动连接的管道的更新。
步骤 5:缓慢变化维度类型 2(SCD2)
通常需要创建一张表来跟踪由APPEND
、UPDATE
和DELETE
引起的所有更改。
- 历史记录:您希望保留关于表格所有更改的记录。
- 可跟踪性:你希望查看发生了哪些操作。
使用 Lakeflow 声明性管道的 SCD2 技术
Delta 支持更改数据流(CDF),并且可以在 SQL 和 Python 中查询表修改< c0 /> 。 但是,CDF 的主要用例是捕获管道中的更改,而不是从头开始创建表更改的完整视图。
如果有无序事件,则处理起来会变得特别复杂。 如果必须按时间戳对更改进行排序并接收过去发生的修改,则必须在 SCD 表中追加新条目并更新以前的条目。
Lakeflow 声明性管道消除了这种复杂性,并允许你创建一个单独的表,其中包含从时间开始的所有修改。 然后,此表可以大规模使用,并具有特定的分区/zorder 列(如果需要)。 无序字段将根据_sequence_by自动处理。
若要创建 SCD2 表,必须使用选项: STORED AS SCD TYPE 2
在 SQL 或 stored_as_scd_type="2"
Python 中。
注释
您还可以使用选项限制功能所跟踪的列:TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
单击 编辑 和 在下方插入单元格 以插入新的空单元格。
将以下代码复制并粘贴到笔记本中的新单元格中。
Python语言
# create the table dlt.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dlt.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updates
SQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW cusotmers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;
单击“ 开始” 以启动连接的管道的更新。
步骤 6:创建具体化视图,用于跟踪已更改其信息最多的人员
该表 customers_history
包含用户对其信息所做的所有历史更改。 现在,你将在黄金层中创建一个简单的物化视图,用于跟踪记录谁更改个人信息最频繁的情况。 这可用于真实场景中的欺诈检测分析或用户建议。 此外,使用 SCD2 进行更改已经为我们删除了重复项,因此我们可以直接按照每个用户 ID 对行进行计数。
单击 编辑 和 在下方插入单元格 以插入新的空单元格。
将以下代码复制并粘贴到笔记本中的新单元格中。
Python语言
@dlt.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( dlt.read("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )
SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id
单击“ 开始” 以启动连接的管道的更新。
步骤 7:创建用于运行 ETL 管道的作业
接下来,创建一个工作流,以使用 Databricks 作业自动执行数据引入、处理和分析步骤。
- 在工作区中,单击“
边栏中的工作流,然后单击“创建作业”。
- 在任务标题框中,将 “新建作业 <”日期和时间> 替换为作业名称。 例如,
CDC customers workflow
。 - 在“任务名称”中输入首个任务的名称,例如 。
- 在 “类型”中,选择 “管道”。
- 在 管道中,选择在步骤 1 中创建的管道。
- 单击 “创建” 。
- 若要运行工作流,请单击“ 立即运行”。 若要查看运行的详细信息,请单击“ 运行 ”选项卡。单击该任务可查看任务运行的详细信息。
- 若要查看工作流完成后的结果,请单击“ 转到最新成功运行 ”或作业运行的 开始时间 。 此时会出现“输出”页,其中显示了查询结果。
有关作业运行的详细信息,请参阅 Lakeflow 作业的监视和可观测性 。
步骤 8:计划作业
若要按计划运行 ETL 管道,请执行以下步骤:
- 单击“
边栏中的工作流。
- 在“名称”列中单击作业名称。 侧面板显示为 “作业详细信息”。
- 在“计划和触发器”面板中单击“添加触发器”,然后在触发器类型中选择“计划”。
- 指定时间段、开始时间和时区。
- 单击“ 保存”。