適用対象:Azure Machine Learning SDK v1 for Python
重要
この記事では、Azure Machine Learning SDK v1 の使用に関する情報を提供します。 SDK v1 は、2025 年 3 月 31 日の時点で非推奨です。 サポートは 2026 年 6 月 30 日に終了します。 SDK v1 は、その日付までインストールして使用できます。
2026 年 6 月 30 日より前に SDK v2 に移行することをお勧めします。 SDK v2 の詳細については、「 Azure Machine Learning CLI と Python SDK v2 とは」 および SDK v2 リファレンスを参照してください。
警告
Python SDK v1 で利用できる Azure Machine Learning との Azure Synapse Analytics の統合は非推奨になっています。 ユーザーは、Azure Machine Learning に登録された Synapse ワークスペースをリンク サービスとして引き続き使用できます。 ただし、新しい Synapse ワークスペースは、リンクされたサービスとして Azure Machine Learning に登録できなくなります。 CLI v2 と Python SDK v2 で利用できるサーバーレス Spark コンピューティングと、アタッチされた Synapse Spark プールの使用をお勧めします。 詳細については、「 Azure Machine Learning での Apache Spark ジョブの構成」を参照してください。
この記事では、Azure Synapse Analytics を利用した Apache Spark プールを、Azure Machine Learning パイプラインのデータ準備ステップのコンピューティング ターゲットとして使用する方法について説明します。 データの準備やトレーニングなどの特定のステップに適したコンピューティング リソースを 1 つのパイプラインで使用する方法について説明します。 データを Spark ステップ用に準備する方法と、次のステップに渡す方法についても説明します。
前提条件
すべてのパイプライン リソースをホストする Azure Machine Learning ワークスペース を作成する
開発環境を構成して Azure Machine Learning SDK をインストールするか、SDK が既にインストールされている Azure Machine Learning コンピューティング インスタンスを使用します
Azure Synapse Analytics ワークスペースと Apache Spark プールを作成します。 詳しくは、「クイック スタート: Synapse Studio を使用してサーバーレス Apache Spark プールを作成する」をご覧ください。
Azure Machine Learning ワークスペースと Azure Synapse Analytics ワークスペースをリンクする
Azure Synapse Analytics ワークスペースで Apache Spark プールを作成して管理します。 Apache Spark プールを Azure Machine Learning ワークスペースと統合するには、Azure Synapse Analytics ワークスペースにリンクする必要があります。 Azure Machine Learning ワークスペースと Azure Synapse Analytics ワークスペースをリンクしたら、次のものを使って Apache Spark プールをアタッチできます
Python SDK (後で説明します)
Azure Resource Manager (ARM) テンプレート。 詳しくは、ARM テンプレートの例をご覧ください
- コマンド ラインを使って ARM テンプレートに従い、リンク サービスを追加し、次のコード サンプルを使って Apache Spark プールをアタッチできます。
az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
重要
Synapse ワークスペースに正常にリンクするには、ユーザーが Synapse ワークスペースの所有者ロールを付与されている必要があります。 Azure portal でご自身のアクセス権を確認してください。
リンクされたサービスは、作成時にシステム割り当てマネージド ID (SAI) を取得します。 このリンク サービス SAI を Synapse Studio から "Synapse Apache Spark 管理者" ロールに割り当てて、Spark ジョブを送信できるようにする必要があります。 詳細については、「 Synapse Studio で Synapse RBAC ロールの割り当てを管理する方法 」を参照してください。
Azure Machine Learning ワークスペース ユーザーには、リソース管理 Azure portal からの "共同作成者" ロールも必要です。
Azure Synapse Analytics ワークスペースと Azure Machine Learning ワークスペースの間のリンクを取得する
次のコード サンプルは、ワークスペース内のリンクされたサービスを取得する方法を示しています。
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration
ws = Workspace.from_config()
for service in LinkedService.list(ws) :
print(f"Service: {service}")
# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')
このコード サンプルでは、Workspace.from_config()
config.json
ファイル内の構成を使用して Azure Machine Learning ワークスペースにアクセスします。 (詳しくは、ワークスペース構成ファイルの作成に関する記事をご覧ください)。 次に、このコードにより、ワークスペースで利用できるすべてのリンク サービスが出力されます。 最後に、LinkedService.get()
によって、'synapselink1'
という名前のリンクされたサービスが取得されます。
Azure Machine Learning のコンピューティング先として Apache Spark プールをアタッチする
Apache Spark プールを使用して機械学習パイプラインのステップを強化するには、次のコード サンプルに示すように、パイプライン ステップの ComputeTarget
としてアタッチする必要があります。
from azureml.core.compute import SynapseCompute, ComputeTarget
attach_config = SynapseCompute.attach_configuration(
linked_service = linked_service,
type="SynapseSpark",
pool_name="spark01") # This name comes from your Synapse workspace
synapse_compute=ComputeTarget.attach(
workspace=ws,
name='link1-spark01',
attach_configuration=attach_config)
synapse_compute.wait_for_completion()
コードは、最初に SynapseCompute
を構成します。
linked_service
引数は、前の手順で作成または取得した LinkedService
オブジェクトです。
type
引数は SynapseSpark
である必要があります。
pool_name
の SynapseCompute.attach_configuration()
引数は、Azure Synapse Analytics ワークスペースの既存のプールのそれと一致している必要があります。 Azure Synapse Analytics ワークスペースでの Apache Spark プールの作成について詳しくは、「クイック スタート: Synapse Studio を使用してサーバーレス Apache Spark プールを作成する」をご覧ください。
attach_config
の型は ComputeTargetAttachConfiguration
。
構成を作成したら、ComputeTarget
値とWorkspace
値、および機械学習ワークスペース内でコンピューティングを参照する名前を渡して、機械学習ComputeTargetAttachConfiguration
を作成します。
ComputeTarget.attach()
の呼び出しは非同期であるため、このサンプルは、呼び出しが完了するまでブロックされます。
リンクされた Apache Spark プールを使用する SynapseSparkStep
を作成する
サンプル ノートブックの Apache spark プールの Spark ジョブでは、単純な機械学習パイプラインが定義されています。 このノートブックでは、最初に、前のステップで定義した synapse_compute
を利用したデータ準備手順が定義されています。 次に、ノートブックでは、トレーニングにいっそう適したコンピューティング先を利用するトレーニング ステップが定義されます。 サンプル ノートブックでは、タイタニック号の生存者のデータベースを使って、データの入力と出力を示します。 実際にデータをクリーンアップしたり、予測モデルを作成したりすることはありません。 このサンプルではトレーニングが実際に行われることはないため、トレーニング ステップでは安価な CPU ベースのコンピューティング リソースを使います。
データは、DatasetConsumptionConfig
オブジェクトを通して機械学習パイプラインに送られます。このオブジェクトは表形式のデータまたはファイルのセットを保持できます。 データは、多くの場合、ワークスペースのデータストア内にある BLOB ストレージのファイルから取得されます。 次のコード サンプルは、機械学習パイプラインの入力を作成する一般的なコードを示しています。
from azureml.core import Dataset
datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")
# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()
このコード サンプルでは、Titanic.csv
ファイルが BLOB ストレージにあることが想定されています。 このコードでは、TabularDataset
と FileDataset
の両方としてファイルを読み取る方法を示します。 このコードは、入力を複製したり、単一のデータ ソースをテーブルを含むリソースとして解釈したり、厳密にファイルとして解釈したりすると混乱を招くため、デモンストレーションのみを目的としています。
重要
FileDataset
を入力として使うには、azureml-core
のバージョン 1.20.0
以降が必要です。 これは、後で説明するように、Environment
クラスで指定できます。 ステップが完了したら、次のコード サンプルで示すように、出力データを格納できます。
from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")
このコード サンプルの datastore
は、データを test
という名前のファイルに格納します。 データは、Machine Learning ワークスペース内で Dataset
という名前の registered_dataset
として使用できます。
パイプラインのステップには、データに加えて、ステップごとの Python の依存関係がある場合があります。 さらに、個々の SynapseSparkStep
オブジェクトでは、正確な Azure Synapse Apache Spark 構成を指定できます。 デモとして、次のコード サンプルでは、 azureml-core
パッケージのバージョンが少なくとも 1.20.0
する必要があることを指定しています。 前に説明したように、azureml-core
を入力として使うには、FileDataset
パッケージについてのこの要件が必要です。
from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")
step_1 = SynapseSparkStep(name = 'synapse-spark',
file = 'dataprep.py',
source_directory="./code",
inputs=[step1_input1, step1_input2],
outputs=[step1_output],
arguments = ["--tabular_input", step1_input1,
"--file_input", step1_input2,
"--output_dir", step1_output],
compute_target = 'link1-spark01',
driver_memory = "7g",
driver_cores = 4,
executor_memory = "7g",
executor_cores = 2,
num_executors = 1,
environment = env)
このコードでは、Azure Machine Learning パイプラインの 1 つのステップが指定されています。 このコードの environment
の値では特定の azureml-core
バージョンが設定され、必要に応じて他の conda または pip の依存関係をコードで追加できます。
SynapseSparkStep
は、ローカル コンピューターの ./code
サブディレクトリを ZIP に圧縮してアップロードします。 コンピューティング サーバー上にそのディレクトリが再作成され、ステップによってそのディレクトリから dataprep.py
スクリプトが実行されます。 そのステップの inputs
と outputs
は、前に説明した step1_input1
、step1_input2
、step1_output
オブジェクトです。
dataprep.py
スクリプト内でこれらの値にアクセスする最も簡単な方法は、これらを arguments
という名前に関連付けることです。
SynapseSparkStep
コンストラクターに対する次の引数セットは、Apache Spark を制御します。
compute_target
は、以前にコンピューティング ターゲットとしてアタッチした'link1-spark01'
リソースです。 その他のパラメーターでは、使用するメモリとコアが指定されます。
サンプル ノートブックでは、dataprep.py
に対して次のコードを使います。
import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset
print(azureml.core.VERSION)
print(os.environ)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()
# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()
# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()
sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)
この "データ準備" スクリプトは実際のデータ変換を行いませんが、データを取得し、データを Spark データフレームに変換する方法と、基本的な Apache Spark 操作を行う方法を示しています。 Azure Machine Learning スタジオで出力を確認するには、次のスクリーンショットで示すように、子ジョブを開き、[出力とログ] タブを選んで、logs/azureml/driver/stdout
ファイルを開きます。
パイプラインで SynapseSparkStep
を使用する
次の例では、SynapseSparkStep
で作成された の出力を使っています。 パイプライン内の他のステップには、それら独自の環境があり、これらは手持ちのタスクに適したさまざまなコンピューティング リソース上で実行される場合があります。 サンプル ノートブックでは、小規模な CPU クラスターで "トレーニング ステップ" が実行されます。
from azureml.core.compute import AmlCompute
cpu_cluster_name = "cpucluster"
if cpu_cluster_name in ws.compute_targets:
cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
print('Found existing cluster, use it.')
else:
compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
print('Allocating new CPU compute cluster')
cpu_cluster.wait_for_completion(show_output=True)
step2_input = step1_output.as_input("step2_input").as_download()
step_2 = PythonScriptStep(script_name="train.py",
arguments=[step2_input],
inputs=[step2_input],
compute_target=cpu_cluster_name,
source_directory="./code",
allow_reuse=False)
このコードでは、必要に応じて新しいコンピューティング リソースが作成されます。 その後、step1_output
の結果がトレーニング ステップの入力に変換されます。
as_download()
オプションは、データがコンピューティング リソースに移動されるため、アクセスが高速になることを意味します。 データが大きすぎてローカル コンピューティングのハード ドライブに収まらない場合は、as_mount()
オプションを使って、FUSE
ファイル システムでデータをストリーミングする必要があります。 この 2 番目のステップの compute_target
は 'cpucluster'
で、データ準備ステップで使用した 'link1-spark01'
リソースではありません。 このステップでは、前のステップで使った train.py
スクリプトの代わりに、簡単な dataprep.py
スクリプトを使います。 サンプル ノートブックには、詳細な train.py
スクリプトが含まれています。
すべてのステップを定義した後は、パイプラインを作成して実行できます。
from azureml.pipeline.core import Pipeline
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)
このコードでは、Azure Synapse Analytics (step_1
) を利用する Apache Spark プール上のデータ準備ステップと、トレーニング ステップ (step_2
) で構成されるパイプラインが作成されます。 Azure は、ステップ間のデータの依存関係を調べて、実行グラフを計算します。 この場合は、単純な依存関係が 1 つだけあります。 ここでは、step2_input
には step1_output
が必ず必要です。
pipeline.submit
呼び出しは、必要に応じて、synapse-pipeline
という名前の実験を作成し、その中でジョブを非同期的に開始します。 パイプライン内の個々のステップは、このメイン ジョブの子ジョブとして実行され、Studio の [実験] ページでそれらのステップを監視および確認できます。