次の方法で共有


ノートブックを調整し、ノートブックでコードをモジュール化する

ノートブックを調整し、ノートブックでコードをモジュール化する方法について説明します。 例を参照し、ノートブック オーケストレーションに代替方法を使用するタイミングを理解します。

オーケストレーションとコードのモジュール化メソッド

次の表は、ノートブックの調整とノートブック内のコードのモジュール化に使用できるメソッドを比較しています。

メソッド 利用シーン 注記
Lakeflow 作業 ノートブックオーケストレーション (推奨) ノートブックを調整するための推奨される方法。
タスクの依存関係、スケジュール、トリガーを含む複雑なワークフローをサポートします。 運用環境のワークロードに堅牢でスケーラブルなアプローチを提供しますが、セットアップと構成が必要です。
dbutils.notebook.run() ノートブックのオーケストレーション 動的なパラメーター セットに対するノートブックのループなど、ジョブでユース ケースをサポートできない場合は、 dbutils.notebook.run() を使用します。
呼び出しごとに新しいエフェメラル ジョブを開始します。これによってオーバーヘッドが増加し、高度なスケジュール機能が不足する可能性があります。
ワークスペース ファイル コードモジュール化 (推奨) コードをモジュール化するための推奨される方法。
ワークスペースに格納されている再利用可能なコード ファイルにコードをモジュール化します。 デバッグと単体テストを向上するために、リポジトリを使用したバージョン管理と IDE との統合をサポートします。 ファイル パスと依存関係を管理するための追加のセットアップが必要です。
%run コードのモジュール化 ワークスペース ファイルにアクセスできない場合は、 %run を使用します。
関数または変数をインラインで実行して、他のノートブックからインポートするだけです。 プロトタイプ作成に便利ですが、維持が困難な密結合コードにつながる可能性があります。 パラメーターの受け渡しやバージョン管理はサポートされていません。

%run 対。 dbutils.notebook.run()

%run コマンドを使用すると、ノートブック内に別のノートブックを含めることができます。 サポート関数を別のノートブックに配置することで、 %run を使用してコードをモジュール化できます。 また、分析にステップを実装するノートブックの連結にも使用できます。 %run を使用すると、呼び出されたノートブックがすぐに実行され、その中で定義されている関数と変数を呼び出し元のノートブックで使用できるようになります。

dbutils.notebook API は、ノートブックにパラメーターを渡したり、ノートブックから値を返したりできるため、%runを補完します。 これを使用すると、依存関係を含む複雑なワークフローとパイプラインを作成できます。 たとえば、ディレクトリ内のファイルの一覧を取得し、別のノートブックに名前を渡すことができます。これは、 %runでは不可能です。 戻り値に基づいて if-then-else ワークフローを作成することもできます。

%run とは異なり、dbutils.notebook.run() メソッドではノートブックを実行する新しいジョブを開始します。

すべての dbutils API と同様に、これらのメソッドは Python と Scala でのみ使用できます。 ただし、dbutils.notebook.run() を使用すると、R ノートブックを呼び出すことができます。

%run を使用してノートブックをインポートする

次の例では、1 つめのノートブックで関数 reverse が定義されています。これは、%run マジックを使用して shared-code-notebook を実行した後に、2 つめのノートブックで使用できます。

共有コード ノートブック

ノートブックのインポート例

両方のノートブックがワークスペース内の同じディレクトリ内にあるため、./のプレフィックス ./shared-code-notebookを使用して、現在実行中のノートブックに対してパスを解決する必要があることを示します。 %run ./dir/notebook のようにノートブックをディレクトリに整理することも、%run /Users/username@organization.com/directory/notebook のように絶対パスを使用することもできます。

注意

  • %run はセル内に単独で存在している必要があります。このコマンドでノートブック全体がインラインで実行されるからです。
  • %run を使用して Python ファイルを実行し、そのファイル内で定義されているエンティティをノートブックに import することはできません。 Python ファイルからインポートするには、ファイルを使用してコードをモジュール化する方法に関する記事を参照してください。 または、そのファイルをパッケージして Python ライブラリを作成し、その Python ライブラリから Azure Databricks ライブラリを作成します。ノートブックの実行に使用するクラスターにこのライブラリをインストールする必要があります。
  • %runを使用してウィジェットを含むノートブックを実行すると、既定では、指定したノートブックがウィジェットの既定値で実行されます。 ウィジェットに値を渡すこともできます。 %runで Databricks ウィジェットを使用する を参照してください。

dbutils.notebook.runを使用して新しいジョブを開始する

ノートブックを実行し、その終了値を返します。 このメソッドでは、すぐに実行される一時的なジョブを開始します。

dbutils.notebook API で使用できるメソッドは runexit です。 パラメーターと戻り値はどちらも文字列でなければなりません。

run(path: String, timeout_seconds: int, arguments: Map): String

timeout_seconds パラメーターは、実行のタイムアウトを制御します (0 はタイムアウトがないことを意味します)。 runの呼び出しは、指定した時間内に終了しない場合に例外を投げます。 Azure Databricks がダウンしたまま 10 分を超えると、timeout_seconds に関係なく、ノートブックの実行は失敗します。

arguments パラメーターでは、ターゲット ノートブックのウィジェットの値を設定します。 具体的には、実行しているノートブックに A という名前のウィジェットがあり、引数パラメーターの一部としてキーと値のペア ("A": "B")run() の呼び出しに渡す場合、ウィジェット A の値を取得すると "B" が返されます。 ウィジェットの作成と操作の手順については、「Databricks ウィジェット」の記事を参照してください。

注意

  • arguments パラメーターには、ラテン文字 (ASCII 文字セット) のみを使用できます。 非 ASCII 文字を使用すると、エラーが返されます。
  • dbutils.notebook API を使用して作成されたジョブは、30 日以内に完了する必要があります。

run 使用法

Python(プログラミング言語)

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

スカラ (プログラミング言語)

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

ノートブック間で構造化データを渡す

このセクションでは、ノートブック間で構造化データを渡す方法について説明します。

Python(プログラミング言語)

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

スカラ (プログラミング言語)

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

エラーを処理する

このセクションでは、エラーを処理する方法について説明します。

Python(プログラミング言語)

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

スカラ (プログラミング言語)

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here, we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

複数のノートブックを同時に実行する

複数のノートブックを同時に実行するには、標準の Scala と Python のコンストラクトを使用します。たとえば、スレッド (ScalaPython) やフューチャ (ScalaPython) です。 ノートブック例には、これらのコンストラクトの使用方法が記載されています。

  1. 次の 4 つのノートブックをダウンロードします。 ノートブックは Scala で記述されています。
  2. ワークスペース内の 1 つのフォルダーにノートブックをインポートします。
  3. 同時に実行するノートブックを実行します。

同時に実行するノートブック

ノートブックを入手

並列実行するノートブック

ノートブックを入手

テスト用ノートブック

ノートブックを入手

テスト 2 用ノートブック

ノートブックを入手