create_sink

重要

DLT create_sink API 目前为公共预览版

create_sink() 函数将来自 DLT 管道的数据写入事件流服务(如 Apache Kafka 或 Azure 事件中心)或 Delta 表。 使用 create_sink() 函数创建接收器后,在追加流中使用接收器,将数据写入接收器。 追加流是 create_sink() 函数支持的唯一流类型。 不支持其他流类型,例如 apply_changes

Delta 接收器支持 Unity Catalog 外部表和托管表,以及 Hive 元存储托管表。 表名必须是完全限定的。 例如,Unity 目录表必须使用三层标识符:<catalog>.<schema>.<table>。 Hive 元存储表必须使用 <schema>.<table>

注释

  • 运行完全刷新更新不会清除汇聚器中的数据。 任何重新处理的数据都将被追加到接收器,现有数据不会被更改。
  • sink API 不支持 DLT 预期。

语法

import dlt

dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)

参数

参数 类型 DESCRIPTION
name str 必填。 一个字符串,它标识接收器并用于引用和管理接收器。 接收器名称对于管道(包括在所有源代码中,如作为管道一部分的笔记本或模块)必须是唯一的。
format str 必填。 定义输出格式的字符串,kafkadelta
options dict 格式为 {"key": "value"} 的接收器选项的列表,其中键和值都是字符串。 支持 Kafka 和 Delta 接收器支持的所有 Databricks Runtime 选项。

例子

import dlt

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dlt.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dlt.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)