次の方法で共有


機械学習パイプラインをトリガーする

適用対象:Azure Machine Learning SDK v1 for Python

重要

この記事では、Azure Machine Learning SDK v1 の使用に関する情報を提供します。 SDK v1 は 2025 年 3 月 31 日の時点で非推奨となり、サポートは 2026 年 6 月 30 日に終了します。 SDK v1 は、その日付までインストールして使用できます。

2026 年 6 月 30 日より前に SDK v2 に移行することをお勧めします。 SDK v2 の詳細については、「 Azure Machine Learning Python SDK v2SDK v2 リファレンスとは」を参照してください。

この記事では、Azure で実行するパイプラインをプログラムでスケジュールする方法について説明します。 経過時間またはファイルシステムの変更に基づいてスケジュールを作成できます。 時間ベースのスケジュールを使用して、データ ドリフトの監視などの日常的なタスクを実行できます。 変更ベースのスケジュールを使用して、新しいデータのアップロードや編集中の古いデータなど、不規則または予測不可能な変更に対応できます。

スケジュールを作成する方法を学習したら、スケジュールを取得して非アクティブ化する方法を学習します。 最後に、他の Azure サービス (Azure Logic Apps と Azure Data Factory) を使用してパイプラインを実行する方法について説明します。 ロジック アプリを使用すると、より複雑なトリガー ロジックまたは動作が可能になります。 Azure Data Factory パイプラインを使用すると、より大きなデータ オーケストレーション パイプラインの一部として機械学習パイプラインを呼び出すことができます。

前提条件

必要な値を取得する

パイプラインをスケジュールするには、ワークスペースへの参照、発行されたパイプラインの識別子、スケジュールを作成する実験の名前が必要です。 これらの値は、次のコードを使用して取得できます。

import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in  published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

experiment_name = "MyExperiment" 
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" 

スケジュールを作成する

定期的にパイプラインを実行するには、スケジュールを作成します。 Schedule によって、パイプライン、実験、トリガーが関連付けられます。 トリガーには、ジョブ間の待機時間を定義する ScheduleRecurrence 、または変更を監視するディレクトリを指定するデータストア パスを指定できます。 どちらの場合も、パイプライン識別子と、スケジュールを作成する実験の名前が必要です。

Python ファイルの先頭に Schedule クラスと ScheduleRecurrence クラスをインポートします。


from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

時間ベースのスケジュールを作成する

ScheduleRecurrence コンストラクターには、"Minute""Hour""Day""Week"、または"Month"のいずれかの文字列に設定する必要がある必須のfrequency引数があります。 また、開始時刻の間に経過するfrequency単位の数を指定する整数interval引数も必要です。 省略可能な引数を使用すると、 ScheduleRecurrence ドキュメントで説明されているように、開始時刻をより具体的にすることができます。

15 分ごとにジョブを開始する Schedule を作成します。

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
                            description="Based on time",
                            pipeline_id=pipeline_id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)

変更ベースのスケジュールを作成する

ファイルの変更によってトリガーされるパイプラインは、時間ベースのスケジュールよりも効率的な場合があります。 ファイルが変更される前に、または新しいファイルがデータ ディレクトリに追加されたときに何かを実行したい場合は、そのファイルを前処理することができます。 データストアに対する変更や、データストア内の特定のディレクトリ内の変更を監視できます。 特定のディレクトリを監視する場合、そのディレクトリのサブディレクトリ内の変更はジョブをトリガーしません。

変更ベースのスケジュールでは、Azure Blob Storage の監視のみがサポートされます。

ファイルリアクティブ Scheduleを作成するには、Schedule.create の呼び出しで datastore パラメーターを設定する必要があります。 フォルダーを監視するには、path_on_datastore 引数を設定します。

polling_interval引数を使用すると、データストアが変更をチェックする頻度を分単位で指定できます。

パイプラインが DataPathPipelineParameter を使用して構築された場合は、data_path_parameter_name 引数を設定することで、その変数を変更されたファイルの名前に設定できます。

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                            pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, data_path_parameter_name="input_data")

スケジュールを作成するための省略可能な引数

前に説明した引数に加えて、 status 引数を "Disabled" に設定して、非アクティブなスケジュールを作成できます。 continue_on_step_failureを使用すると、パイプラインの既定のエラー動作をオーバーライドするブール値を渡すことができます。

スケジュールされたパイプラインを表示する

ブラウザーで、Azure Machine Learning Studio に移動します。 左側のウィンドウで、[エンドポイント] アイコンを選択します。 [エンドポイント] ウィンドウ 、[ リアルタイム エンドポイント] を選択します。 これにより、ワークスペースに発行されたパイプラインの一覧が表示されます。

[エンドポイント] ウィンドウを示すスクリーンショット。

このページでは、ワークスペース内のすべてのパイプラインに関する概要情報 (名前、説明、状態など) を確認できます。 パイプラインの名前を選択すると、詳細情報を取得できます。 結果のページでは、個々のジョブに関する情報を取得することもできます。

