教程:使用 Batch Explorer、存储资源管理器 和 Python 通过数据工厂运行 Batch 作业

本教程将指导你创建和运行运行 Azure Batch 工作负载的 Azure 数据工厂管道。 Python 脚本在 Batch 节点上运行,以从 Azure Blob 存储容器获取逗号分隔值 (CSV) 输入,操作数据并将输出写入不同的存储容器。 你将使用 Batch Explorer 创建 Batch 池和节点,并使用 Azure 存储资源管理器来处理存储容器和文件。

本教程介绍如何执行下列操作:

  • 使用 Batch Explorer 创建 Batch 池和节点。
  • 使用存储资源管理器创建存储容器并上传输入文件。
  • 开发 Python 脚本以操作输入数据并生成输出。
  • 创建运行 Batch 工作负载的数据工厂管道。
  • 使用 Batch Explorer 查看输出日志文件。

先决条件

使用 Batch Explorer 创建 Batch 池和节点

使用 Batch Explorer 创建一个计算节点池来运行工作负载。

  1. 使用 Azure 凭据登录到 Batch Explorer。

  2. 选择你的 Azure Batch 帐户。

  3. 在左侧边栏中选择“池”,然后选择 + 图标以添加池

    在 Batch Explorer 中创建池的屏幕截图。

  4. 如下所示填写“将池添加到帐户”窗体

    • 在“ID”下输入 custom-activity-pool
    • 在“专用节点”下输入 2
    • 对于“选择操作系统配置”,请选择“数据科学”选项卡,然后选择“Dsvm Win 2019”
    • 对于“选择虚拟机”大小,请选择“Standard_F2s_v2”
    • 对于“启动任务”,请选择“添加启动任务”。 在“启动任务”屏幕上的“命令行”下输入 cmd /c "pip install azure-storage-blob pandas",然后选择“选择”。 此命令会在每个节点启动时安装 azure-storage-blob 包。
  5. 选择“保存并关闭”

使用存储资源管理器创建 Blob 容器

使用存储资源管理器创建 Blob 容器来存储输入和输出文件,然后上传输入文件。

  1. 使用 Azure 凭据登录到存储资源管理器。
  2. 在左侧边栏中,找到并展开链接到你的 Batch 帐户的存储帐户。
  3. 右键单击“Blob 容器”,然后选择“创建 Blob 容器”;或者在边栏底部的“操作”中选择“创建 Blob 容器”
  4. 在输入字段中输入 input
  5. 创建名为 output 的另一个 Blob 容器
  6. 选择“input”容器,然后在右侧窗格中选择“上传”>“上传文件”
  7. 在“上传文件”屏幕上的“选定文件”下,选择输入字段旁边的省略号“...”
  8. 浏览到 iris.csv 文件的下载位置,选择“打开”,然后选择“上传”

存储资源管理器的屏幕截图,其中显示了在存储帐户中创建的容器和 Blob。

开发 Python 脚本

以下 Python 脚本从存储资源管理器 input 容器加载 iris.csv 数据集文件,处理数据,并将结果保存到 output 容器

该脚本需要使用链接到 Batch 帐户的 Azure 存储帐户的连接字符串。 若要获取连接字符串,请执行以下操作:

  1. Azure 门户中,搜索并选择已链接到你的 Batch 帐户的存储帐户的名称。
  2. 在存储帐户的页面上,在左侧导航栏中的“安全 + 网络”下选择“访问密钥”
  3. 在“key1”下,选择“连接字符串”旁边的“显示”,然后选择“复制”图标以复制连接字符串

将连接字符串粘贴到以下脚本中(需替换其中的 <storage-account-connection-string> 占位符)。 将脚本另存为名为 main.py 的文件

重要说明

在实际使用时,不建议在应用源代码中公开帐户密钥。 正确的做法是限制对这些凭据的访问权限,通过使用变量或配置文件来在代码中引用它们。 最好是将 Batch 和存储帐户密钥存储在 Azure 密钥保管库中。

# Load libraries
# from azure.storage.blob import BlobClient
from azure.storage.blob import BlobServiceClient
import pandas as pd
import io

# Define parameters
connectionString = "<storage-account-connection-string>"
containerName = "output"
outputBlobName	= "iris_setosa.csv"

# Establish connection with the blob storage account
blob = BlobClient.from_connection_string(conn_str=connectionString, container_name=containerName, blob_name=outputBlobName)

# Initialize the BlobServiceClient (This initializes a connection to the Azure Blob Storage, downloads the content of the 'iris.csv' file, and then loads it into a Pandas DataFrame for further processing.)
blob_service_client = BlobServiceClient.from_connection_string(conn_str=connectionString)
blob_client = blob_service_client.get_blob_client(container_name=containerName, blob_name=outputBlobName)

# Download the blob content
blob_data = blob_client.download_blob().readall()

# Load iris dataset from the task node
# df = pd.read_csv("iris.csv")
df = pd.read_csv(io.BytesIO(blob_data))

