使用 Azure Batch 在 Azure 中高效运行大规模并行和高性能计算 (HPC) 批处理作业。 本教程逐步讲解如何使用 Batch 运行并行工作负荷的 Python 示例。 你可以学习常用的 Batch 应用程序工作流,以及如何以编程方式与 Batch 和存储资源交互。
- 通过 Batch 和存储帐户进行身份验证。
- 将输入文件上传到存储。
- 创建运行应用程序所需的计算节点池。
- 创建用于处理输入文件的作业和任务。
- 监视任务执行情况。
- 检索输出文件。
在本教程中,你会使用 ffmpeg 开放源代码工具将 MP4 媒体文件并行转换为 MP3 格式。
如果没有 Azure 帐户,请在开始前创建一个免费帐户。
先决条件
登录到 Azure
登录到 Azure 门户。
获取帐户凭据
就此示例来说,需为 Batch 帐户和存储帐户提供凭据。 若要获取所需凭据,一种直接的方法是使用 Azure 门户。 (也可使用 Azure API 或命令行工具来获取这些凭据。)
选择所有服务,>批处理帐户,然后选择您的批处理帐户的名称。
若要查看 Batch 凭据,请选择“密钥”。 将“Batch 帐户”、“URL”和“主访问密钥”的值复制到文本编辑器。
若要查看存储帐户名称和密钥,请选择“存储帐户”。 将“存储帐户名称”和“Key1”的值复制到文本编辑器。
下载并运行示例应用
下载示例应用
从 GitHub 下载或克隆示例应用。 若要使用 Git 客户端克隆示例应用存储库,请使用以下命令:
git clone https://github.com/Azure-Samples/batch-python-ffmpeg-tutorial.git
导航到包含文件 batch_python_tutorial_ffmpeg.py的目录。
在 Python 环境中,使用 pip
安装所需的包。
pip install -r requirements.txt
使用代码编辑器打开文件 config.py。 使用特定于帐户的值更新 Batch 帐户和存储帐户凭据字符串。 例如:
_BATCH_ACCOUNT_NAME = 'yourbatchaccount'
_BATCH_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxE+yXrRvJAqT9BlXwwo1CwF+SwAYOxxxxxxxxxxxxxxxx43pXi/gdiATkvbpLRl3x14pcEQ=='
_BATCH_ACCOUNT_URL = 'https://yourbatchaccount.yourbatchregion.batch.azure.com'
_STORAGE_ACCOUNT_NAME = 'mystorageaccount'
_STORAGE_ACCOUNT_KEY = 'xxxxxxxxxxxxxxxxy4/xxxxxxxxxxxxxxxxfwpbIC5aAWA8wDu+AFXZB827Mt9lybZB1nUcQbQiUrkPtilK5BQ=='
运行应用
若要运行该脚本,请执行以下操作:
python batch_python_tutorial_ffmpeg.py
运行示例应用程序时,控制台输出如下所示。 在执行期间启动池的计算节点时,会遇到暂停并看到Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
。
Sample start: 11/28/2018 3:20:21 PM
Container [input] created.
Container [output] created.
Uploading file LowPriVMs-1.mp4 to container [input]...
Uploading file LowPriVMs-2.mp4 to container [input]...
Uploading file LowPriVMs-3.mp4 to container [input]...
Uploading file LowPriVMs-4.mp4 to container [input]...
Uploading file LowPriVMs-5.mp4 to container [input]...
Creating pool [LinuxFFmpegPool]...
Creating job [LinuxFFmpegJob]...
Adding 5 tasks to job [LinuxFFmpegJob]...
Monitoring all tasks for 'Completed' state, timeout in 00:30:00...
Success! All tasks completed successfully within the specified timeout period.
Deleting container [input]....
Sample end: 11/28/2018 3:29:36 PM
Elapsed time: 00:09:14.3418742
访问 Azure 门户中的 Batch 帐户,以监视池、计算节点、作业和任务。 例如,若要查看池中计算节点的热度映射,请选择 “>池 LinuxFFmpegPool”。
任务正在运行时,热度地图如下所示:
在应用程序的默认配置中运行应用程序时,典型的执行时间大约为 5 分钟 。 池创建过程需要最多时间。
检索输出文件
可以使用 Azure 门户下载 ffmpeg 任务生成的输出 MP3 文件。
- 单击“所有服务”>“存储帐户”,然后单击存储帐户的名称。
- 单击“Blob”>“输出”。
- 右键单击一个输出 MP3 文件,然后单击“下载”。 在浏览器中按提示打开或保存该文件。
也可以编程方式从计算节点或存储容器下载这些文件(但在本示例中未演示)。
查看代码
以下部分将示例应用程序细分为多个执行步骤,用于处理 Batch 服务中的工作负荷。 阅读本文的其余部分时,请参阅 Python 代码,因为示例中没有讨论每一行代码。
对 Blob 和 Batch 客户端进行身份验证
为了与存储帐户交互,应用使用 azure-storage-blob 包创建 BlockBlobService 对象。
blob_client = azureblob.BlockBlobService(
account_name=_STORAGE_ACCOUNT_NAME,
account_key=_STORAGE_ACCOUNT_KEY)
该应用创建一个 BatchServiceClient 对象,用于在 Batch 服务中创建和管理池、作业和任务。 示例中的 Batch 客户端使用共享密钥身份验证。 Batch 还支持通过 Microsoft Entra ID 进行身份验证,以对单个用户或无人参与的应用程序进行身份验证。
credentials = batchauth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
_BATCH_ACCOUNT_KEY)
batch_client = batch.BatchServiceClient(
credentials,
base_url=_BATCH_ACCOUNT_URL)
上传输入文件
应用使用 blob_client
引用为输入 MP4 文件创建存储容器,并为任务输出创建容器。 然后,它会调用函数 upload_file_to_container
,将本地 InputFiles 目录中的 MP4 文件上传到容器。 存储中的文件被定义为 Batch ResourceFile 对象,Batch 之后可以将这些对象下载到计算节点。
blob_client.create_container(input_container_name, fail_on_exist=False)
blob_client.create_container(output_container_name, fail_on_exist=False)
input_file_paths = []
for folder, subs, files in os.walk(os.path.join(sys.path[0], './InputFiles/')):
for filename in files:
if filename.endswith(".mp4"):
input_file_paths.append(os.path.abspath(
os.path.join(folder, filename)))
# Upload the input files. This is the collection of files that are to be processed by the tasks.
input_files = [
upload_file_to_container(blob_client, input_container_name, file_path)
for file_path in input_file_paths]
创建计算节点池
然后,该示例会调用 create_pool
以在 Batch 帐户中创建计算节点池。 此定义的函数使用 Batch PoolAddParameter 类设置节点数、VM 大小和池配置。 在此,VirtualMachineConfiguration 对象指定了与在 Azure 市场发布的 Ubuntu Server 20.04 LTS 映像相关的 ImageReference。 Batch 支持 Azure 市场中的各种 VM 映像以及自定义 VM 映像。
节点数和 VM 大小使用定义的常数进行设置。 Batch 支持专用节点和现成节点。可以在池中使用这其中的一种,或者两种都使用。 专用节点为池保留。 现成节点在 Azure 有剩余 VM 容量时以优惠价提供。 如果 Azure 没有足够的容量,现成节点将不可用。 在默认情况下,该示例创建一个池,池中仅包含五个 Standard_A1_v2 Spot 节点。
除了物理节点属性,此池配置还包括 StartTask 对象。 StartTask 在每个节点加入池时及每次节点重启时执行。 在此示例中,StartTask 运行 Bash shell 命令,以在节点上安装 ffmpeg 包和依赖项。
pool.add 方法将池提交到 Batch 服务。
new_pool = batch.models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
image_reference=batchmodels.ImageReference(
publisher="Canonical",
offer="UbuntuServer",
sku="20.04-LTS",
version="latest"
),
node_agent_sku_id="batch.node.ubuntu 20.04"),
vm_size=_POOL_VM_SIZE,
target_dedicated_nodes=_DEDICATED_POOL_NODE_COUNT,
target_low_priority_nodes=_LOW_PRIORITY_POOL_NODE_COUNT,
start_task=batchmodels.StartTask(
command_line="/bin/bash -c \"apt-get update && apt-get install -y ffmpeg\"",
wait_for_success=True,
user_identity=batchmodels.UserIdentity(
auto_user=batchmodels.AutoUserSpecification(
scope=batchmodels.AutoUserScope.pool,
elevation_level=batchmodels.ElevationLevel.admin)),
)
)
batch_service_client.pool.add(new_pool)
创建职位
Batch 作业可指定在其中运行任务的池以及可选设置,例如工作的优先级和计划。 此示例通过调用 create_job
创建一个作业。 这个已定义的函数使用 JobAddParameter 类在您的池中创建作业。 job.add 方法将池提交到 Batch 服务。 作业一开始没有任务。
job = batch.models.JobAddParameter(
id=job_id,
pool_info=batch.models.PoolInformation(pool_id=pool_id))
batch_service_client.job.add(job)
创建任务
应用通过调用add_tasks
在作业中创建任务。 此定义的函数使用 TaskAddParameter 类创建任务对象列表。 每个任务都运行 ffmpeg,使用 command_line
参数来处理输入 resource_files
对象。 ffmpeg 此前已在创建池时安装在每个节点上。 在这里,命令行运行 ffmpeg 将每个输入 MP4(视频)文件转换为 MP3(音频)文件。
此示例在运行命令行后为 MP3 文件创建 OutputFile 对象。 每个任务的输出文件(在本例中为)都使用任务 output_files
的属性上传到链接存储帐户中的容器。
然后,应用使用 task.add_collection 方法将任务添加到作业,该方法将任务排成队列以在计算节点上运行。
tasks = list()
for idx, input_file in enumerate(input_files):
input_file_path = input_file.file_path
output_file_path = "".join((input_file_path).split('.')[:-1]) + '.mp3'
command = "/bin/bash -c \"ffmpeg -i {} {} \"".format(
input_file_path, output_file_path)
tasks.append(batch.models.TaskAddParameter(
id='Task{}'.format(idx),
command_line=command,
resource_files=[input_file],
output_files=[batchmodels.OutputFile(
file_pattern=output_file_path,
destination=batchmodels.OutputFileDestination(
container=batchmodels.OutputFileBlobContainerDestination(
container_url=output_container_sas_url)),
upload_options=batchmodels.OutputFileUploadOptions(
upload_condition=batchmodels.OutputFileUploadCondition.task_success))]
)
)
batch_service_client.task.add_collection(job_id, tasks)
监视任务
将任务添加到作业时,Batch 会自动排队并计划这些任务,以便在关联的池中的计算节点上执行。 根据指定的设置,Batch 处理所有任务队列、计划、重试和其他任务管理职责。
监视任务执行的方法有很多。 wait_for_tasks_to_complete
在此示例中,该函数利用 TaskState 对象来监测任务是否在规定时间内达到某个状态(此处为已完成状态)。
while datetime.datetime.now() < timeout_expiration:
print('.', end='')
sys.stdout.flush()
tasks = batch_service_client.task.list(job_id)
incomplete_tasks = [task for task in tasks if
task.state != batchmodels.TaskState.completed]
if not incomplete_tasks:
print()
return True
else:
time.sleep(1)
...
清理资源
运行任务之后,应用自动删除所创建的输入存储容器,并允许你选择是否删除 Batch 池和作业。 BatchClient 的 JobOperations 和 PoolOperations 类都具有删除方法,如果确认删除,则会调用该方法。 虽然作业和任务本身不收费,但计算节点是要收费的。 因此,建议只在需要的时候分配池。 删除池时会删除节点上的所有任务输出。 但是,输入和输出文件保留在存储帐户中。
若不再需要资源组、Batch 帐户和存储帐户,请将其删除。 若要在 Azure 门户中执行此作,请选择 Batch 帐户的资源组,然后选择 “删除资源组”。
后续步骤
在本教程中,你将学习到如何:
- 通过 Batch 和存储帐户进行身份验证。
- 将输入文件上传到存储。
- 创建运行应用程序所需的计算节点池。
- 创建用于处理输入文件的作业和任务。
- 监视任务执行情况。
- 检索输出文件。
有关使用 Python API 计划和处理 Batch 工作负载的更多示例,请参阅 GitHub 上的 Batch Python 示例 。