次の方法で共有


Azure Machine Learning の Apache Spark ジョブの構成

適用対象:Azure CLI ml extension v2 (現行)Python SDK azure-ai-ml v2 (現行)

Azure Machine Learning 統合と Azure Synapse Analytics を併用すると、Azure Synapse を利用する分散コンピューティング機能に簡単にアクセスできます。これにより、Azure Machine Learning 上の Apache Spark ジョブをスケーリングすることができます。

この記事では、Azure Machine Learning サーバーレス Spark コンピューティング、Azure Data Lake Storage (ADLS) Gen 2 ストレージ アカウント、ユーザー ID パススルーを使って Spark ジョブをいくつかの簡単な手順で送信する方法について説明します。

Azure Machine Learning での Apache Spark の概念の詳細については、こちらのリソースを参照してください。

前提条件

適用対象:Azure CLI ml 拡張機能 v2 (現行)

Azure ストレージ アカウントにロールの割り当てを追加する

Apache Spark ジョブを送信する前に、入力と出力のデータ パスにアクセスできることを確認する必要があります。 ログイン ユーザーのユーザー ID に共同作成者ストレージ BLOB データ共同作成者のロールを割り当て、読み取りと書き込みのアクセスを有効にします。

ユーザー ID に適切なロールを割り当てるには:

  1. Microsoft Azure portalを開きます。

  2. ストレージ アカウント サービスを検索して選択します。

    Microsoft Azure portal のストレージ アカウント サービスの検索と選択を示すスクリーンショット (拡大可)。

  3. [ストレージ アカウント] ページで、一覧から Azure Data Lake Storage (ADLS) Gen 2 ストレージ アカウントを選択します。 ストレージ アカウントの概要を示すページが開きます。

    Azure Data Lake Storage (ADLS) Gen 2 ストレージ アカウントストレージ アカウントの選択を示すスクリーンショット (拡大可)。

  4. 左ペインから [アクセス制御 (IAM)] を選択します。

  5. [ロールの割り当ての追加] を選択します。

    Azure アクセス キー画面を示す展開可能なスクリーンショット。

  6. ストレージ BLOB データ共同作成者ロールを検索します。

  7. [ストレージ BLOB データ共同作成者] ロールを選択します。

  8. [次へ] を選択します。

    Azure の [ロール割り当ての追加] 画面を示す展開可能なスクリーンショット。

  9. ユーザー、グループ、またはサービス プリンシパル を選択します。

  10. [+ メンバーの選択] を選択します。

  11. [選択] の下のテキスト ボックスで、ユーザー ID を検索します。

  12. [選択したメンバー] の下に表示されるように、リストからユーザー ID を選びます。

  13. 適切なユーザー ID を選びます。

  14. [次へ] を選択します。

    Azure の [ロールの割り当ての追加] 画面の [メンバー] タブを示す展開可能なスクリーンショット。

  15. [レビューと割り当て] を選択します。

    Azure の [ロールの割り当ての追加] 画面で [確認と割り当て] タブを示す展開可能なスクリーンショット。

  16. 共同作成者ロールの割り当てについて手順 2 から 13 を繰り返します。

ユーザー ID に適切なロールが割り当てられると、Azure Data Lake Storage (ADLS) Gen 2 ストレージ アカウント内のデータにアクセスできるようになります。

パラメーター化された Python コードを作成する

Spark ジョブには、引数を受け取る Python スクリプトが必要です。 このスクリプトをビルドするために、対話型データ ラングリングから開発された Python コードを変更できます。 サンプルの Python スクリプトを次に示します。

# titanic.py
import argparse
from operator import add
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

parser = argparse.ArgumentParser()
parser.add_argument("--titanic_data")
parser.add_argument("--wrangled_data")

args = parser.parse_args()
print(args.wrangled_data)
print(args.titanic_data)

df = pd.read_csv(args.titanic_data, index_col="PassengerId")
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(args.wrangled_data, index_col="PassengerId")

  • この Python コード サンプルでは、pyspark.pandas を使用します。これをサポートしているのは Spark ランタイム バージョン 3.2 のみです。
  • titanic.py ファイルが src という名前のフォルダーにアップロードされていることを確認してください。 src フォルダーは、Python スクリプト/ノートブック、またはスタンドアロン Spark ジョブを定義する YAML 仕様ファイルを作成したのと同じディレクトリにある必要があります。

そのスクリプトには --titanic_data--wrangled_data の次の 2 つの引数があります。 これらの引数は、それぞれ入力データ パスと出力フォルダーを渡します。 スクリプトでは titanic.csv ファイルを使用します (こちらで入手可能)。 このファイルを、Azure Data Lake Storage (ADLS) Gen 2 ストレージ アカウントで作成されたコンテナーにアップロードします。

スタンドアロン Spark ジョブを送信する

適用対象:Azure CLI ml 拡張機能 v2 (現行)

ヒント

Spark ジョブは以下から送信できます。

この YAML 仕様の例では、スタンドアロンの Spark ジョブを示します。 Azure Machine Learning サーバーレス Spark コンピューティング、ユーザー ID パススルー、abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> という形式の入力および出力データの URI が使われます。 ここでは、<FILE_SYSTEM_NAME> はコンテナー名と一致します。

$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./src 
entry:
  file: titanic.py

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  titanic_data:
    type: uri_file
    path: abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv
    mode: direct

outputs:
  wrangled_data:
    type: uri_folder
    path: abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled/
    mode: direct

args: >-
  --titanic_data ${{inputs.titanic_data}}
  --wrangled_data ${{outputs.wrangled_data}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.2"

上の YAML 仕様ファイルの内容:

  • code プロパティには、パラメーター化された titanic.py ファイルを含むフォルダーの相対パスを定義します。
  • resource プロパティは、サーバーレス Spark コンピューティングで使用される instance_type と Apache Spark runtime_version の値を定義します。 現在サポートされているインスタンスの種類の値は次のとおりです。
    • standard_e4s_v3
    • standard_e8s_v3
    • standard_e16s_v3
    • standard_e32s_v3
    • standard_e64s_v3

この YAML ファイルを az ml job create コマンドの --file パラメーターで指定して、次のようにスタンドアロン Spark ジョブを作成できます。

az ml job create --file <YAML_SPECIFICATION_FILE_NAME>.yaml --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --workspace-name <AML_WORKSPACE_NAME>

ヒント

Azure Synapse ワークスペースに既存の Synapse Spark プールが存在する場合があります。 既存の Synapse Spark プールを使う場合は、Azure Machine Learning ワークスペース内の Synapse Spark プールをアタッチする手順に従ってください。

次のステップ