# Take a subset of the records
df = df[df['Species'] == "setosa"]

# Save the subset of the iris dataframe locally in the task node
df.to_csv(outputBlobName, index = False)

with open(outputBlobName, "rb") as data:
    blob.upload_blob(data, overwrite=True)

有关使用 Azure Blob 存储的详细信息,请参阅 Azure Blob 存储文档

在本地运行脚本以测试和验证功能。

python main.py

该脚本应生成一个名为 iris_setosa.csv 的输出文件,其中只有包含 Species = setosa 的数据记录。 在确认该脚本正常运行后,将 main.py 脚本文件上传到存储资源管理器 input 容器

设置数据工厂管道

创建并验证使用 Python 脚本的数据工厂管道。

获取帐户信息

数据工厂管道使用你的 Batch 和存储帐户名称、帐户密钥值和 Batch 帐户终结点。 若要从 Azure 门户获取此信息,请执行以下操作:

  1. 在 Azure 搜索栏中,搜索并选择你的 Batch 帐户名称。

  2. 在 Batch 帐户页面上,从左侧导航栏中选择“密钥”。

  3. 在“密钥”页面上,复制以下值:

    • 批处理帐户
    • 帐户终结点
    • 主访问密钥
    • 存储帐户名称
    • 密钥 1

创建并运行管道

  1. 如果 Azure 数据工厂工作室尚未运行,请在 Azure 门户的“数据工厂”页上选择“启动工作室”

  2. 在数据工厂工作室中,选择左侧导航栏中的“创作”铅笔图标

  3. 在“工厂资源”下选择 + 图标,然后选择“管道”

  4. 在右侧的“属性”窗格中,将管道名称更改为“运行 Python”

    选择“添加管道”后的数据工厂工作室屏幕截图。

  5. 在“活动”窗格中展开“Batch 服务”,然后将“自定义”活动拖放到管道设计器图面中

  6. 在设计器画布下面的“常规”选项卡上,在“名称”下输入 testPipeline

    用于创建管道任务的“常规”选项卡的屏幕截图。

  7. 选择“Azure Batch”选项卡,然后选择“新建”

  8. 如下所示填写“新建链接服务”窗体

    • 名称:输入链接服务的名称,例如 AzureBatch1
    • 访问密钥:输入从 Batch 帐户复制的主访问密钥
    • 帐户名称:输入 Batch 帐户名称
    • Batch URL:输入从 Batch 帐户复制的帐户终结点,例如 https://batchdotnet.eastus.batch.azure.com
    • 池名称:输入 custom-activity-pool,即在 Batch Explorer 中创建的池
    • 存储帐户链接服务名称:选择“新建”。 在下一个屏幕上输入链接存储服务的名称(例如 AzureBlobStorage1),选择你的 Azure 订阅和链接存储帐户,然后选择“创建”
  9. 在 Batch“新建链接服务”屏幕的底部,选择“测试连接”。 如果连接成功,请选择“创建”

    Batch 作业的“新建链接服务”屏幕的屏幕截图。

  10. 选择“设置”选项卡,然后输入或选择以下设置

    • 命令:输入 cmd /C python main.py
    • 资源链接服务:选择创建的链接存储服务(例如 AzureBlobStorage1),然后测试连接以确保连接成功
    • 文件夹路径:选择文件夹图标,选择“input”容器,然后选择“确定”。 在运行 Python 脚本之前,此文件夹中的文件会从容器下载到池节点。

    Batch 作业的“设置”选项卡的屏幕截图。

  11. 在管道工具栏上选择“验证”以验证管道

  12. 选择“调试”以测试管道并确保其正常运行

  13. 选择“全部发布”以发布管道

  14. 选择“添加触发器”,然后选择“立即触发”以运行管道,或选择“新建/编辑”以计划管道

    数据工厂中“验证”、“调试”、“全部发布”和“添加触发器”选项的屏幕截图。

使用 Batch Explorer 查看日志文件

如果运行管道时生成了警告或错误,你可以使用 Batch Explorer 查看 stdout.txt 和 stderr.txt 输出文件以了解详细信息

  1. 在 Batch Explorer 中,从左侧边栏中选择“作业”
  2. 选择“adfv2-custom-activity-pool”作业
  3. 选择返回了失败退出代码的任务。
  4. 查看 stdout.txt 和 stderr.txt 文件以调查和诊断问题

清理资源

Batch 帐户、作业和任务是免费的,但计算节点即使在不运行作业时也会产生费用。 最好只根据需要分配节点池,并在用完池后将其删除。 删除池会删除节点上的所有任务输出以及节点本身。

输入和输出文件保留在存储帐户中,可能会产生费用。 不再需要这些文件时,可以删除这些文件或容器。 不再需要 Batch 帐户或链接的存储帐户时,可将其删除。

后续步骤

在本教程中,你已了解如何将 Python 脚本与 Batch Explorer、存储资源管理器和数据工厂配合使用以运行 Batch 工作负载。 有关数据工厂的详细信息,请参阅什么是 Azure 数据工厂?