次の方法で共有


Unity カタログの Batch Python ユーザー定義関数 (UDF)

重要

この機能はパブリック プレビュー段階にあります。

Batch Unity Catalog Python UDF は、データのバッチを操作する Python コードを記述できるようにすることで、Unity カタログ UDF の機能を拡張し、行ごとの UDF に関連するオーバーヘッドを減らすことで効率を大幅に向上させます。 これらの最適化により、Unity カタログ バッチ Python UDF は大規模なデータ処理に最適です。

要求事項

Batch Unity カタログ Python UDF には、Databricks Runtime バージョン 16.3 以降が必要です。

Batch Unity カタログ Python UDF を作成する

Batch Unity カタログ Python UDF の作成は、通常の Unity カタログ UDF の作成に似ていますが、次のものが追加されています。

  • PARAMETER STYLE PANDAS:これは、UDF が pandas 反復子を使用してバッチでデータを処理することを指定します。
  • HANDLER 'handler_function': バッチを処理するために呼び出されるハンドラー関数を指定します。

次の例では、Batch Unity カタログ Python UDF を作成する方法を示します。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

UDF を登録したら、SQL または Python を使用して呼び出すことができます。

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Batch UDF ハンドラー関数

Batch Unity カタログ Python UDF には、バッチを処理して結果を生成するハンドラー関数が必要です。 HANDLER キーを使用して UDF を作成するときは、ハンドラー関数の名前を指定する必要があります。

ハンドラー関数は、次の処理を行います。

  1. 1 つ以上の pandas.Seriesを反復処理する反復子引数を受け取ります。 各 pandas.Series には、UDF の入力パラメーターが含まれています。
  2. ジェネレーターを反復処理し、データを処理します。
  3. ジェネレーター反復子を返します。

Batch Unity カタログ Python UDF は、入力と同じ数の行を返す必要があります。 ハンドラー関数は、各バッチの入力系列と同じ長さの pandas.Series を生成することでこれを保証します。

カスタム依存関係をインストールする

外部ライブラリのカスタム依存関係を定義することで、Databricks ランタイム環境を超えて Batch Unity カタログ Python UDF の機能を拡張できます。

カスタム依存関係 を使用して UDF を拡張するを参照してください。

バッチ UDF は、1 つまたは複数のパラメーターを受け入れる

1 つのパラメーター: ハンドラー関数は、1 つの入力パラメーターを使用すると、バッチごとに pandas.Series を反復処理する反復子を受け取ります。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

複数のパラメーター: 複数の入力パラメーターの場合、ハンドラー関数は複数の pandas.Seriesを反復処理する反復子を受け取ります。 系列内の値は、入力パラメーターと同じ順序になります。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

コストの高い操作を分離してパフォーマンスを最適化する

これらの操作をハンドラー関数から分離することで、計算コストの高い操作を最適化できます。 これにより、データのバッチに対する反復のたびに実行されるのではなく、1 回だけ実行されるようになります。

次の例は、高価な計算が 1 回だけ実行されるようにする方法を示しています。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

バッチ Unity Catalog Python UDF のサービス資格情報

Batch Unity カタログ Python UDF は、Unity カタログ サービス資格情報を使用して外部クラウド サービスにアクセスできます。 これは、セキュリティ トークナイザーなどのクラウド関数をデータ処理ワークフローに統合する場合に特に便利です。

サービス資格情報を作成するには、「 サービス資格情報の作成」を参照してください。

UDF 定義の CREDENTIALS 句で使用するサービス資格情報を指定します。

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

サービス資格情報のアクセス許可

UDF 作成者には、Unity カタログ サービスの資格情報に対する ACCESS アクセス許可が必要です。 ただし、UDF 呼び出し元の場合は、UDF に対するアクセス許可 EXECUTE 付与するだけで十分です。 特に、UDF 呼び出し元は、UDF 作成者の資格情報アクセス許可を使用して UDF を実行するため、基になるサービス資格情報にアクセスする必要はありません。

一時関数の場合、作成者は常に呼び出し側です。 No-PE スコープ (専用クラスターとも呼ばれます) で実行される UDF は、代わりに呼び出し元のアクセス許可を使用します。

既定の資格情報とエイリアス

CREDENTIALS句には複数の資格情報を含めることができますが、DEFAULTとしてマークできるのは 1 つだけです。 AS キーワードを使用して、既定以外の資格情報のエイリアスを設定できます。 各資格情報には一意のエイリアスが必要です。

修正プログラムが適用されたクラウド SDK では、既定の資格情報が自動的に取得されます。 既定の資格情報は、コンピューティングの Spark 構成で指定された既定値よりも優先され、Unity カタログ UDF 定義に保持されます。

azure-identity プロバイダーを使用するには、DefaultAzureCredential パッケージをインストールする必要があります。 外部ライブラリをインストールするには、 ENVIRONMENT 句を使用します。 外部ライブラリのインストールの詳細については、「 カスタム依存関係を使用した UDF の拡張」を参照してください。

%sql
CREATE OR REPLACE TEMPORARY FUNCTION call_lambda_func(data STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-mgmt-web"]', environment_version = 'None'
)
AS $$
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient

def batchhandler(it):
  # DefaultAzureCredential is automatically using 'batch-udf-service-creds-example-cred'
  web_client = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

  for vals in data:
    yield vals
$$

サービス資格情報の例 - AWS Lambda 関数

次の例では、サービス資格情報を使用して、Batch Unity カタログ Python UDF から AWS Lambda 関数を呼び出します。

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

登録後に UDF を呼び出します。

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

タスク実行コンテキストを取得する

TaskContext PySpark API を使用して、ユーザーの ID、クラスター タグ、Spark ジョブ ID などのコンテキスト情報を取得します。 UDF でのタスク コンテキストの取得を参照してください。

制限事項

  • Python 関数は NULL 値を個別に処理する必要があり、すべての型マッピングは Azure Databricks SQL 言語マッピングに従う必要があります。
  • Batch Unity カタログ Python UDF は、セキュリティで保護された分離された環境で実行され、共有ファイル システムや内部サービスにはアクセスできません。
  • ステージ内の複数の UDF 呼び出しがシリアル化され、中間結果が具体化され、ディスクに流出する可能性があります。
  • サービス資格情報は、Batch Unity カタログ Python UDF でのみ使用できます。 標準の Unity カタログ Python UDF または PySpark UDF ではサポートされていません。
  • 専用クラスターおよび一時関数の場合、関数の呼び出し元には、サービス資格情報に対する ACCESS アクセス許可が必要です。 「サービス資格情報を使用して外部クラウド サービスにアクセスするためのアクセス許可を付与する」を参照してください。
  • パブリック プレビュー機能を 有効にします。サーバーレス SQL Warehouse で UDF のネットワークを有効にして、サーバーレス SQL Warehouse コンピューティング上の外部サービスに対して Batch Unity カタログ Python UDF 呼び出しを行います。
  • サーバーレス ノートブックまたはジョブ コンピューティングで Batch Unity カタログ Python UDF 呼び出しを行うには、サーバーレス エグレス制御を構成する必要があります