你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

CLI (v2) 并行作业 YAML 架构

适用于:Azure CLI ml 扩展 v2(当前)

重要

并行作业只能用作 Azure 机器学习管道作业中的单个步骤。 因此,目前没有并行作业的源 JSON 架构。 本文档列出了在管道中创建并行作业时的有效键及其值。

注意

本文档中详细介绍的 YAML 语法基于最新版本的 ML CLI v2 扩展的 JSON 架构。 此语法必定仅适用于最新版本的 ML CLI v2 扩展。 可以在 https://azuremlschemasprod.azureedge.net/ 上查找早期扩展版本的架构。

YAML 语法

密钥 类型 说明 允许的值 默认值
type 常量 必需。 作业类型。 parallel
inputs 物体 并行作业的输入字典。 键是作业上下文中的输入名称,值是输入值。

可以在 program_arguments 中使用 ${{ inputs.<input_name> }} 表达式引用输入。

可以通过使用 ${{ parent.inputs.<input_name> }} 表达式进行管道输入来引用并行作业输入。 若要了解如何将并行步骤的输入绑定到管道输入,请参阅用于在管道作业的步骤之间绑定输入和输出的表达式语法
inputs.<input_name> 数字、整数、布尔值、字符串或对象 文字值(数字、整数、布尔值或字符串类型)或包含作业输入数据规范的对象之一。
outputs 物体 并行作业的输出配置字典。 键是作业上下文中的输出名称,值是输出配置。

可以使用 ${{ parents.outputs.<output_name> }} 表达式的管道输出来引用并行作业输出。 若要了解如何将并行步骤的输出绑定到管道输出,请参阅用于在管道作业的步骤之间绑定输入和输出的表达式语法
outputs.<output_name> 物体 可以将对象留空,在这种情况下,默认情况下,输出将是 uri_folder 类型,Azure 机器学习将根据以下模板化路径为输出系统生成输出位置:{settings.datastore}/azureml/{job-name}/{output-name}/。 将通过读写挂载将文件写入输出目录。 如果要为输出指定不同的模式,请提供一个包含作业输出规范的对象。
compute 字符串 要在其上执行作业的计算目标的名称。 该值是对工作区中现有计算的引用(使用 azureml:<compute_name> 语法),也可以是对 local 的引用,以指定本地执行。

在管道中使用并行作业时,可以将此设置留空,在这种情况下,管道的 default_compute 会自动选择计算。
local
task 物体 必需。 用于定义并行作业的分布式任务的模板。 请参阅 task 键的属性
input_data 物体 必需。 定义哪些输入数据将拆分为较小批次以运行并行作业。 仅适用于使用 inputs 表达式引用并行作业 ${{ inputs.<input_name> }} 之一
mini_batch_size 字符串 定义每个微批的大小以拆分输入。

如果 input_data 是一个文件夹或一组文件,则此数字定义每个小批次的“文件计数”。 例如 10、100。
如果 input_data 为 mltable 的表格数据,则此数字定义每一个小批次大约的物理大小。 例如,100 kb、100 mb。
1
partition_keys 列表 用于将数据集分区为小型批处理的键。

如果指定此参数,则具有相同键的数据将分区到相同的小型批处理中。 如果同时指定了 partition_keysmini_batch_size,则分区键将生效。
mini_batch_error_threshold 整数 定义可在此并行作业中忽略的失败微批数量。 如果失败的微批计数高于此阈值,则并行作业将标记为失败。

如果出现以下情况,微批将标记为失败:
- run() 的返回计数小于微批输入计数。
- 在自定义 run() 代码中捕获异常。

“-1”是默认数字,表示在执行并行作业期间忽略所有失败的微批。
[-1, int.max] -1
logging_level 字符串 定义要转储到用户日志文件的日志级别。 INFO、WARNING、DEBUG 信息
resources.instance_count 整数 用于作业的节点数。 1
max_concurrency_per_instance 整数 定义每个计算节点上的进程数。

对于 GPU 计算,默认值为 1。
对于 CPU 计算,默认值为核心数。
retry_settings.max_retries 整数 定义微批失败或超时时的重试次数。 如果所有重试都失败,则微批将标记为无法通过 mini_batch_error_threshold 计算来统计。 2
retry_settings.timeout 整数 定义执行自定义 run() 函数的超时(以秒为单位)。 如果执行时间高于此阈值,则微批将会中止,并标记为微批失败以触发重试。 (0, 259200] 六十
environment_variables 物体 要在执行命令的进程上设置的环境变量键/值对的字典。

task 键的属性

密钥 类型 说明 允许的值 默认值
type 常量 必需。 任务的类型。 目前仅适用于 run_function

