次の方法で共有


UDF でタスク コンテキストを取得する

Batch Unity カタログ Python UDF または PySpark UDF の実行中にコンテキスト情報を取得するには、TaskContext PySpark API を使用します。

たとえば、ユーザーの ID やクラスター タグなどのコンテキスト情報は、外部サービスにアクセスするためにユーザーの ID を検証できます。

要求事項

TaskContext を使用してコンテキスト情報を取得する

タブを選択すると、PySpark UDF または Batch Unity カタログ Python UDF の TaskContext の例が表示されます。

PySpark UDFの

次の PySpark UDF の例では、ユーザーのコンテキストが出力されます。

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

バッチ Unity Catalog Python UDF

次の Batch Unity カタログ Python UDF の例では、サービス資格情報を使用して AWS Lambda 関数を呼び出すユーザーの ID を取得します。

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

登録後に UDF を呼び出します。

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

TaskContext プロパティ

TaskContext.getLocalProperty() メソッドには、次のプロパティ キーがあります。

プロパティ キー 説明 使用例
user 現在 UDF を実行しているユーザー tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id 現在の UDF に関連付けられている Spark ジョブ グループ ID tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags JSON ディクショナリの文字列表現として書式設定されたキーと値のペアとしてのクラスター メタデータ タグ tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region ワークスペースが存在するリージョン tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId 実行コンテキストの Databricks アカウント ID tc.getLocalProperty("accountId")
->"1234567890123456"
orgId ワークスペース ID (DBSQL では使用できません) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion クラスターの Databricks ランタイム バージョン (DBSQL 以外の環境) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion DBSQL バージョン (DBSQL 環境) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"