適用対象:Azure Machine Learning SDK v1 for Python
重要
この記事では、Azure Machine Learning SDK v1 の使用に関する情報を提供します。 SDK v1 は 2025 年 3 月 31 日の時点で非推奨となり、サポートは 2026 年 6 月 30 日に終了します。 SDK v1 は、その日付までインストールして使用できます。
2026 年 6 月 30 日より前に SDK v2 に移行することをお勧めします。 SDK v2 の詳細については、「 Azure Machine Learning Python SDK v2 と SDK v2 リファレンスとは」を参照してください。
この記事では、機械学習パイプラインを同僚や顧客と共有する方法について説明します。
機械学習パイプラインは、機械学習タスクの再利用可能なワークフローです。 パイプラインの利点の 1 つは、コラボレーションの強化です。 また、パイプラインのバージョンを設定して、新しいバージョンの作業中に顧客が現在のモデルを使用できるようにすることもできます。
前提条件
パイプライン リソースを格納する Azure Machine Learning ワークスペース を作成します。
Azure Machine Learning SDK をインストールして開発環境を構成するか、SDK が既にインストールされている Azure Machine Learning コンピューティング インスタンスを使用します。
機械学習パイプラインを作成して実行します。 この要件を満たす方法の 1 つは、「 チュートリアル: バッチ スコアリングのための Azure Machine Learning パイプラインを構築する」を完了することです。 その他のオプションについては、「 Azure Machine Learning SDK を使用した機械学習パイプラインの作成と実行」を参照してください。
パイプラインを発行する
実行中のパイプラインを作成したら、異なる入力で実行されるようにパイプラインを発行できます。 発行されたパイプラインの REST エンドポイントがパラメーターを受け入れるようにするには、異なる引数に PipelineParameter
オブジェクトを使用するようにパイプラインを構成する必要があります。
パイプライン パラメーターを作成するには、既定値の PipelineParameter オブジェクトを使用します。
from azureml.pipeline.core.graph import PipelineParameter pipeline_param = PipelineParameter( name="pipeline_arg", default_value=10)
次に示すように、
PipelineParameter
オブジェクトをパラメーターとしてパイプライン内のいずれかのステップに追加します。compareStep = PythonScriptStep( script_name="compare.py", arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param], inputs=[ comp_data1, comp_data2], outputs=[out_data3], compute_target=compute_target, source_directory=project_folder)
このパイプラインを発行します。このパイプラインは、呼び出されたときにパラメーターを受け入れます。
published_pipeline1 = pipeline_run1.publish_pipeline( name="My_Published_Pipeline", description="My Published Pipeline Description", version="1.0")
パイプラインを発行したら、UI で確認できます。 パイプライン ID は、発行されたパイプラインの一意識別子です。
発行されたパイプラインを実行する
発行されたすべてのパイプラインに REST エンドポイントがあります。 パイプライン エンドポイントを使用すると、Python 以外のクライアントを含む外部システムからパイプラインの実行をトリガーできます。 このエンドポイントを使用すると、バッチ スコアリングと再トレーニングのシナリオでマネージド反復性を実現できます。
重要
Azure ロールベースのアクセス制御 (RBAC) を使用してパイプラインへのアクセスを管理する場合は、 パイプライン シナリオ (トレーニングまたはスコアリング) のアクセス許可を設定します。
前のパイプラインの実行を呼び出すには、Microsoft Entra 認証ヘッダー トークンが必要です。 トークンを取得するプロセスについては、 AzureCliAuthentication クラス リファレンスと Azure Machine Learning ノートブックの認証で 説明されています。
from azureml.pipeline.core import PublishedPipeline
import requests
response = requests.post(published_pipeline1.endpoint,
headers=aad_token,
json={"ExperimentName": "My_Pipeline",
"ParameterAssignments": {"pipeline_arg": 20}})
POST 要求の json
引数には、 ParameterAssignments
キーに対して、パイプライン パラメーターとその値を含むディクショナリが含まれている必要があります。 さらに、 json
引数には次のキーを含めることができます。
鍵 | 説明 |
---|---|
ExperimentName |
エンドポイントに関連付けられている実験の名前。 |
Description |
エンドポイントを説明するフリーフォーム テキスト。 |
Tags |
要求のラベル付けと注釈付けに使用できる自由形式のキーと値のペア。 |
DataSetDefinitionValueAssignments |
再トレーニングを行わずにデータセットを変更するために使用されるディクショナリ。 (この記事の後半の説明を参照してください)。 |
DataPathAssignments |
再トレーニングを行わずにデータパスを変更するために使用されるディクショナリ。 (この記事の後半の説明を参照してください)。 |
C を使用して発行されたパイプラインを実行する#
次のコードでは、C# からパイプラインを非同期的に呼び出す方法を示します。 部分コード スニペットは、呼び出し構造を示しています。 完全なクラスやエラー処理は示されていません。 これは Microsoft サンプルの一部ではありません。
[DataContract]
public class SubmitPipelineRunRequest
{
[DataMember]
public string ExperimentName { get; set; }
[DataMember]
public string Description { get; set; }
[DataMember(IsRequired = false)]
public IDictionary<string, string> ParameterAssignments { get; set; }
}
// ... in its own class and method ...
const string RestEndpoint = "your-pipeline-endpoint";
using (HttpClient client = new HttpClient())
{
var submitPipelineRunRequest = new SubmitPipelineRunRequest()
{
ExperimentName = "YourExperimentName",
Description = "Asynchronous C# REST api call",
ParameterAssignments = new Dictionary<string, string>
{
{
// Replace with your pipeline parameter keys and values
"your-pipeline-parameter", "default-value"
}
}
};
string auth_key = "your-auth-key";
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);
// Submit the job
var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
if (!submitResponse.IsSuccessStatusCode)
{
await WriteFailedResponse(submitResponse); // ... method not shown ...
return;
}
var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
var obj = JObject.Parse(result);
// ... use `obj` dictionary to access results
}
Java を使用して発行されたパイプラインを実行する
次のコードは、認証を必要とするパイプラインの呼び出しを示しています。 ( 「Azure Machine Learning のリソースとワークフローの認証を設定する」を参照してください)。パイプラインがパブリックにデプロイされている場合、 authKey
を生成する呼び出しは必要ありません。 部分的なコード スニペットには、Java クラスと例外処理の定型コードは示されていません。 このコードでは、空のOptional
を返す可能性がある関数を連結するためにOptional.flatMap
を使用します。 flatMap
を使用すると、コードが短縮され、明確になりますが、getRequestBody()
により例外が受け入れられることに注意してください。
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;
String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";
HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();
HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token));
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc. (`scoringResult.orElse()`) ...
static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
String clientIdParam = String.format("client_id=%s", clientId);
String resourceParam = String.format("resource=%s", resourceManagerUrl);
String clientSecretParam = String.format("client_secret=%s", clientSecret);
String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(authUrl))
.POST(HttpRequest.BodyPublishers.ofString(bodyString))
.build();
return request;
}
static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(scoringUri))
.header("Authorization", String.format("Token %s", authKey))
.POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
.build();
return request;
}
static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
System.out.println(String.format("Unexpected server response %d", response.statusCode()));
return Optional.empty();
}
return Optional.of(response.body());
}catch(Exception x)
{
System.out.println(x.toString());
return Optional.empty();
}
}
class AuthenticationBody {
String access_token;
String token_type;
int expires_in;
String scope;
String refresh_token;
String id_token;
AuthenticationBody() {}
}
再トレーニングを行わずにデータセットとデータパスを変更する
さまざまなデータセットとデータパスでトレーニングと推論を行うこともできます。 たとえば、より小さなデータセットでトレーニングし、完全なデータセットに対する推論を行う場合があります。 要求の json
引数のDataSetDefinitionValueAssignments
キーを使用して、データセットを切り替えることができます。 DataPathAssignments
を使用してデータパスを切り替えることができます。 この手法は、次の両方で似ています。
パイプライン定義スクリプトで、データセットの
PipelineParameter
を作成します。PipelineParameter
からDatasetConsumptionConfig
またはDataPath
を作成します。tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv') tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset) tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
機械学習スクリプトで、
Run.get_context().input_datasets
を使用して動的に指定されたデータセットにアクセスします。from azureml.core import Run input_tabular_ds = Run.get_context().input_datasets['tabular_dataset'] dataframe = input_tabular_ds.to_pandas_dataframe() # ... etc. ...
機械学習スクリプトは、
PipelineParameter
(tabular_ds_param
) の値ではなく、DatasetConsumptionConfig
(tabular_dataset
) に指定された値にアクセスします。パイプライン定義スクリプトで、パラメーターとして
DatasetConsumptionConfig
をPipelineScriptStep
に設定します。train_step = PythonScriptStep( name="train_step", script_name="train_with_dataset.py", arguments=["--param1", tabular_ds_consumption], inputs=[tabular_ds_consumption], compute_target=compute_target, source_directory=source_directory) pipeline = Pipeline(workspace=ws, steps=[train_step])
推論 REST 呼び出しでデータセットを動的に切り替えるには、
DataSetDefinitionValueAssignments
を使用します。tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset') tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset') ds1_id = tabular_ds1.id d22_id = tabular_ds2.id response = requests.post(rest_endpoint, headers=aad_token, json={ "ExperimentName": "MyRestPipeline", "DataSetDefinitionValueAssignments": { "tabular_ds_param": { "SavedDataSetReference": {"Id": ds1_id #or ds2_id }}}})
データセットと PipelineParameter の表示とDataPath と PipelineParameter の表示に関するノートブックには、この手法の完全な例が含まれています。
バージョン管理されたパイプライン エンドポイントを作成する
複数の発行済みパイプラインを含むパイプライン エンドポイントを作成できます。 この手法を使用すると、機械学習パイプラインを反復処理して更新するときに、固定 REST エンドポイントが提供されます。
from azureml.pipeline.core import PipelineEndpoint
published_pipeline = PublishedPipeline.get(workspace=ws, id="My_Published_Pipeline_id")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
pipeline=published_pipeline, description="Test description Notebook")
パイプライン エンドポイントにジョブを送信する
パイプライン エンドポイントの既定のバージョンにジョブを送信できます。
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)
また、特定のバージョンにジョブを送信することもできます。
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)
REST API を使用して同じことを行うことができます。
rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint,
headers=aad_token,
json={"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"1": "united", "2":"city"}})
スタジオで発行されたパイプラインを使用する
スタジオから発行されたパイプラインを実行することもできます。
Azure Machine Learning スタジオにサインインします。
左側のメニューで、[エンドポイント] を選択 します。
パイプライン エンドポイントを選択します。
実行または使用する、またはパイプライン エンドポイントの以前の実行の結果を確認する、特定のパイプラインを選択します。
発行されたパイプラインを無効にする
発行されたパイプラインの一覧からパイプラインを非表示にするには、スタジオまたは SDK で無効にします。
# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()
p.enable()
を使用して、もう一度有効にすることができます。 詳細については、 PublishedPipeline クラス リファレンスを参照してください。
次のステップ
- GitHub でこれらの Jupyter ノートブックを使用して、機械学習パイプラインをさらに詳しく調べることができます。
- azureml-pipelines-core パッケージと azureml-pipelines-steps パッケージの SDK リファレンスを参照してください。
- パイプラインのデバッグとトラブルシューティングに関するヒントについては、「パイプライン をデバッグする方法」を 参照してください。