run_function 模式下,需要提供 codeentry_scriptprogram_arguments 以定义具有可执行函数和参数的 Python 脚本。 注意:并行作业仅支持此模式下的 Python 脚本。
run_function run_function
code 字符串 要上传并用于作业的源代码目录的本地路径。
entry_script 字符串 包含预定义并行函数的实现的 Python 文件。 有关详细信息,请参阅准备并行作业的入口脚本
environment 字符串或对象 必填 用于运行任务的环境。 此值可以是对工作区中现有版本受控环境的引用,也可以是对内联环境规范的引用。

若要引用现有环境,请使用 azureml:<environment_name>:<environment_version> 语法或 azureml:<environment_name>@latest(引用环境的最新版本)。

若要定义内联环境,请遵循环境架构。 排除 nameversion 属性,因为内联环境不支持这些属性。
program_arguments 字符串 要传递到入口脚本的参数。 可能包含对输入和输出的“--<arg_name> ${{inputs.<intput_name>}}”引用。

并行作业提供预定义参数列表,用于设置并行运行的配置。 有关详细信息,请参阅并行作业的预定义参数
append_row_to 字符串 聚合每次运行微批后的所有返回值并将其输出到此文件中。 可以使用表达式 ${{outputs.<output_name>}} 引用并行作业的输出之一

作业输入

密钥 类型 说明 允许的值 默认值
type 字符串 作业输入的类型。 为指向具有 mltable 元文件的位置的输入数据指定 mltable,或为指向文件夹源的输入数据指定 uri_folder %> uri_folder
path 字符串 用作输入的数据的路径。 可以通过几种方式指定该值:

- 数据源文件或文件夹的本地路径,例如 path: ./iris.csv。 数据将在作业提交期间上传。

- 要用作输入的文件或文件夹的云路径的 URI。 支持的 URI 类型为 azuremlhttpswasbsabfssadl。 有关详细信息,请参阅核心 yaml 语法,了解如何使用 azureml:// URI 格式。

- 要用作输入的现有已注册的 Azure 机器学习数据资产。 若要引用已注册的数据资产,请使用 azureml:<data_name>:<data_version> 语法或 azureml:<data_name>@latest(用于引用数据资产的最新版本),例如 path: azureml:cifar10-data:1path: azureml:cifar10-data@latest
mode 字符串 将数据传送到计算目标的模式。

对于只读装载 (ro_mount),数据将用作装载路径。 文件夹将装载为文件夹,文件将装载为文件。 Azure 机器学习会将输入解析为装载路径。

对于 download 模式,数据将下载到计算目标。 Azure 机器学习会将输入解析为下载的路径。

如果你只想要数据工件的存储位置的 URL 而不是挂载或下载数据本身,则可以使用 direct 模式。 这将传入存储位置的 URL 作为作业输入。 在这种情况下,你全权负责处理凭证以访问存储。
.- . ro_mount

作业输出

密钥 类型 说明 允许的值 默认值
type 字符串 作业输出的类型。 对于默认的 uri_folder 类型,输出将对应一个文件夹。 uri_folder uri_folder
mode 字符串 输出文件传送到目标存储的模式。 对于读写装载模式 (rw_mount),输出目录是装载的目录。 对于上传模式,写入的文件将在作业结束时上传。 %> rw_mount

并行作业的预定义参数

密钥 说明 允许的值 默认值
--error_threshold “失败项”的阈值。 失败的项是按照每个微批中的输入和返回数量差距统计的。 如果失败的项数之和高于此阈值,则并行作业将标记为失败。

注意:“-1”是默认数字,表示在执行并行作业期间忽略所有失败。
[-1, int.max] -1
--allowed_failed_percent mini_batch_error_threshold 类似,但使用失败的微批数百分比而不是计数。 [0, 100] 100
--task_overhead_timeout 初始化每个微批所要遵循的超时(以秒为单位)。 例如,加载微批数据并将其传递给 run() 函数。 (0, 259200] 30
--progress_update_timeout 监视微批执行进度所要遵循的超时(以秒为单位)。 如果在此超时设置内未收到进度更新,则并行作业将标记为失败。 (0, 259200] 由其他设置动态计算。
--first_task_creation_timeout 监视从启动作业到运行第一个微批的间隔时间所要遵循的超时(以秒为单位)。 (0, 259200] 600
--copy_logs_to_parent 指示是否将作业进度、概述和日志复制到父管道作业的布尔选项。 True、False 错误
--metrics_name_prefix 提供此并行作业中指标的自定义前缀。
--push_metrics_to_parent 布尔选项,用于选择是否将指标推送到父管道作业。 True、False 错误
--resource_monitor_interval 将节点资源使用情况(例如 CPU、内存)转储到“logs/sys/perf”路径下的 log 文件夹的时间间隔(以秒为单位)。

注意:频繁转储资源日志会略微减慢微批的执行速度。 将此值设置为“0”可停止转储资源使用情况。
[0, int.max] 600

注解

az ml job 命令可用于管理 Azure 机器学习作业。

示例

示例 GitHub 存储库中提供了示例。 下面显示了几个示例。

YAML:在管道中使用并行作业

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interva 20
      append_row_to: ${{outputs.job_output_file}}

后续步骤