@dlt.append_flow
修饰器为 DLT 表创建追加流或回填。 该函数必须返回 Apache Spark 流式处理数据帧。 请参阅 使用 DLT 流以增量方式加载和处理数据。
追加流可以面向流式处理表或接收器。
语法
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
参数
参数 | 类型 | DESCRIPTION |
---|---|---|
函数 | function |
必填。 从用户定义的查询返回 Apache Spark 流式处理数据帧的函数。 |
target |
str |
必填。 作为追加流的目标的表或接收器的名称。 |
name |
str |
流名称。 如果未提供,则默认为函数名称。 |
comment |
str |
流的说明。 |
spark_conf |
dict |
用于执行此查询的 Spark 配置列表 |
例子
import dlt
# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))