dlt
モジュールは、デコレーターを使用してコア機能の多くを実装します。 これらのデコレーターは、ストリーミング クエリまたはバッチ クエリを定義し、Apache Spark DataFrame を返す関数を受け入れます。 次の構文は、Lakeflow 宣言パイプライン データセットを定義するための簡単な例を示しています。
import dlt
@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
このページでは、Lakeflow 宣言パイプラインでデータセットを定義する関数とクエリの概要について説明します。 使用可能なデコレーターの完全な一覧については、「 Lakeflow 宣言型パイプライン」開発者向けリファレンスを参照してください。
データセットの定義に使用する関数には、サードパーティの API の呼び出しなど、データセットに関係のない任意の Python ロジックを含めないようにしてください。 Lakeflow 宣言型パイプラインは、計画、検証、および更新中にこれらの関数を複数回実行します。 任意のロジックを含めると、予期しない結果になる可能性があります。
データを読み取ってデータセット定義を開始する
Lakeflow 宣言パイプライン データセットの定義に使用される関数は、通常、 spark.read
または spark.readStream
操作で始まります。 これらの読み取り操作は、DataFrame を返す前に追加の変換を定義するために使用する静的またはストリーミング DataFrame オブジェクトを返します。 DataFrame を返す Spark 操作のその他の例としては、 spark.table
や spark.range
などがあります。
関数は、関数の外部で定義されている DataFrame を参照しないでください。 別のスコープで定義されている DataFrame を参照しようとすると、予期しない動作が発生する可能性があります。 複数のテーブルを作成するためのメタプログラミング パターンの例については、「for
ループでのテーブルの作成」を参照してください。
次の例は、バッチまたはストリーミング ロジックを使用してデータを読み取るための基本的な構文を示しています。
import dlt
# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
外部 REST API からデータを読み取る必要がある場合は、Python カスタム データ ソースを使用してこの接続を実装します。 PySpark カスタム データ ソース を参照してください。
注
Pandas DataFrames、dict、リストなど、Python のデータ コレクションから任意の Apache Spark DataFrame を作成できます。 これらのパターンは、開発とテスト中に役立つ場合がありますが、ほとんどの運用 Lakeflow 宣言パイプライン データセット定義は、ファイル、外部システム、または既存のテーブルまたはビューからデータを読み込むことで開始する必要があります。
変換の連結
Lakeflow 宣言型パイプラインでは、ほぼすべての Apache Spark DataFrame 変換がサポートされています。 データセット定義関数には任意の数の変換を含めることができますが、使用するメソッドが常に DataFrame オブジェクトを返すようにする必要があります。
複数のダウンストリーム ワークロードを駆動する中間変換があるが、テーブルとして具体化する必要がない場合は、 @dlt.view()
を使用してパイプラインに一時ビューを追加します。 その後、複数のダウンストリーム データセット定義で spark.read.table("temp_view_name")
を使用して、このビューを参照できます。 次の構文は、このパターンを示しています。
import dlt
@dlt.view()
def a():
return spark.read.table("source").filter(...)
@dlt.table()
def b():
return spark.read.table("b").groupBy(...)
@dlt.table()
def c():
return spark.read.table("c").groupBy(...)
これにより、Lakeflow 宣言型パイプラインは、パイプラインの計画中にビュー内の変換を完全に認識し、データセット定義の外部で実行されている任意の Python コードに関連する潜在的な問題を防ぐことができます。
関数内では、次の例のように、DataFrame を連結して、増分結果をビュー、具体化されたビュー、ストリーミング テーブルとして書き込まずに新しい DataFrame を作成できます。
import dlt
@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
すべての DataFrame がバッチ ロジックを使用して最初の読み取りを実行する場合、返される結果は静的な DataFrame になります。 ストリーミングしているクエリがある場合、返される結果はストリーミング DataFrame になります。
DataFrame を返す
@dlt.table()
デコレーターの場合、静的な DataFrame を返すということは、具体化されたビューを定義していることを意味します。 ストリーミング DataFrame を返すということは、ストリーミング テーブルを定義していることを意味します。 ほとんどのデコレータは、ストリーミングデータフレームと静的データフレームの両方で動作しますが、他のデコレーターはストリーミングデータフレームを必要とします。
データセットの定義に使用する関数は、Spark DataFrame を返す必要があります。 Lakeflow 宣言パイプライン のデータセット コードの一部として、ファイルまたはテーブルを保存または書き込むメソッドは使用しないでください。
Lakeflow 宣言パイプライン コードで使用してはならない Apache Spark 操作の例:
collect()
count()
toPandas()
save()
saveAsTable()
start()
toTable()
注
Lakeflow 宣言パイプラインでは、データセット定義関数に対する Spark での Pandas の使用もサポートされています。 「Spark 上の Pandas API」を参照してください。
Python パイプラインで SQL を使用する
PySpark では、SQL を使用して DataFrame コードを記述する spark.sql
演算子がサポートされています。 Lakeflow 宣言型パイプラインのソース コードでこのパターンを使用すると、具体化されたビューまたはストリーミング テーブルにコンパイルされます。
次のコード例は、データセット クエリ ロジックに spark.read.table("catalog_name.schema_name.table_name")
を使用することと同じです。
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read
および dlt.read_stream
(レガシ)
dlt
モジュールには、レガシ パイプライン発行モードの機能をサポートするために導入されたdlt.read()
関数とdlt.read_stream()
関数が含まれています。 これらのメソッドはサポートされていますが、Databricks では、次の理由から常に spark.read.table()
関数と spark.readStream.table()
関数を使用することをお勧めします。
dlt
関数では、現在のパイプラインの外部で定義されたデータセットの読み取りが制限されています。spark
関数は、skipChangeCommits
などの読み取り操作のオプションの指定をサポートしています。 オプションの指定は、dlt
関数ではサポートされていません。