Batch Unity カタログ Python UDF または PySpark UDF の実行中にコンテキスト情報を取得するには、TaskContext PySpark API を使用します。
たとえば、ユーザーの ID やクラスター タグなどのコンテキスト情報は、外部サービスにアクセスするためにユーザーの ID を検証できます。
要求事項
TaskContext は、Databricks Runtime バージョン 16.3 以降でサポートされています。
TaskContext は、次の UDF の種類でサポートされています。
TaskContext を使用してコンテキスト情報を取得する
タブを選択すると、PySpark UDF または Batch Unity カタログ Python UDF の TaskContext の例が表示されます。
PySpark UDFの
次の PySpark UDF の例では、ユーザーのコンテキストが出力されます。
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
バッチ Unity Catalog Python UDF
次の Batch Unity カタログ Python UDF の例では、サービス資格情報を使用して AWS Lambda 関数を呼び出すユーザーの ID を取得します。
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
登録後に UDF を呼び出します。
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext プロパティ
TaskContext.getLocalProperty()
メソッドには、次のプロパティ キーがあります。
プロパティ キー | 説明 | 使用例 |
---|---|---|
user |
現在 UDF を実行しているユーザー | tc.getLocalProperty("user") -> "alice" |
spark.jobGroup.id |
現在の UDF に関連付けられている Spark ジョブ グループ ID | tc.getLocalProperty("spark.jobGroup.id") -> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
JSON ディクショナリの文字列表現として書式設定されたキーと値のペアとしてのクラスター メタデータ タグ | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags") -> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
ワークスペースが存在するリージョン | tc.getLocalProperty("spark.databricks.clusterUsageTags.region") -> "us-west-2" |
accountId |
実行コンテキストの Databricks アカウント ID | tc.getLocalProperty("accountId") -> "1234567890123456" |
orgId |
ワークスペース ID (DBSQL では使用できません) | tc.getLocalProperty("orgId") -> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
クラスターの Databricks ランタイム バージョン (DBSQL 以外の環境) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion") -> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL バージョン (DBSQL 環境) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion") -> "2024.35" |