パイプラインを非アクティブ化する

公開されているがスケジュールされていない Pipeline がある場合は、次のコードを使用して無効にすることができます。

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

パイプラインがスケジュールされている場合は、最初にスケジュールをキャンセルする必要があります。 ポータルから、または次のコードを実行して、スケジュールの識別子を取得します。

ss = Schedule.list(ws)
for s in ss:
    print(s)

無効にするスケジュールの schedule_id を作成したら、次のコードを実行します。

def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)

その後、Schedule.list(ws) を再度実行すると、空のリストが取得されます。

複雑なトリガーに Logic Apps を使用する

Logic Apps を使用して、より複雑なトリガー ルールまたは動作を作成できます。

ロジック アプリを使用して Machine Learning パイプラインをトリガーするには、発行された Machine Learning パイプラインの REST エンドポイントが必要です。 パイプラインを作成して公開します。 次に、パイプライン ID を使用して、PublishedPipeline の REST エンドポイントを見つけます。

# You can find the pipeline ID in Azure Machine Learning studio

published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint 

Azure でロジック アプリを作成する

次に、 ロジック アプリを作成します。 ロジック アプリがプロビジョニングされたら、以下の手順に従ってパイプラインのトリガーを構成します。

  1. システム割り当てマネージド ID を作成 して、アプリに Azure Machine Learning ワークスペースへのアクセス権を付与します。

  2. 左側のウィンドウで、[開発ツール] セクションで [ロジック アプリ テンプレート] を選択します。

  3. 空のワークフローのテンプレートを選択します。

    空のロジック アプリ テンプレートのボタンを示すスクリーンショット。

  4. デザイナーで、[トリガーの 追加] を選択します。

  5. [トリガーの追加] ペインで、blob を検索します。 BLOB が追加または変更されたとき (プロパティのみ) トリガーを選択します。

    ロジック アプリにトリガーを追加する方法を示すスクリーンショット。

  6. [ 接続の作成 ] ウィンドウで、BLOB の追加または変更を監視する Blob Storage アカウントの接続情報を指定し、[ 新規作成] を選択します。 監視するコンテナーを選択します。

    [ Interval] \(間隔\) と [ Frequency]\(頻度 \) の値を選択します。

    このトリガーは、選択したコンテナーを監視しますが、サブフォルダーは監視しません。

  7. BLOB が変更されたとき、または新しい BLOB が検出されたときに実行される HTTP アクションを追加します。 トリガーの下にあるプラス記号 (+) を選択し、[ アクションの追加] を選択します。

  8. [ アクションの追加 ] ウィンドウで、 HTTP アクションを選択します。 表示されない場合は検索できます。

    HTTP アクションを追加する方法を示すスクリーンショット。

  9. 結果のウィンドウで、[ HTTP] を選択します。

    以下の設定を使用してアクションを構成します。

    設定
    URI 公開されたパイプラインのエンドポイント。 「前提条件」を参照してください。
    方式 POST
    認証の種類 ( [詳細設定] の下) マネージド ID
  10. 持っている DataPath PipelineParameters の値を設定するようにスケジュールを構成します。

    {
      "DataPathAssignments": {
        "input_datapath": {
          "DataStoreName": "<datastore-name>",
          "RelativePath": "@{triggerBody()?['Name']}" 
        }
      },
      "ExperimentName": "MyRestPipeline",
      "ParameterAssignments": {
        "input_string": "sample_string3"
      },
      "RunSource": "SDK"
    }
    

    ワークスペースに追加した DataStoreName前提条件として使用します。

    HTTP 設定を示すスクリーンショット。

  11. 保存 を選択します。

重要

Azure ロールベースのアクセス制御 (Azure RBAC) を使用してパイプラインへのアクセスを管理する場合は、パイプライン シナリオ (トレーニングまたはスコアリング) のアクセス許可を設定します。

Azure Data Factory パイプラインから機械学習パイプラインを呼び出す

Azure Data Factory パイプラインでは、Machine Learning 実行パイプライン アクティビティによって Azure Machine Learning パイプラインが実行されます。 このアクティビティは、Azure Data Factory のオーサリング ページの [ Machine Learning ] のメニューにあります。

Azure Data Factory 作成環境での機械学習パイプライン アクティビティを示すスクリーンショット。

次の手順

この記事では、Azure Machine Learning SDK for Python を使用して、2 つの異なる方法でパイプラインをスケジュールしました。 経過時間に基づいて 1 つのスケジュールがトリガーされます。 他のスケジュールは、指定した Datastore またはそのストアのディレクトリ内でファイルが変更された場合にトリガーされます。 ポータルを使用してパイプラインと個々のジョブを調べる方法を確認しました。 スケジュールを無効にしてパイプラインの実行を停止する方法を学習しました。 最後に、パイプラインをトリガーする Azure ロジック アプリを作成しました。

これらの記事に詳しい説明があります。