在本快速入门中,你将使用 Azure CLI 定义流分析作业,用于筛选温度读数大于 27 的实时传感器消息。 流分析作业从 IoT 中心读取数据,转换数据,并将输出数据写入 Blob 存储中的容器。 在本快速入门中使用的输入数据由 Raspberry Pi 联机模拟器生成。
在您开始之前
如果没有 Azure 帐户,请在开始前创建一个免费帐户。
先决条件
在 Azure Cloud Shell 中使用 Bash 环境。 有关详细信息,请参阅 Azure Cloud Shell 入门。
如需在本地运行 CLI 参考命令,请安装 Azure CLI。 如果在 Windows 或 macOS 上运行,请考虑在 Docker 容器中运行 Azure CLI。 有关详细信息,请参阅如何在 Docker 容器中运行 Azure CLI。
如果使用的是本地安装,请使用 az login 命令登录到 Azure CLI。 若要完成身份验证过程,请遵循终端中显示的步骤。 有关其他登录选项,请参阅 使用 Azure CLI 向 Azure 进行身份验证。
出现提示时,请在首次使用时安装 Azure CLI 扩展。 有关扩展的详细信息,请参阅 使用和管理 Azure CLI 中的扩展。
运行az version命令,以查看已安装的版本和依赖库。 若要升级到最新版本,请运行az upgrade。
创建资源组。 所有 Azure 资源都必须部署到资源组中。 使用资源组可以组织和管理相关的 Azure 资源。
在本快速入门中,使用以下 az group create 命令在 eastus 位置创建名为 streamanalyticsrg 的资源组:
az group create --name streamanalyticsrg --___location eastus
对输入数据进行准备
在定义流分析作业之前,请准备用于作业输入的数据。 以下 Azure CLI 命令准备作业所需的 输入 数据。
使用 az iot hub create 命令创建 IoT 中心。 此示例创建名为 MyASAIoTHub 的 IoT 中心。 由于 IoT 中心名称必须全局唯一,因此如果它已经被占用,可能需要更改名称。 将 SKU 设置为 F1 即可使用免费层,前提是它在订阅中可用。 否则,请选择下一个最低的层。
iotHubName=MyASAIoTHub az iot hub create --name $iotHubName --resource-group streamanalyticsrg --sku S1
创建 IoT 中心以后,请使用 az iot hub connection-string show 命令获取 IoT 中心连接字符串。 复制整个连接字符串并进行保存。 在将 IoT 中心作为输入添加到流分析作业时使用它。
az iot hub connection-string show --hub-name $iotHubName
使用 az iothub device-identity create 命令将设备添加到 IoT 中心。 此示例创建名为 MyASAIoTDevice 的设备。
az iot hub device-identity create --hub-name $iotHubName --device-id "MyASAIoTDevice"
使用 az iot hub device-identity connection-string show 命令获取设备连接字符串。 复制整个连接字符串并将其保存。这样,在创建 Raspberry Pi 模拟器时,就可以使用该字符串。
az iot hub device-identity connection-string show --hub-name $iotHubName --device-id "MyASAIoTDevice" --output table
输出示例:
HostName=MyASAIoTHub.azure-devices.net;DeviceId=MyASAIoTDevice;SharedAccessKey=a2mnUsg52+NIgYudxYYUNXI67r0JmNubmfVafojG8=
创建 Blob 存储帐户
以下 Azure CLI 命令创建用于作业输出的 Blob 存储帐户。
使用 az storage account create 命令创建常规用途存储帐户。 常规用途的存储帐户可用于以下四种服务:Blob、文件、表和队列。
storageAccountName="asatutorialstorage$RANDOM" az storage account create \ --name $storageAccountName \ --resource-group streamanalyticsrg \ --___location eastus \ --sku Standard_ZRS \ --encryption-services blob
通过运行 az storage account keys list 命令获取存储帐户的密钥。
key=$(az storage account keys list -g streamanalyticsrg -n $storageAccountName --query "[0].value" -o tsv) echo $key
重要
记下 Azure 存储帐户的访问密钥。 本快速入门稍后将使用此密钥。
使用 az storage container create 命令创建一个名为
state
存储 blob 的容器。 使用存储帐户密钥以授权操作来创建容器。 有关通过 Azure CLI 授权数据操作的详细信息,请参阅 使用 Azure CLI 授权访问 Blob 或队列数据。az storage container create \ --account-name $storageAccountName \ --name state \ --account-key $key \ --auth-mode key
创建流分析作业
使用 az stream-analytics job create 命令创建流分析作业。
az stream-analytics job create \
--job-name "streamanalyticsjob" \
--resource-group "streamanalyticsrg" \
--___location "eastus" \
--output-error-policy "Drop" \
--out-of-order-policy "Drop" \
--order-max-delay 5 \
--arrival-max-delay 16 \
--data-locale "en-US"
配置作业输入
使用 az stream-analytics input cmdlet 将输入添加到作业。 此 cmdlet 采用 JSON 格式的作业名称、作业输入名称、资源组名称和输入属性作为参数。 在此示例中,你将创建一个 IoT 中心作为输入。
重要
- 将
IOT HUB ACCESS KEY
替换为你保存的 IOT 中心连接字符串中的共享访问密钥值。 例如,如果 IOT 中心连接字符串为:HostName=MyASAIoTHub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=xxxxxxxxxxxxxx=
,则共享访问密钥值为xxxxxxxxxxxxxx=
。 替换值时,请确保不要删除"
(双引号)的\
(转义)字符。 - 如果您使用的名称不是
MyASAIoTHub
,请更新以下命令中iotHubNamespace
的值。 运行echo $iotHubName
以查看 IoT 中心的名称。
az stream-analytics input create \
--properties "{\"type\":\"Stream\",\"datasource\":{\"type\":\"Microsoft.Devices/IotHubs\",\"properties\":{\"consumerGroupName\":\"\$Default\",\"endpoint\":\"messages/events\",\"iotHubNamespace\":\"MyASAIoTHub\",\"sharedAccessPolicyKey\":\"IOT HUB ACCESS KEY\",\"sharedAccessPolicyName\":\"iothubowner\"}},\"serialization\":{\"type\":\"Json\",\"encoding\":\"UTF8\"}}" \
--input-name "asaiotinput" \
--job-name "streamanalyticsjob" \
--resource-group "streamanalyticsrg"
配置作业输出
使用 az stream-analytics output create cmdlet 将输出添加到您的作业中。 此 cmdlet 采用作业名称、作业输出名称、资源组名称、JSON 格式的数据源和序列化类型作为参数。
重要
将 STORAGEACCOUNTNAME>
替换为您的 Azure 存储帐户的名称,将 STORAGEACCESSKEY>
替换为存储帐户的访问密钥。 如果未记下这些值,请运行以下命令来获取这些值: echo $storageAccountName
和 echo $key
。 替换这些值时,请确保不要删除 "
(双引号)的 \
(转义)字符。
az stream-analytics output create \
--job-name streamanalyticsjob \
--datasource "{\"type\":\"Microsoft.Storage/Blob\",\"properties\":{\"container\":\"state\",\"dateFormat\":\"yyyy/MM/dd\",\"pathPattern\":\"{date}/{time}\",\"storageAccounts\":[{\"accountKey\":\"STORAGEACCESSKEY\",\"accountName\":\"STORAGEACCOUNTNAME\"}],\"timeFormat\":\"HH\"}}" \
--serialization "{\"type\":\"Json\",\"properties\":{\"format\":\"Array\",\"encoding\":\"UTF8\"}}" \
--output-name asabloboutput \
--resource-group streamanalyticsrg
定义转换查询
使用 az stream-analytics transformation create cmdlet 将转换添加到作业。
az stream-analytics transformation create \
--resource-group streamanalyticsrg \
--job-name streamanalyticsjob \
--name Transformation \
--streaming-units "6" \
--saql "SELECT * INTO asabloboutput FROM asaiotinput WHERE Temperature > 27"
运行 IoT 模拟器
将第 15 行中的占位符替换为在快速入门开头保存的整个 Azure IoT 中心设备连接字符串(而不是 IoT 中心连接字符串)。
选择 运行。 输出会显示传感器数据和发送到 IoT 中心的消息。
启动流分析作业并检查输出
使用 az stream-analytics job start cmdlet 启动作业。 此 cmdlet 使用作业名称、资源组名称、输出启动模式和启动时间作为参数。 OutputStartMode
接受的值为 JobStartTime
、CustomTime
或 LastOutputEventTime
。
以下 cmdlet 在运行以后会返回 True
作为输出(如果作业启动)。
az stream-analytics job start \
--resource-group streamanalyticsrg \
--name streamanalyticsjob \
--output-start-mode JobStartTime
为它提供几分钟,然后验证是否已在 state
Blob 容器中创建输出文件。
下载并打开该文件以查看类似于以下条目的多个条目:
{
"messageId": 229,
"deviceId": "Raspberry Pi Web Client",
"temperature": 31.85214010589595,
"humidity": 60.278830289656284,
"EventProcessedUtcTime": "2023-02-28T22:06:33.5567789Z",
"PartitionId": 3,
"EventEnqueuedUtcTime": "2023-02-28T22:05:49.6520000Z",
"IoTHub": {
"MessageId": null,
"CorrelationId": null,
"ConnectionDeviceId": "MyASAIoTDevice",
"ConnectionDeviceGenerationId": "638132150746523845",
"EnqueuedTime": "2023-02-28T22:05:49.6520000Z",
"StreamId": null
}
}
清理资源
删除资源组,这将删除资源组中的所有资源,包括流分析作业、IoT 中心和 Azure 存储帐户。
az group delete \
--name streamanalyticsrg \
--no-wait
后续步骤
在本快速入门中,你已使用 Azure CLI 部署了一个简单的流分析作业。 也可通过 Azure 门户和 Visual Studio 部署流分析作业。
若要了解如何配置其他输入源并执行实时检测,请继续阅读以下文章: