@table
修饰器可用于定义具体化视图和流式表。 在 Python 中,DLT 根据定义查询确定是将数据集更新为具体化视图还是流式处理表。
若要在 Python 中定义具体化视图,请将 @table
应用于对数据源执行静态读取的查询。 要定义流式表,请将@table
应用于对数据源执行流式读取的查询,或使用create_streaming_table() 函数。
语法
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-___location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect(...)
def <function-name>():
return (<query>)
参数
@dlt.expect()
是可选的 DLT 期望子句。 可以包含多个预期。 请参阅 期望值。
参数 | 类型 | DESCRIPTION |
---|---|---|
函数 | function |
必填。 从用户定义的查询返回 Apache Spark 数据帧或流数据帧的函数。 |
name |
str |
表名称。 如果未提供,则默认为函数名称。 |
comment |
str |
表的说明。 |
spark_conf |
dict |
用于执行此查询的 Spark 配置列表 |
table_properties |
dict |
表的dict 的 。 |
path |
str |
表数据的存储位置。 如果未设置,请使用包含表的架构的托管存储位置。 |
partition_cols |
list |
用于对表进行分区的一个或多个列的列表。 |
cluster_by |
list |
对表启用动态聚类,并定义要用作聚类键的列。 请参阅使用液体聚类分析 Delta 表。 |
schema |
str 或 StructType |
表的架构定义。 架构可以定义为 SQL DDL 字符串,或使用 Python StructType 定义。 |
temporary |
bool |
创建表,但不将表发布到元存储。 该表可用于管道,但不能在管道外部访问。 临时表在管道的生存期内保留。 默认值为“False”。 |
row_filter |
str |
(公共预览版)表的行筛选器子句。 请参阅发布具有行筛选器和列掩码的表。 |
指定架构是可选的,可以使用 PySpark StructType
或 SQL DDL 来完成。 指定架构时,可以选择包括生成的列、列掩码,以及主键和外键。 请参阅:
例子
import dlt
# Specify a schema
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
# Specify a schema with SQL DDL, use a generated column, and set clustering columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
# Specify partition columns
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
# Specify table constraints
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
""")
def sales():
return ("...")
# Specify a row filter and column mask
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
return ("...")