次の方法で共有


機械学習パイプライン ステップとの間でのデータの移動 (Python)

適用対象:Azure Machine Learning SDK v1 for Python

重要

この記事では、Azure Machine Learning SDK v1 の使用に関する情報を提供します。 SDK v1 は 2025 年 3 月 31 日の時点で非推奨となり、サポートは 2026 年 6 月 30 日に終了します。 SDK v1 は、その日付までインストールして使用できます。

2026 年 6 月 30 日より前に SDK v2 に移行することをお勧めします。 SDK v2 の詳細については、「 Azure Machine Learning Python SDK v2SDK v2 リファレンスとは」を参照してください。

この記事では、データのインポート、データの変換、Azure Machine Learning パイプラインのステップ間でのデータの移動を行うコードを提供します。 Azure Machine Learning でデータがどのように動作するかの概要については、Azure ストレージ サービスのデータへのアクセスに関する記事を参照してください。 Azure Machine Learning パイプラインの利点と構造については、「 Azure Machine Learning パイプラインとは」を参照してください。

この記事では、次の方法について説明します。

  • 既存のデータに Dataset オブジェクトを使用する
  • ステップ内でデータにアクセスする
  • トレーニングや検証のサブセットなど、Dataset データをサブセットに分割する
  • 次のパイプライン ステップにデータを転送する OutputFileDatasetConfig オブジェクトを作成する
  • パイプライン ステップへの入力として OutputFileDatasetConfig オブジェクトを使用する
  • OutputFileDatasetConfigから永続化したい新しいDatasetオブジェクトを作成する

前提条件

既存のデータに Dataset オブジェクトを使用する

パイプラインにデータを取り込む方法としては、データセット オブジェクトを使用することをお勧めします。 Dataset オブジェクトは、ワークスペース全体で使用できる永続的なデータを表します。

Dataset オブジェクトを作成および登録する方法は多数あります。 表形式データセットは、1 つ以上のファイルで使用できる区切りデータ用です。 ファイル データセットは、バイナリ データ (画像など) または解析するデータ用です。 Dataset オブジェクトを作成する最も簡単なプログラム的方法は、ワークスペース ストレージまたはパブリック URL で既存の Blob を使用することです。

datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

さまざまなオプションと異なるソースからのデータセットの作成、それらを登録して Azure Machine Learning UI で確認する方法、データ サイズとコンピューティング容量の対話方法、およびバージョン管理の詳細については、「 Azure Machine Learning データセットの作成」を参照してください。

データセットをスクリプトに渡す

データセットのパスをスクリプトに渡すには、Dataset オブジェクトの as_named_input() メソッドを使用します。 結果の DatasetConsumptionConfig オブジェクトを引数としてスクリプトに渡すか、パイプライン スクリプトに inputs 引数を使用して、 Run.get_context().input_datasets[]を使用してデータセットを取得できます。

名前付き入力を作成した後、アクセス モード ( FileDataset のみ) を選択できます( as_mount() または as_download())。 スクリプトがデータセット内のすべてのファイルを処理し、コンピューティング リソース上のディスクがデータセットに十分な大きさである場合は、ダウンロード アクセス モードをお勧めします。 ダウンロード アクセス モードを使用すると、実行時のデータ ストリーミングのオーバーヘッドを回避できます。 スクリプトがデータセットのサブセットにアクセスする場合、またはコンピューティングに対して大きすぎる場合は、マウント アクセス モードを使用します。 詳細については、「 マウントとダウンロード」を参照してください。

データセットをパイプライン ステップに渡すには、次の手順を実行します。

  1. DatasetConsumptionConfig オブジェクトを作成するには、TabularDataset.as_named_input()またはFileDataset.as_named_input() (末尾に s なし) を使用します
  2. FileDatasetのみ:as_mount()またはas_download()を使用してアクセス モードを設定します。 TabularDatasetでは、アクセス モードを設定できません。
  3. argumentsまたはinputsを使用して、データセットをパイプライン ステップに渡します。

次のスニペットは、iris_dataset (TabularDataset) を使用して、PythonScriptStep コンストラクター内でこれらの手順を組み合わせる一般的なパターンを示しています。


train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris')]
)

注意

これらの引数 (つまり、 "train_data""train.py"clusteriris_dataset) の値を独自のデータに置き換える必要があります。 上記のスニペットは、呼び出しの形式のみを示しており、Microsoft サンプルの一部ではありません。

random_split()take_sample()などのメソッドを使用して、複数の入力を作成したり、パイプライン ステップに渡されるデータの量を減らしたりすることもできます。

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train'), test.as_named_input('test')]
)

スクリプト内でデータセットにアクセスする

パイプライン ステップ スクリプトへの名前付き入力は、Run オブジェクト内でディクショナリとして使用できます。 Run.get_context()を使用してアクティブなRun オブジェクトを取得し、input_datasetsを使用して名前付き入力のディクショナリを取得します。 inputs引数ではなくarguments引数を使用してDatasetConsumptionConfig オブジェクトを渡した場合は、ArgumentParser コードを使用してデータにアクセスします。 次のスニペットでは、両方の手法が示されています。

パイプライン定義スクリプト

# Code is for demonstration only: It would be confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # Datasets passed as arguments
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    # Datasets passed as inputs
    inputs=[test.as_named_input('test').as_download()]
)

