重要
Lakeflow 宣言型パイプライン create_sink
API は パブリック プレビュー段階です。
create_sink()
関数は、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービス、または宣言型パイプラインから Delta テーブルに書き込みます。 create_sink()
関数を使用してシンクを作成した後、シンクにデータを書き込むには、追加フロー でシンクを使用します。 追加フローは、create_sink()
関数でサポートされている唯一のフローの種類です。 create_auto_cdc_flow
など、その他のフローの種類はサポートされていません。
Delta シンクでは、Unity カタログの外部テーブルとマネージド テーブル、および Hive メタストア マネージド テーブルがサポートされます。 テーブル名は完全修飾にする必要があります。 たとえば、Unity カタログ テーブルでは、<catalog>.<schema>.<table>
の 3 層識別子を使用する必要があります。 Hive メタストア テーブルでは、<schema>.<table>
を使用する必要があります。
注
- の完全更新更新 を実行しても、シンクからデータが消去されることはありません。 再処理されたデータはシンクに追加され、既存のデータは変更されません。
sink
API では、Lakeflow 宣言パイプラインの期待はサポートされていません。
構文
import dlt
dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)
パラメーター
パラメーター | タイプ | 説明 |
---|---|---|
name |
str |
必須。 シンクを識別し、シンクを参照および管理するために使用される文字列。 シンク名は、パイプラインの一部であるノートブックやモジュールなどのすべてのソース コードを含め、パイプラインに固有である必要があります。 |
format |
str |
必須。 kafka または delta のいずれかの出力形式を定義する文字列。 |
options |
dict |
キーと値の両方が文字列である、 {"key": "value"} 形式のシンク オプションの一覧。 Kafka シンクと Delta シンクでサポートされているすべての Databricks ランタイム オプションがサポートされています。
|
例示
import dlt
# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dlt.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dlt.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)