手記
この記事では、Databricks Runtime 13.1 以降の Databricks Connect について説明します。
Databricks Connect for Python では、 ユーザー定義関数 (UDF) がサポートされています。 UDF を含む DataFrame 操作が実行されると、UDF は Databricks Connect によってシリアル化され、要求の一部としてサーバーに送信されます。
Databricks Connect for Scala の UDF の詳細については、「 Databricks Connect for Scala のユーザー定義関数」を参照してください。
手記
ユーザー定義関数はシリアル化および逆シリアル化されるため、クライアントの Python バージョンは Azure Databricks コンピューティングの Python バージョンと一致する必要があります。 サポートされているバージョンについては、 バージョンのサポート マトリックスを参照してください。
UDF を定義する
Databricks Connect for Python で UDF を作成するには、次のサポートされている関数のいずれかを使用します。
- PySpark ユーザー定義関数
- PySpark ストリーミング関数
たとえば、次の Python は、列の値を 2 乗する単純な UDF を設定します。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
依存関係を持つ UDF
Von Bedeutung
この機能は パブリック プレビュー 段階であり、Databricks Connect for Python 16.4 以降と、Databricks Runtime 16.4 以降を実行しているクラスターが必要です。 この機能を使用するには、ワークスペースで Unity Catalog のプレビュー機能「Enhanced Python UDFs」 を有効にします。
Databricks Connect では、UDF に必要な Python 依存関係の指定がサポートされています。 これらの依存関係は、UDF の Python 環境の一部として Databricks コンピューティングにインストールされます。
この機能を使用すると、ユーザーは、基本環境で提供されるパッケージに加えて、UDF に必要な依存関係を指定できます。 また、 基本環境で提供されているものとは異なるバージョンのパッケージをインストールするためにも使用できます。
依存関係は、次のソースからインストールできます。
- PyPI パッケージ
- PyPI パッケージは、 PEP 508 に従って指定できます (
dice
、pyjokes<1
、simplejson==3.19.*
など)。
- PyPI パッケージは、 PEP 508 に従って指定できます (
- Unity カタログ ボリュームに格納されているファイル
- 両方のホイール パッケージ (
.whl
) ファイルと gzipped tar ファイル (.tar.gz
) がサポートされています。 ユーザーには、re:[UC] ボリューム内のファイルREAD_FILE
アクセス許可が付与されている必要があります。 - Unity カタログ ボリュームからパッケージをインストールする場合、UDF を呼び出すには、ソース ボリューム
READ VOLUME
アクセス許可が必要です。 すべてのアカウント ユーザーにこのアクセス許可を付与すると、新しいユーザーに対して自動的に有効になります。 - Unity カタログ ボリューム ファイルは、
dbfs:<path>
やdbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl
など、dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz
として指定する必要があります。
- 両方のホイール パッケージ (
UDF にカスタム依存関係を含めるには、 withDependencies
を使用して環境で指定し、その環境を使用して Spark セッションを作成します。 依存関係は Databricks コンピューティングにインストールされ、この Spark セッションを使用するすべての UDF で使用できます。
次のコードでは、PyPI パッケージ dice
を依存関係として宣言しています。
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
または、ボリューム内のホイールの依存関係を指定するには、次のようにします。
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks ノートブックおよびジョブでの動作
ノートブックとジョブでは、UDF の依存関係を REPL に直接インストールする必要があります。 Databricks Connect は、指定されたすべての依存関係が既にインストールされていることを確認して REPL Python 環境を検証し、インストールされていない場合は例外をスローします。
PyPI と Unity カタログの両方のボリューム依存関係について、ノートブック環境の検証が行われます。 ボリュームの依存関係は、ホイール ファイルの 場合は PEP-427 以降の標準 Python パッケージ仕様に従い、ソース配布ファイルの場合は PEP-241 以降に従ってパッケージ化する必要があります。 Python パッケージ標準の詳細については、 PyPA のドキュメントを参照してください。
制限事項
- ローカル開発マシン上の Python ホイールやソース配布などのファイルを依存関係として直接指定することはできません。 最初に、Unity カタログ ボリュームにアップロードする必要があります。
- UDF の依存関係は、ウィンドウ関数に対する
pyspark.sql.streaming.DataStreamWriter.foreach
、pyspark.sql.streaming.DataStreamWriter.foreachBatch
、pandas 集計 UDF ではサポートされていません。
例示
次の例では、環境で PyPI とボリュームの依存関係を定義し、その環境とのセッションを作成してから、それらの依存関係を使用する UDF を定義して呼び出します。
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Python 基本環境
UDF は、クライアントではなく Databricks コンピューティングで実行されます。 UDF が実行される基本 Python 環境は、Databricks コンピューティングによって異なります。
クラスターの場合、基本 Python 環境は、クラスターで実行されている Databricks Runtime バージョンの Python 環境です。 この基本環境の Python バージョンとパッケージの一覧は、Databricks Runtime リリース ノートのシステム環境とインストールされている Python ライブラリセクションにあります。
サーバーレス コンピューティングの場合、基本 Python 環境は、次の表に従って サーバーレス環境のバージョン に対応します。
Databricks Connect バージョン | UDF サーバーレス環境 |
---|---|
16.4.1 以降、Python 3.12 | バージョン 3 |
15.4.10 から 16.0 未満、Python 3.12 | バージョン 3 |
15.4.10 から 16.0 未満、Python 3.11 | バージョン 2 |
15.4.0 から 15.4.9、16.0 から 16.3 | 最新のサーバーレス コンピューティング。 安定した Python 環境を実現するには、15.4.10 LTS 以降または 16.4.1 LTS 以降に移行してください。 |