この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 ここでは、UDF を登録する方法、UDF を呼び出す方法、Spark SQL における部分式の評価順序に関する注意点を示します。
Databricks Runtime 14.0 以降では、Python ユーザー定義テーブル関数 (UDF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。 Python ユーザー定義テーブル関数 (UDF) を参照してください。
注
Databricks Runtime 12.2 LTS 以下では、標準アクセス モードを使用する Unity カタログ コンピューティングでは、Python UDF と Pandas UDF はサポートされていません。 スカラー Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降で、すべてのアクセス モードでサポートされています。
Databricks Runtime 13.3 LTS 以降では、SQL 構文を使用してスカラー Python UDF を Unity カタログに登録できます。 「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
必要に応じて、UDF の戻り値の型を設定できます。 既定の戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrame で UDF を使用する
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
また、次のように注釈構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL (SQL、DataFrame、データセット API を含む) では、部分式の評価の順序は保証されません。 特に、演算子や関数の入力は、必ずしも左から右へ、またはその他の決まった順序で評価されるとは限りません。 たとえば、AND
および OR
論理式には、左から右への "短絡" セマンティクスはありません。
したがって、クエリの最適化および計画の際に式や句の順序は並べ替えられる可能性があるため、ブール式の副作用や評価の順序および WHERE
と HAVING
句の順序に依存することは危険です。 具体的には、UDF が NULL チェックのために SQL のショートサーキット セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われるという保証はありません。 たとえば、次のように入力します。
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句を使用しても、null を除外した後に strlen
UDF が呼び出されることは保証されません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
- UDF 自体を null で認識し、UDF 自体の内部で null チェックを実行する
-
IF
またはCASE WHEN
式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
タスク実行コンテキストを取得する
TaskContext PySpark API を使用して、ユーザーの ID、クラスター タグ、Spark ジョブ ID などのコンテキスト情報を取得します。 UDF でのタスク コンテキストの取得を参照してください。
制限事項
PySpark UDF には、次の制限事項が適用されます。
ファイル アクセスの制限: Databricks Runtime 14.2 以下では、共有クラスター上の PySpark UDF は Git フォルダー、ワークスペース ファイル、または Unity カタログ ボリュームにアクセスできません。
ブロードキャスト変数: 標準アクセス モード クラスターとサーバーレス コンピューティング上の PySpark UDF は、ブロードキャスト変数をサポートしていません。
-
メモリ制限: サーバーレス コンピューティング上の PySpark UDF には、PySpark UDF あたり 1 GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.