train.pyPythonScriptStep から参照されるスクリプト

# In pipeline script
parser = argparse.ArgumentParser()
# Retrieve the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']

渡された値は、データセット ファイルへのパスです。

登録されたデータセットは永続的であり、ワークスペース全体で共有されるため、それらを直接取得できます。

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')

注意

上記のスニペットは、呼び出しの形式を示しています。 これらは Microsoft サンプルの一部ではありません。 引数を独自のプロジェクトの値に置き換える必要があります。

中間データに OutputFileDatasetConfig を使用する

Datasetオブジェクトは永続的なデータのみを表しますが、OutputFileDatasetConfig オブジェクトはパイプライン ステップからの一時的なデータ出力と永続的な出力データに使用できます。 OutputFileDatasetConfig では、BLOB ストレージ、ファイル共有、Azure Data Lake Storage Gen1、または Data Lake Storage Gen2 へのデータの書き込みがサポートされています。 マウント モードとアップロード モードの両方をサポートしています。 マウント モードでは、マウントされたディレクトリに書き込まれたファイルは、ファイルを閉じたときに永続的に保存されます。 アップロード モードでは、出力ディレクトリに書き込まれたファイルがジョブの最後にアップロードされます。 ジョブが失敗した場合、または取り消された場合、出力ディレクトリはアップロードされません。

OutputFileDatasetConfig オブジェクトの既定の動作は、ワークスペースの既定のデータストアに書き込みます。 arguments パラメーターを使用して、OutputFileDatasetConfig オブジェクトをPythonScriptStepに渡します。

from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
    )

注意

OutputFileDatasetConfig への同時書き込みは失敗します。 1 つの OutputFileDatasetConfig を同時に使用しないでください。 分散トレーニングを使用する場合など、マルチプロセスの状況で 1 つのOutputFileDatasetConfigを共有しないでください。

トレーニング ステップの出力として OutputFileDatasetConfig を使用する

パイプラインの PythonScriptStepでは、プログラムの引数を使用して使用可能な出力パスを取得できます。 この手順が最初のステップであり、出力データを初期化する場合は、指定したパスにディレクトリを作成する必要があります。 その後、 OutputFileDatasetConfigに含めるファイルを記述できます。

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

最初以外のステップへの入力として OutputFileDatasetConfig を読み取る

最初のパイプライン ステップで OutputFileDatasetConfig パスにデータが書き込まれ、それがその最初のステップの出力になった場合、それを後のステップへの入力として使用できます。

次のコードの内容は以下のとおりです。

  • step1_output_data は、 PythonScriptStepstep1 の出力がアップロード アクセス モードで Data Lake Storage Gen2 データストア my_adlsgen2 に書き込まれることを示します。 Data Lake Storage Gen2 データストアにデータを書き戻すためにロールのアクセス許可を設定する方法については、「データストアを使用 して Azure 上のストレージ サービスに接続する」を参照してください。

  • step1が完了し、step1_output_dataによって示される出力先に出力が書き込まれた後、step2step1_output_dataを入力として使用する準備が整います。

# Get Data Lake Storage Gen2 datastore that's already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig = aml_run_config,
    arguments = ["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig = aml_run_config,
    arguments = ["--pd", step1_output_data.as_input()]

)

pipeline = Pipeline(workspace=ws, steps=[step1, step2])

ヒント

Python スクリプト step2.py のデータを読み取るプロセスは、スクリプト 内の Access データセットで前述したプロセスと同じです。 ArgumentParserを使用して、--pdの引数をスクリプトに追加してデータにアクセスします。

再利用のために OutputFileDatasetConfig オブジェクトを登録する

OutputFileDatasetConfig オブジェクトを実験の期間よりも長く使用できるようにする場合は、それをワークスペースに登録して、実験間で共有および再利用します。

step1_output_ds = step1_output_data.register_on_complete(
    name='processed_data', 
    description = 'files from step1'
)

不要になったコンテンツ OutputFileDatasetConfig 削除する

azure では、 OutputFileDatasetConfigで書き込まれた中間データは自動的に削除されません。 大量の不要なデータに対するストレージ料金を回避するには、次のいずれかのアクションを実行する必要があります。

  • 不要になった場合は、パイプライン ジョブの終了時に中間データをプログラムで削除してください。

  • 中間データ用の短期ストレージ ポリシーで BLOB ストレージを使用します。 ( 「Azure Blob Storage のアクセス層を自動化してコストを最適化する」を参照してください)。このポリシーは、ワークスペースの既定以外のデータストアでのみ設定できます。 中間データを既定ではない別のデータ ストアにエクスポートするには、OutputFileDatasetConfig を使います。

    # Get Data Lake Storage Gen2 datastore that's already registered with the workspace
    datastore = workspace.datastores['my_adlsgen2']
    step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
    
  • データを定期的に確認し、不要なデータを削除します。

注意事項

中間データの削除は、必ずデータの最終変更日から 30 日が経過した後に行ってください。 以前に中間データを削除すると、パイプラインの実行が失敗する可能性があります。これは、パイプラインが再利用のために 30 日間データが存在することを前提としているためです。

詳細については、「Azure Machine Learning のコストを計画して管理する」を参照してください。

次のステップ