该 dlt
模块使用修饰器实现其大部分核心功能。 这些修饰器接受定义流式处理或批处理查询并返回 Apache Spark 数据帧的函数。 以下语法显示了定义 DLT 数据集的简单示例:
import dlt
@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
本页概述了在 DLT 中定义数据集的函数和查询。 有关可用修饰器的完整列表,请参阅 DLT 开发人员参考。
用于定义数据集的函数不应包含与数据集无关的任意 Python 逻辑,包括对第三方 API 的调用。 DLT 在规划、验证和更新期间多次运行这些函数。 包括任意逻辑可能会导致意外结果。
读取数据以开始数据集定义
用于定义 DLT 数据集的函数通常以 spark.read
或 spark.readStream
作开头。 这些读取操作返回一个静态或流式处理的 DataFrame 对象,您可以在返回 DataFrame 之前定义其他转换。 返回 DataFrame 的其他 Spark 操作的示例包括 spark.table
或 spark.range
。
函数不应引用函数外部定义的数据帧。 尝试引用在不同范围定义的数据帧可能会导致出现意外行为。 有关用于创建多个表的元编程模式的示例,请参阅 循环中创建 for
表。
以下示例显示了使用批处理或流式处理逻辑读取数据的基本语法:
import dlt
# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
如果需要从外部 REST API 读取数据,请使用 Python 自定义数据源实现此连接。 请参阅 PySpark 自定义数据源。
注释
可以从 Python 数据集合(包括 pandas DataFrame、字典和列表)创建任意的 Apache Spark DataFrame。 这些模式在开发和测试期间可能很有用,但大多数生产 DLT 数据集定义应首先从文件、外部系统或现有表或视图加载数据开始。
链接转换
DLT 几乎支持所有 Apache Spark 数据帧转换。 可以在数据集定义函数中包含任意数量的转换,但应确保使用的方法始终返回 DataFrame 对象。
如果你有一个用于驱动多个下游任务的中间转换,但不需要将其具体化为表,则使用 @dlt.view()
向管道添加临时视图。 然后,可以在多个下游数据集定义中使用spark.read.table("temp_view_name")
来引用此视图。 以下语法演示了此模式:
import dlt
@dlt.view()
def a():
return spark.read.table("source").filter(...)
@dlt.table()
def b():
return spark.read.table("b").groupBy(...)
@dlt.table()
def c():
return spark.read.table("c").groupBy(...)
这可确保 DLT 在管道规划过程中充分了解视图中的转换,并防止与在数据集定义外部运行的任意 Python 代码相关的潜在问题。
在你的函数中,可以将 DataFrame 链接在一起,以创建新的 DataFrame,而不需要将增量结果写入视图、物化视图或流式处理表,如以下示例所示:
import dlt
@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
如果所有数据帧都使用批处理逻辑执行其初始读取,则返回结果为静态数据帧。 如果有任何流式查询,则返回结果为流式数据帧。
返回数据帧
@dlt.table()
对于修饰器,返回静态数据帧意味着要定义具体化视图。 返回流式处理数据帧意味着要定义流式处理表。 大多数修饰器都适用于流式处理和静态数据帧,而其他修饰器则要求流式处理数据帧。
用于定义数据集的函数必须返回 Spark 数据帧。 切勿在 DLT 数据集代码中使用执行保存或写入文件或表的方法。
不应在 DLT 代码中使用的 Apache Spark 操作示例:
collect()
count()
toPandas()
save()
saveAsTable()
start()
toTable()
注释
DLT 还支持将 Pandas on Spark 用于数据集定义函数。 请参阅 Spark 上的 Pandas API。
在 Python 管道中使用 SQL
PySpark 支持 spark.sql
作员使用 SQL 编写 DataFrame 代码。 在 DLT 源代码中使用此模式时,它将编译为具体化视图或流式处理表。
下面的代码示例等效于对数据集查询逻辑使用 spark.read.table("catalog_name.schema_name.table_name")
:
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read
和 dlt.read_stream
(旧版)
该 dlt
模块包括由 dlt.read()
和 dlt.read_stream()
引入的函数,以支持旧版管道发布模式中的功能。 支持这些方法,但 Databricks 建议始终使用 spark.read.table()
和 spark.readStream.table()
函数,原因如下:
- 函数
dlt
对读取当前管道外部定义的数据集的支持有限。 - 这些
spark
函数支持指定用于读取操作的选项,例如skipChangeCommits
。dlt
函数不支持指定选项。