次の方法で共有


MLflow と Ray の統合

MLflow は、機械学習と AI のワークロードを管理するためのオープンソース プラットフォームです。 Ray と MLflow を組み合わせると、Ray でワークロードを分散し、トレーニング中に生成されたモデル、メトリック、パラメーター、およびメタデータを MLflow で追跡できます。

この記事では、MLflow と次の Ray コンポーネントを統合する方法について説明します。

  • Ray Core: Ray Tune と Ray Train の対象外となる汎用分散アプリケーション
  • Ray Train: 分散モデル トレーニング
  • Ray Tune: 分散ハイパーパラメーター チューニング
  • Model Serving: リアルタイム推論用のモデルのデプロイ

Ray Core と MLflow の統合

Ray Core は、汎用分散アプリケーションの基本的な構成要素を提供します。 これにより、Python の関数とクラスを複数のノードにスケーリングできます。

このセクションでは、Ray Core と MLflow を統合するための次のパターンについて説明します。

  • Ray ドライバー プロセスから MLflow モデルをログに記録する
  • 子プロセス実行から MLflow モデルをログに記録する

Ray ドライバー プロセスから MLflow をログに記録する

一般に、ワーカー ノードからではなく、ドライバー プロセスから MLflow モデルをログに記録することをお勧めします。 これは、ステートフルな参照をリモート ワーカーに渡す際に複雑さが増すためです。

たとえば、MLflow Tracking Server がワーカー ノード内から MLflow Client を使用して初期化されていないため、次のコードは失敗します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

代わりに、メトリックをドライバー ノードに返します。 一般に、メトリックとメタデータのサイズは小さいため、メモリの問題を引き起こさずにドライバーに転送することができます。

上の例を例にとり、Ray タスクから返されたメトリックをログに記録するように更新します。

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

大規模な Pandas テーブル、イメージ、プロット、モデルなど、大規模なアーティファクトを保存する必要があるタスクの場合、Databricks ではアーティファクトをファイルとして保持することをお勧めします。 次に、ドライバー コンテキスト内でアーティファクトを再読み込みするか、保存されたファイルへのパスを指定して MLflow でオブジェクトを直接ログに記録します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

MLflow の子プロセス実行として Ray タスクをログに記録する

子プロセス実行を使用して、Ray Core と MLflow を統合できます。 これには、次の手順が含まれます。

  1. 親プロセス実行を作成する: ドライバー プロセスで親プロセス実行を初期化します。 この実行は、後続のすべての子プロセス実行の階層コンテナーとして機能します。
  2. 子プロセス実行を作成する: 各 Ray タスク内で、親プロセス実行の下で子プロセス実行を開始します。 各子プロセス実行では、独自のメトリックを個別にログに記録できます。

この方法を実装するには、各 Ray タスクで必要なクライアント認証情報と親 run_id を確実に受け取れるようにします。 このセットアップにより、実行間の階層的な親子関係が確立されます。 次のコード スニペットは、認証情報を取得し、親 run_id に渡す方法を示しています。

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray Train と MLflow

Ray Train モデルを MLflow にログ記録する最も簡単な方法は、トレーニング実行によって生成されたチェックポイントを使用することです。 トレーニング実行が完了したら、ネイティブのディープ ラーニング フレームワーク (PyTorch や TensorFlow など) でモデルを再読み込みし、対応する MLflow コードでログに記録します。

この方法により、モデルが正しく保存され、評価またはデプロイの準備が整います。

次のコードは、Ray Train チェックポイントからモデルを再読み込みし、MLflow にログ記録します。

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

一般に、Ray Train を使用してオブジェクトをドライバー ノードに送り返すのがベスト プラクティスですが、最終的な結果を保存する方が、ワーカー プロセスのトレーニング履歴全体よりも簡単です。

トレーニング実行からの複数のモデルを保存するには、ray.train.CheckpointConfig に保持するチェックポイントの数を指定します。 そうすれば、単一のモデルを保存するのと同じ方法でモデルを読み込み、ログに記録できます。

MLflow は、モデルトレーニング中のフォールト トレランスの処理ではなく、モデルのライフサイクルを追跡する役割を担います。 フォールト トレランスの管理は Ray Train が代わりに行います。

Ray Train で指定されたトレーニング メトリックを保存するには、結果オブジェクトからメトリックを取得し、MLflow を使用して保存します。

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Spark クラスターと Ray クラスターを適切に構成し、リソース割り当ての問題を回避するには、resources_per_worker 設定を調整する必要があります。 具体的には、各 Ray ワーカーの CPU の数を、Ray ワーカー ノードで使用可能な CPU の合計数より 1 つ少なく設定します。 トレーナーが使用可能なすべてのコアを Ray アクター用に予約すると、リソース競合エラーが発生する可能性があるため、この調整は非常に重要です。

Ray Tune と MLflow

Ray Tune と MLflow を統合すると、Databricks 内でハイパーパラメーター チューニング実験を効率的に追跡してログに記録できます。 この統合では、MLflow の実験追跡機能を利用して、Ray タスクから直接メトリックと結果を記録します。

ログ記録のための子プロセス実行のアプローチ

Ray Core タスクからのログ記録と同様に、Ray Tune アプリケーションでは、子プロセス実行アプローチを使用して、各試行またはチューニング イテレーションのメトリックをログに記録できます。 子プロセス実行アプローチを実装するには、次の手順に従ってください。

  1. 親プロセス実行を作成する: ドライバー プロセスで親プロセス実行を初期化します。 この実行は、後続のすべての子プロセス実行のメイン コンテナーとして機能します。
  2. 子プロセス実行をログ記録する: 各 Ray Tune タスクは、親プロセス実行の下に子プロセス実行を作成し、実験結果の明確な階層を維持します。

次の例では、MLflow を使用して Ray Tune タスクの認証と Ray Tune タスクからのログ記録を行う方法を示します。

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow

mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)

def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1

def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)

def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()

with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

モデル提供

Databricks クラスターで Ray Serve を使用してリアルタイム推論を行うと、外部アプリケーションと対話する際のネットワーク セキュリティと接続の制限が原因で課題が生じます。

Databricks では、Model Serving を使用して、運用環境の機械学習モデルを REST API エンドポイントにデプロイすることをお勧めします。 詳細については、「カスタム モデルのデプロイ」を参照してください。