重要
DLT create_sink
API 目前为公共预览版。
该create_sink()
函数将来自 DLT 管道的数据写入事件流服务(如 Apache Kafka 或 Azure 事件中心)或 Delta 表。 使用 create_sink()
函数创建接收器后,在追加流中使用接收器,将数据写入接收器。 追加流是 create_sink()
函数支持的唯一流类型。 不支持其他流类型,例如 apply_changes
。
Delta 接收器支持 Unity Catalog 外部表和托管表,以及 Hive 元存储托管表。 表名必须是完全限定的。 例如,Unity 目录表必须使用三层标识符:<catalog>.<schema>.<table>
。 Hive 元存储表必须使用 <schema>.<table>
。
注释
- 运行完全刷新更新不会清除汇聚器中的数据。 任何重新处理的数据都将被追加到接收器,现有数据不会被更改。
sink
API 不支持 DLT 预期。
语法
import dlt
dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)
参数
参数 | 类型 | DESCRIPTION |
---|---|---|
name |
str |
必填。 一个字符串,它标识接收器并用于引用和管理接收器。 接收器名称对于管道(包括在所有源代码中,如作为管道一部分的笔记本或模块)必须是唯一的。 |
format |
str |
必填。 定义输出格式的字符串,kafka 或 delta 。 |
options |
dict |
格式为 {"key": "value"} 的接收器选项的列表,其中键和值都是字符串。 支持 Kafka 和 Delta 接收器支持的所有 Databricks Runtime 选项。
|
例子
import dlt
# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dlt.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dlt.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)