次の方法で共有


ユーザー定義スカラー関数 - Scala

この記事には、Scala ユーザー定義関数 (UDF) の例が含まれています。 ここでは、UDF の登録方法、UDF の呼び出し方法、Spark SQL での部分式の評価順序に関する注意点を示します。 詳しくは、「外部ユーザー定義スカラー関数 (UDF)」をご覧ください。

標準アクセス モード (以前の共有アクセス モード) を使用する Unity カタログ対応コンピューティング リソース上の Scala UDF には、Databricks Runtime 14.2 以降が必要です。

関数を UDF として登録する

val squared = (s: Long) => {
  s * s
}
spark.udf.register("square", squared)

Spark SQL で UDF を呼び出す

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test

DataFrame で UDF を使用する

import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

評価順序と null チェック

Spark SQL (SQL、DataFrame、Dataset API を含む) では、部分式の評価の順序は保証されません。 特に、演算子や関数の入力は、必ずしも左から右へ、またはその他の決まった順序で評価されるとは限りません。 たとえば、AND および OR 論理式には、左から右への "短絡" セマンティクスはありません。

したがって、クエリの最適化および計画の際に式や句の順序は並べ替えられる可能性があるため、ブール式の副作用や評価の順序および WHEREHAVING 句の順序に依存することは危険です。 具体的には、UDF が NULL チェックのために SQL のショートサーキット セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われるという保証はありません。 たとえば、次のように入力します。

spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

この WHERE 句を使用しても、null を除外した後に strlen UDF が呼び出されることは保証されません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体を null で認識し、UDF 自体の内部で null チェックを実行する
  • IF または CASE WHEN 式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

型指定されたデータセット API

この機能は、Databricks Runtime 15.4 以降の標準アクセス モードの Unity カタログ対応クラスターでサポートされています。

型指定されたデータセット API を使用すると、ユーザー定義関数を使用して結果のデータセットに対してマップ、フィルター、集計などの変換を実行できます。

たとえば、次の Scala アプリケーションでは、map() API を使用して、結果列の数値をプレフィックス付き文字列に変更します。

spark.range(3).map(f => s"row-$f").show()

この例では map() API を使用していますが、これは、他の型指定されたデータセット API (filter()mapPartitions()foreach()foreachPartition()reduce()flatMap() など) にも適用されます。

Scala UDF の機能と Databricks ランタイムの互換性

次の Scala 機能では、標準 (共有) アクセス モードの Unity カタログ対応クラスターで使用する場合、Databricks ランタイムの最小バージョンが必要です。

特徴 Minimimum Databricks ランタイム バージョン
スカラー UDF Databricks Runtime 14.2
Dataset.mapDataset.mapPartitionsDataset.filterDataset.reduceDataset.flatMap Databricks Runtime 15.4
KeyValueGroupedDataset.flatMapGroupsKeyValueGroupedDataset.mapGroups Databricks Runtime 15.4
(ストリーミング) foreachWriter Sink Databricks Runtime 15.4
(ストリーミング) foreachBatch Databricks Runtime 16.1
(ストリーミング) KeyValueGroupedDataset.flatMapGroupsWithState Databricks Runtime 16.2