중요합니다
이 페이지에는 미리 보기 상태인 Kubernetes 배포 매니페스트를 사용하여 Azure IoT 작업 구성 요소를 관리하는 방법에 대한 지침이 포함되어 있습니다. 이 기능은 여러 제한 사항을 가지고 있으므로 프로덕션 워크로드에는 사용하면 안 됩니다.
베타, 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 약관은 Microsoft Azure 미리 보기에 대한 추가 사용 약관을 참조하세요.
데이터 흐름은 데이터가 선택적 변환을 거쳐 원본에서 대상까지 이동하는 경로입니다. 데이터 흐름 사용자 지정 리소스를 만들거나 작업 환경 웹 UI를 사용하여 데이터 흐름을 구성할 수 있습니다. 데이터 흐름은 원본, 변환, 대상의 세 부분으로 구성됩니다.
원본과 대상을 정의하려면 데이터 흐름 엔드포인트를 구성해야 합니다. 변환은 선택 사항이며 데이터 보강, 데이터 필터링, 데이터를 다른 필드에 매핑하는 등의 작업을 포함할 수 있습니다.
중요합니다
각 데이터 흐름에는 Azure IoT 작업 로컬 MQTT 브로커 기본 엔드포인트가 원본 또는 대상으로 있어야 합니다.
Azure IoT 작업의 운영 환경을 사용하여 데이터 흐름을 만들 수 있습니다. 운영 환경은 데이터 흐름을 구성하기 위한 시각적 인터페이스를 제공합니다. Bicep을 사용하여 Bicep 파일을 사용하여 데이터 흐름을 만들거나 Kubernetes를 사용하여 YAML 파일을 사용하여 데이터 흐름을 만들 수도 있습니다.
계속해서 읽어서 원본, 변환 및 대상을 구성하는 방법을 알아봅니다.
필수 구성 요소
기본 데이터 흐름 프로필과 엔드포인트를 사용하여 Azure IoT 작업 인스턴스가 있으면 즉시 데이터 흐름을 배포할 수 있습니다. 하지만 데이터 흐름을 사용자 지정하기 위해 데이터 흐름 프로필과 엔드포인트를 구성하려고 할 수도 있습니다.
데이터 흐름 프로필
데이터 흐름에 대해 다른 크기 조정 설정이 필요하지 않은 경우 Azure IoT 작업에서 제공하는 기본 데이터 흐름 프로필을 사용합니다. 너무 많은 데이터 흐름을 단일 데이터 흐름 프로필과 연결하지 않아야 합니다. 많은 수의 데이터 흐름이 있는 경우 여러 데이터 흐름 프로필에 분산하여 데이터 흐름 프로필 구성 크기 제한을 초과할 위험을 줄입니다.
새 데이터 흐름 프로필을 구성하는 방법을 알아보려면 데이터 흐름 프로필 구성을 참조하세요.
데이터 흐름 엔드포인트
데이터 흐름 엔드포인트는 데이터 흐름의 원본과 대상을 구성하는 데 필요합니다. 빠르게 시작하려면 로컬 MQTT 브로커의 기본 데이터 흐름 엔드포인트를 사용할 수 있습니다. Kafka, Event Hubs 또는 Azure Data Lake Storage와 같은 다른 형식의 데이터 흐름 엔드포인트를 만들 수도 있습니다. 각 형식의 데이터 흐름 엔드포인트를 구성하는 방법을 알아보려면 데이터 흐름 엔드포인트 구성을 참조하세요.
시작하기
필수 구성 요소를 갖추면 데이터 흐름을 만들 수 있습니다.
운영 환경에서 데이터 흐름을 만들려면 데이터 흐름>데이터 흐름 만들기를 선택합니다.
자리 표시자 이름 new-data-flow 를 선택하여 데이터 흐름 속성을 설정합니다. 데이터 흐름의 이름을 입력하고 사용할 데이터 흐름 프로필을 선택합니다. 기본 데이터 흐름 프로필은 기본적으로 선택됩니다. 데이터 흐름 프로필에 대한 자세한 내용은 데이터 흐름 프로필 구성을 참조하세요.
중요합니다
데이터 흐름을 만들 때만 데이터 흐름 프로필을 선택할 수 있습니다. 데이터 흐름을 만든 후에는 데이터 흐름 프로필을 변경할 수 없습니다.
기존 데이터 흐름의 데이터 흐름 프로필을 변경하려면 원래 데이터 흐름을 삭제하고 새 데이터 흐름 프로필을 사용하여 새 데이터 흐름을 만듭니다.
데이터 흐름 다이어그램에서 항목을 선택하여 데이터 흐름에 대한 원본, 변환 및 대상 엔드포인트를 구성합니다.
az iot ops dataflow apply 명령을 사용하여 데이터 흐름을 만들거나 변경합니다.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
--config-file
매개 변수는 리소스 속성을 포함하는 JSON 구성 파일의 경로 및 파일 이름입니다.
이 예제에서는 사용자의 홈 디렉터리에 저장된 다음 콘텐츠로 명명 data-flow.json
된 구성 파일을 가정합니다.
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
// See source configuration section
}
},
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
// See transformation configuration section
}
},
{
"operationType": "Destination",
"destinationSettings": {
// See destination configuration section
}
}
]
}
다음은 기본 데이터 흐름 프로필을 사용하여 데이터 흐름을 만들거나 업데이트하는 예제 명령입니다.
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
데이터 흐름 만들기를 시작하려면 Bicep .bicep
파일을 만듭니다. 이 예에서는 원본, 변환 및 대상 구성을 포함하는 데이터 흐름의 구조를 보여 줍니다.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// See source configuration section
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// See transformation configuration section
}
}
{
operationType: 'Destination'
destinationSettings: {
// See destination configuration section
}
}
]
}
}
Kubernetes 매니페스트 .yaml
파일을 만들어 데이터 흐름 만들기를 시작합니다. 이 예에서는 원본, 변환 및 대상 구성을 포함하는 데이터 흐름의 구조를 보여 줍니다.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
# This field is required when configuring via Kubernetes YAML
# The syntax is different when using Bicep
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
# Transformation optional
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
다음 섹션을 검토하여 데이터 흐름의 작업 유형을 구성하는 방법을 알아봅니다.
출처
데이터 흐름에 대한 원본을 구성하려면 엔드포인트 참조와 엔드포인트에 대한 데이터 원본 목록을 지정합니다. 다음 옵션 중 하나를 데이터 흐름의 원본으로 선택합니다.
기본 엔드포인트가 원본으로 사용되지 않으면 대상으로 사용해야 합니다. 로컬 MQTT broker 엔드포인트를 사용하는 방법에 대한 자세한 내용은 데이터 흐름이 로컬 MQTT broker 엔드포인트를 사용해야 하므로 참조하세요.
옵션 1: 기본 메시지 브로커 엔드포인트를 원본으로 사용
소스 세부 정보에서 메시지 브로커를 선택합니다.
메시지 브로커 원본에 대해 다음 설정을 입력합니다.
설정 |
설명 |
데이터 흐름 엔드포인트 |
기본 MQTT 메시지 브로커 엔드포인트를 사용하려면 기본값을 선택합니다. |
항목 |
들어오는 메시지에 대한 구독 항목 필터입니다. 여러 항목을 추가하려면 항목>행 추가를 사용합니다. 항목에 대한 자세한 내용은 MQTT 또는 Kafka 항목 구성을 참조하세요. |
메시지 스키마 |
들어오는 메시지를 역직렬화하는 데 사용할 스키마입니다. 데이터 역직렬화를 위한 스키마 지정을 참조하세요. |
적용을 선택합니다.
다음은 기본 MQTT broker 엔드포인트에 대한 예제 원본 엔드포인트 구성입니다.
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default"
}
}
메시지 브로커 엔드포인트는 Bicep 파일에 구성됩니다. 예를 들어, 다음 엔드포인트는 데이터 흐름의 원본입니다.
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
예를 들어, 메시지 브로커 엔드포인트와 두 개의 항목 필터를 사용하여 원본을 구성하려면 다음 구성을 사용합니다.
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
dataSources
를 사용하면 엔드포인트 구성을 수정하지 않고도 MQTT 또는 Kafka 항목을 지정할 수 있으므로 항목이 다르더라도 여러 데이터 흐름에 대해 엔드포인트를 다시 사용할 수 있습니다. 자세한 내용은 데이터 원본 구성을 참조하세요.
옵션 2: 자산을 원본으로 사용
자산을 데이터 흐름의 원본으로 사용할 수 있습니다. 자산을 원본으로 사용하는 기능은 운영 환경에서만 사용할 수 있습니다.
소스 세부 정보에서 자산을 선택합니다.
원본 엔드포인트로 사용할 자산을 선택합니다.
계속을 선택합니다.
선택한 자산에 대한 데이터 포인트 목록이 표시됩니다.
자산을 원본 엔드포인트로 사용하려면 적용을 선택합니다.
자산을 원본으로 구성하는 기능은 운영 환경에서만 사용할 수 있습니다.
자산을 원본으로 구성하는 기능은 운영 환경에서만 사용할 수 있습니다.
자산을 원본으로 구성하는 기능은 운영 환경에서만 사용할 수 있습니다.
자산을 원본으로 사용하는 경우 자산 정의는 데이터 흐름에 대한 스키마를 유추하는 데 사용됩니다. 자산 정의에는 자산의 데이터 포인트에 대한 스키마가 포함됩니다. 자세한 내용은 원격으로 자산 구성 관리를 참조하세요.
구성이 완료되면 자산의 데이터는 로컬 MQTT 브로커를 통해 데이터 흐름에 도달합니다. 따라서 자산을 원본으로 사용하는 경우 데이터 흐름은 실제로 로컬 MQTT 브로커 기본 엔드포인트를 원본으로 사용합니다.
옵션 3: 사용자 지정 MQTT 또는 Kafka 데이터 흐름 엔드포인트를 원본으로 사용
사용자 지정 MQTT 또는 Kafka 데이터 흐름 엔드포인트를 만든 경우(예: Event Grid 또는 Event Hubs와 함께 사용) 이를 데이터 흐름의 원본으로 사용할 수 있습니다. Data Lake나 Fabric OneLake와 같은 스토리지 유형 엔드포인트는 원본으로 사용할 수 없습니다.
소스 세부 정보에서 메시지 브로커를 선택합니다.
메시지 브로커 원본에 대해 다음 설정을 입력합니다.
적용을 선택합니다.
자리 표시자 값을 사용자 지정 엔드포인트 이름과 항목으로 바꿉니다.
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_1>",
"<TOPIC_2>"
]
}
}
자리 표시자 값을 사용자 지정 엔드포인트 이름과 항목으로 바꿉니다.
sourceSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_1>'
'<TOPIC_2>'
// See section on configuring MQTT or Kafka topics for more information
]
}
자리 표시자 값을 사용자 지정 엔드포인트 이름과 항목으로 바꿉니다.
sourceSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataSources:
- <TOPIC_1>
- <TOPIC_2>
# See section on configuring MQTT or Kafka topics for more information
데이터 흐름 엔드포인트 구성을 수정하지 않고도 원본에서 여러 MQTT 또는 Kafka 항목을 지정할 수 있습니다. 이러한 유연성 덕분에 항목이 다르더라도 동일한 엔드포인트를 여러 데이터 흐름에서 다시 사용할 수 있습니다. 자세한 내용은 데이터 흐름 엔드포인트 다시 사용을 참조하세요.
MQTT 토픽
원본이 MQTT(Event Grid 포함) 엔드포인트인 경우 MQTT 항목 필터를 사용하여 들어오는 메시지를 구독할 수 있습니다. 항목 필터에는 와일드카드를 포함하여 여러 항목을 구독할 수 있습니다. 예를 들어 thermostats/+/sensor/temperature/#
자동 온도 조절기의 모든 온도 센서 메시지를 구독합니다. MQTT 항목 필터를 구성하려면:
운영 환경 데이터 흐름 소스 세부 정보에서 메시지 브로커를 선택한 다음 항목 필드를 사용하여 들어오는 메시지에 대해 구독할 MQTT 항목 필터를 지정합니다. 행 추가를 선택하고 새 항목을 입력하여 여러 MQTT 항목을 추가할 수 있습니다.
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<MESSAGE_BROKER_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_FILTER_1>",
"<TOPIC_FILTER_2>"
// Add more topic filters as needed
]
}
}
와일드카드를 사용한 여러 MQTT 항목 필터의 예:
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
]
}
}
여기에서 와일드카드 +
은 thermostats
및 humidifiers
항목 아래의 모든 디바이스를 선택하는 데 사용됩니다. #
와일드카드는 temperature
및 humidity
토픽의 모든 하위 주제에서 모든 센서 메시지를 선택하는 데 사용됩니다.
sourceSettings: {
endpointRef: '<MESSAGE_BROKER_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_FILTER_1>'
'<TOPIC_FILTER_2>'
// Add more topic filters as needed
]
}
와일드카드를 사용한 여러 MQTT 항목 필터의 예:
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
여기에서 와일드카드 +
은 thermostats
및 humidifiers
항목 아래의 모든 디바이스를 선택하는 데 사용됩니다. #
와일드카드는 temperature
및 humidity
토픽의 모든 하위 주제에서 모든 센서 메시지를 선택하는 데 사용됩니다.
sourceSettings:
endpointRef: <ENDPOINT_NAME>
dataSources:
- <TOPIC_FILTER_1>
- <TOPIC_FILTER_2>
# Add more topic filters as needed
와일드카드를 사용한 여러 항목 필터의 예:
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
여기에서 와일드카드 +
은 thermostats
및 humidifiers
항목 아래의 모든 디바이스를 선택하는 데 사용됩니다. #
와일드카드는 토픽의 temperature
humidity
모든 하위 항목에서 모든 메시지를 선택하는 데 사용됩니다.
공유 구독
메시지 브로커 원본과 공유 구독을 사용하려면 $shared/<GROUP_NAME>/<TOPIC_FILTER>
형식으로 공유 구독 항목을 지정할 수 있습니다.
운영 환경 데이터 흐름 소스 세부 정보에서 메시지 브로커를 선택하고 항목 필드를 사용하여 공유 구독 그룹과 항목을 지정합니다.
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"$shared/<GROUP_NAME>/<TOPIC_FILTER>"
]
}
}
sourceSettings: {
dataSources: [
'$shared/<GROUP_NAME>/<TOPIC_FILTER>'
]
}
sourceSettings:
dataSources:
- $shared/<GROUP_NAME>/<TOPIC_FILTER>
데이터 흐름 프로필의 인스턴스 수가 1보다 크면 메시지 브로커 원본을 사용하는 모든 데이터 흐름에 대해 공유 구독이 자동으로 사용하도록 설정됩니다. 이 경우 $shared
접두사가 추가되고 공유 구독 그룹 이름이 자동으로 생성됩니다. 예를 들어, 인스턴스 수가 3인 데이터 흐름 프로필이 있고 데이터 흐름이 항목 topic1
및 topic2
로 구성된 원본으로 메시지 브로커 엔드포인트를 사용하는 경우 이는 자동으로 $shared/<GENERATED_GROUP_NAME>/topic1
및 $shared/<GENERATED_GROUP_NAME>/topic2
로 공유 구독으로 변환됩니다.
구성에서 $shared/mygroup/topic
이라는 이름의 항목을 명시적으로 만들 수 있습니다. 그러나 $shared
항목을 명시적으로 추가하는 것은 권장되지 않습니다. 필요할 때 $shared
접두사가 자동으로 추가되기 때문입니다. 그룹 이름이 설정되지 않은 경우 데이터 흐름은 그룹 이름을 사용하여 최적화될 수 있습니다. 예를 들어, $share
가 설정되지 않으면 데이터 흐름은 항목 이름에 대해서만 작동해야 합니다.
중요합니다
인스턴스 수가 1보다 클 때 공유 구독이 필요한 데이터 흐름은 Event Grid MQTT 브로커를 원본으로 사용하는 경우 중요합니다. 이는 공유 구독을 지원하지 않기 때문입니다. 메시지 누락을 방지하려면 Event Grid MQTT 브로커를 원본으로 사용할 때 데이터 흐름 프로필 인스턴스 수를 1로 설정합니다. 이는 데이터 흐름이 구독자이고 클라우드로부터 메시지를 수신하는 경우입니다.
Kafka 항목
원본이 Kafka(Event Hubs 포함) 엔드포인트인 경우, 들어오는 메시지에 대해 구독할 개별 Kafka 항목을 지정합니다. 와일드카드는 지원되지 않으므로 각 항목을 정적으로 지정해야 합니다.
참고
Kafka 엔드포인트를 통해 Event Hubs를 사용하는 경우 네임스페이스 내의 각 개별 이벤트 허브는 Kafka 항목입니다. 예를 들어, thermostats
와 humidifiers
라는 두 개의 이벤트 허브가 있는 Event Hubs 네임스페이스가 있는 경우 각 이벤트 허브를 Kafka 항목으로 지정할 수 있습니다.
Kafka 항목을 구성하려면:
운영 환경 데이터 흐름 소스 세부 정보에서 메시지 브로커를 선택한 다음 항목 필드를 사용하여 들어오는 메시지에 대해 구독할 Kafka 항목 필터를 지정합니다.
참고
운영 환경에서는 항목 필터를 하나만 지정할 수 있습니다. 여러 항목 필터를 사용하려면 Bicep 또는 Kubernetes를 사용합니다.
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<KAFKA_ENDPOINT_NAME>",
"dataSources": [
"<KAFKA_TOPIC_1>",
"<KAFKA_TOPIC_2>"
// Add more Kafka topics as needed
]
}
}
sourceSettings: {
endpointRef: '<KAFKA_ENDPOINT_NAME>'
dataSources: [
'<KAFKA_TOPIC_1>'
'<KAFKA_TOPIC_2>'
// Add more Kafka topics as needed
]
}
sourceSettings:
endpointRef: <KAFKA_ENDPOINT_NAME>
dataSources:
- <KAFKA_TOPIC_1>
- <KAFKA_TOPIC_2>
# Add more Kafka topics as needed
원본 스키마 지정
MQTT 또는 Kafka를 원본으로 사용하는 경우 작업 환경 웹 UI에서 데이터 요소 목록을 표시하는 스키마 를 지정할 수 있습니다. 스키마를 사용하여 들어오는 메시지를 역직렬화하고 유효성 검사하는 것은 현재 지원되지 않습니다.
원본이 자산인 경우 스키마는 자산 정의에서 자동으로 유추됩니다.
원본에서 들어오는 메시지를 역직렬화하는 데 사용되는 스키마를 구성하려면 다음을 수행합니다.
운영 환경 데이터 흐름 소스 세부 정보에서 메시지 브로커를 선택하고 메시지 스키마 필드를 사용하여 스키마를 지정합니다. 먼저 업로드 단추를 사용하여 스키마 파일을 업로드할 수 있습니다. 자세한 내용은 메시지 스키마 이해를 참조하세요.
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<ENDPOINT_NAME>",
"serializationFormat": "Json",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>"
}
}
스키마 레지스트리를 사용하여 스키마를 저장한 후 데이터 흐름 구성에서 이를 참조할 수 있습니다.
sourceSettings: {
serializationFormat: 'Json'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
}
스키마 레지스트리를 사용하여 스키마를 저장한 후 데이터 흐름 구성에서 이를 참조할 수 있습니다.
sourceSettings:
serializationFormat: Json
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
자세한 내용은 메시지 스키마 이해를 참조하세요.
변환 작업은 데이터를 대상으로 보내기 전에 원본의 데이터를 변환하는 작업입니다. 변환은 선택 사항입니다. 데이터를 변경할 필요가 없다면 데이터 흐름 구성에 변환 작업을 포함하지 마세요. 여러 개의 변환은 구성에서 지정된 순서에 관계없이 단계적으로 연결됩니다. 단계의 순서는 항상 다음과 같습니다.
- 보강: 데이터 세트와 일치 조건이 주어지면 원본 데이터에 추가 데이터를 추가합니다.
- 필터: 조건에 따라 데이터를 필터링합니다.
- 맵, 컴퓨팅, 이름 바꾸기 또는 새 속성 추가: 선택적 변환을 통해 한 필드에서 다른 필드로 데이터를 이동합니다.
이 섹션은 데이터 흐름 변환에 대한 소개입니다. 자세한 내용은 데이터 흐름을 사용하여 데이터 매핑, 데이터 흐름 변환을 사용하여 데이터 변환 및 데이터 흐름을 사용하여 데이터 보강을 참조하세요.
운영 환경에서 데이터 흐름>변환 추가(선택 사항)를 선택합니다.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
// See section on enriching data
],
"filter": [
// See section on filtering data
],
"map": [
// See section on mapping data
]
}
}
builtInTransformationSettings: {
datasets: [
// See section on enriching data
]
filter: [
// See section on filtering data
]
map: [
// See section on mapping data
]
}
builtInTransformationSettings:
datasets:
# See section on enriching data
filter:
# See section on filtering data
map:
# See section on mapping data
보강: 참조 데이터 추가
데이터를 풍부하게 하려면 먼저 Azure IoT 작업 상태 저장소에 참조 데이터 세트를 추가합니다. 데이터 세트는 조건에 따라 원본 데이터에 추가 데이터를 추가하는 데 사용됩니다. 조건은 데이터 세트의 필드와 일치하는 원본 데이터의 필드로 지정됩니다.
state store CLI를 사용하여 샘플 데이터를 상태 저장소에 로드할 수 있습니다. 상태 저장소의 키 이름은 데이터 흐름 구성의 데이터 세트에 해당합니다.
현재 보강 단계는 운영 환경에서 지원되지 않습니다.
데이터를 보강하려면 데이터 흐름 구성에서 속성을 사용할 builtInTransformationSettings
수 있습니다. 이 datasets
속성은 보강에 사용할 데이터 세트를 지정하는 데 사용됩니다.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
{
"key": "<DATASET_KEY>",
"inputs": [
"$source.<SOURCE_FIELD>" // ---------------- $1
"$context(<DATASET_KEY>).<DATASET_FIELD>" // - $2
],
"expression": "$1 == $2"
}
]
}
}
이 예에서는 원본 데이터의 deviceId
필드를 사용하여 데이터 세트의 asset
필드와 일치하는 방법을 보여 줍니다.
builtInTransformationSettings: {
datasets: [
{
key: 'assetDataset'
inputs: [
'$source.deviceId' // ---------------- $1
'$context(assetDataset).asset' // ---- $2
]
expression: '$1 == $2'
}
]
}
예를 들어, 원본 데이터의 deviceId
필드를 사용하여 데이터 세트의 asset
필드와 일치시킬 수 있습니다.
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
데이터 세트에 asset
필드가 있는 레코드가 있는 경우 다음과 같습니다.
{
"asset": "thermostat1",
"___location": "room1",
"manufacturer": "Contoso"
}
deviceId
과 일치하는 thermostat1
필드가 있는 원본의 데이터에는 필터 및 맵 단계에서 사용할 수 있는 ___location
및 manufacturer
필드가 있습니다.
조건 구문에 대한 자세한 내용은 데이터 흐름을 사용하여 데이터 보강 및 데이터 흐름을 사용하여 데이터 변환을 참조하세요.
필터: 조건에 따라 데이터 필터링
조건에 따라 데이터를 필터링하려면 filter
단계를 사용할 수 있습니다. 조건은 값과 일치하는 원본 데이터의 필드로 지정됩니다.
변환(선택 사항)에서 필터>추가를 선택합니다.
필요한 설정을 입력합니다.
설정 |
설명 |
필터 조건 |
원본 데이터의 필드를 기준으로 데이터를 필터링하는 조건. |
설명 |
필터 조건에 대한 설명을 제공합니다. |
필터 조건 필드에 @
을 입력하거나 Ctrl + 스페이스를 선택하여 드롭다운에서 데이터 포인트를 선택합니다.
@$metadata.user_properties.<property>
또는 @$metadata.topic
형식을 사용하여 MQTT 메타데이터 속성을 입력할 수 있습니다. @$metadata.<header>
형식을 사용하여 $metadata 헤더를 입력할 수도 있습니다. $metadata
구문은 메시지 헤더의 일부인 MQTT 속성에만 필요합니다. 자세한 내용은 필드 참조를 참조하세요.
조건에서는 원본 데이터의 필드를 사용할 수 있습니다. 예를 들어, @temperature > 20
과 같은 필터 조건을 사용하면 온도 필드를 기준으로 20 이하인 데이터를 필터링할 수 있습니다.
적용을 선택합니다.
예를 들어, 원본 데이터의 temperature
필드를 사용하여 데이터를 필터링할 수 있습니다.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"filter": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"expression": "$1 > 20"
}
]
}
}
예를 들어, 원본 데이터의 temperature
필드를 사용하여 데이터를 필터링할 수 있습니다.
builtInTransformationSettings: {
filter: [
{
inputs: [
'temperature ? $last'
]
expression: '$1 > 20'
}
]
}
temperature
필드가 20보다 큰 경우, 데이터는 다음 단계로 전달됩니다. temperature
필드가 20보다 작거나 같으면 데이터가 필터링됩니다.
예를 들어, 원본 데이터의 temperature
필드를 사용하여 데이터를 필터링할 수 있습니다.
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
temperature
필드가 20보다 큰 경우, 데이터는 다음 단계로 전달됩니다. temperature
필드가 20보다 작거나 같으면 데이터가 필터링됩니다.
맵: 한 필드에서 다른 필드로 데이터 이동
선택적 변환을 통해 데이터를 다른 필드에 매핑하려면 map
연산을 사용할 수 있습니다. 변환은 원본 데이터의 필드를 사용하는 수식으로 지정됩니다.
운영 환경에서는 현재 컴퓨팅, 이름 바꾸기 및 새 속성 변환을 사용하여 매핑이 지원됩니다.
컴퓨팅
컴퓨팅 변환을 사용하여 원본 데이터에 수식을 적용할 수 있습니다. 이 작업은 원본 데이터에 수식을 적용하고 결과 필드를 저장하는 데 사용됩니다.
변환(선택 사항)에서 컴퓨팅>추가를 선택합니다.
필요한 설정을 입력합니다.
설정 |
설명 |
수식 선택 |
드롭다운에서 기존 수식을 선택하거나 사용자 지정을 선택하여 수식을 직접 입력합니다. |
출력 |
결과에 대한 출력 표시 이름을 지정합니다. |
수식 |
원본 데이터에 적용할 수식을 입력합니다. |
설명 |
변환에 대한 설명을 제공합니다. |
마지막으로 알려진 값 |
선택적으로, 현재 값을 사용할 수 없는 경우 마지막으로 알려진 값을 사용합니다. |
수식 필드에서 수식을 입력하거나 편집할 수 있습니다. 수식은 원본 데이터의 필드를 사용할 수 있습니다. 드롭다운에서 데이터 포인트를 선택하려면 @
을 입력하거나 Ctrl + 스페이스바를 선택합니다. 기본 제공 수식의 경우 사용 가능한 데이터 포인트 목록을 보려면 <dataflow>
자리 표시자를 선택합니다.
@$metadata.user_properties.<property>
또는 @$metadata.topic
형식을 사용하여 MQTT 메타데이터 속성을 입력할 수 있습니다. @$metadata.<header>
형식을 사용하여 $metadata 헤더를 입력할 수도 있습니다. $metadata
구문은 메시지 헤더의 일부인 MQTT 속성에만 필요합니다. 자세한 내용은 필드 참조를 참조하세요.
수식은 원본 데이터의 필드를 사용할 수 있습니다. 예를 들어, 원본 데이터의 temperature
필드를 사용하여 온도를 섭씨로 변환하고 temperatureCelsius
출력 필드에 저장할 수 있습니다.
적용을 선택합니다.
이름 바꾸기
이름 바꾸기 변환을 사용하여 데이터 포인트의 이름을 바꿀 수 있습니다. 이 작업은 원본 데이터의 데이터 포인트 이름을 새 이름으로 바꾸는 데 사용됩니다. 새 이름은 데이터 흐름의 후속 단계에서 사용될 수 있습니다.
변환(선택 사항)에서 이름 바꾸기>추가를 선택합니다.
필요한 설정을 입력합니다.
설정 |
설명 |
데이터 포인트 |
드롭다운에서 데이터 포인트를 선택하거나 $metadata 헤더를 입력합니다. |
새 데이터 포인트 이름 |
데이터 포인트의 새 이름을 입력합니다. |
설명 |
변환에 대한 설명을 제공합니다. |
@$metadata.user_properties.<property>
또는 @$metadata.topic
형식을 사용하여 MQTT 메타데이터 속성을 입력할 수 있습니다. @$metadata.<header>
형식을 사용하여 $metadata 헤더를 입력할 수도 있습니다. $metadata
구문은 메시지 헤더의 일부인 MQTT 속성에만 필요합니다. 자세한 내용은 필드 참조를 참조하세요.
적용을 선택합니다.
새 속성
새 속성 변환을 사용하여 원본 데이터에 새 속성을 추가할 수 있습니다. 이 작업은 원본 데이터에 새 속성을 추가하는 데 사용됩니다. 새 속성은 데이터 흐름의 후속 단계에서 사용될 수 있습니다.
변환(선택 사항)에서 새 속성>추가를 선택합니다.
필요한 설정을 입력합니다.
설정 |
설명 |
속성 키 |
새 속성의 키를 입력합니다. |
속성 값 |
새 속성의 값을 입력합니다. |
설명 |
새 속성에 대한 설명을 제공합니다. |
적용을 선택합니다.
예를 들어, 원본 데이터의 temperature
필드를 사용하여 온도를 섭씨로 변환하고 temperatureCelsius
필드에 저장할 수 있습니다. 또한 컨텍스트화 데이터 세트의 ___location
필드를 사용하여 원본 데이터를 보강할 수도 있습니다.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"output": "temperatureCelsius",
"expression": "($1 - 32) * 5/9"
},
{
"inputs": [
"$context(assetDataset).___location" // - $2
],
"output": "___location"
}
]
}
}
$metadata.user_properties.<property>
또는 $metadata.topic
형식을 사용하여 MQTT 메타데이터 속성에 액세스할 수 있습니다. $metadata.<header>
형식을 사용하여 $metadata 헤더를 입력할 수도 있습니다. 자세한 내용은 필드 참조를 참조하세요.
예를 들어, 원본 데이터의 temperature
필드를 사용하여 온도를 섭씨로 변환하고 temperatureCelsius
필드에 저장할 수 있습니다. 또한 컨텍스트화 데이터 세트의 ___location
필드를 사용하여 원본 데이터를 보강할 수도 있습니다.
builtInTransformationSettings: {
map: [
{
inputs: [
'temperature'
]
output: 'temperatureCelsius'
expression: '($1 - 32) * 5/9'
}
{
inputs: [
'$context(assetDataset).___location'
]
output: '___location'
}
]
}
$metadata.user_properties.<property>
또는 $metadata.topic
형식을 사용하여 MQTT 메타데이터 속성에 액세스할 수 있습니다. $metadata.<header>
형식을 사용하여 $metadata 헤더를 입력할 수도 있습니다. 자세한 내용은 필드 참조를 참조하세요.
예를 들어, 원본 데이터의 temperature
필드를 사용하여 온도를 섭씨로 변환하고 temperatureCelsius
필드에 저장할 수 있습니다. 또한 컨텍스트화 데이터 세트의 ___location
필드를 사용하여 원본 데이터를 보강할 수도 있습니다.
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
expression: "($1 - 32) * 5/9"
output: temperatureCelsius
- inputs:
- $context(assetDataset).___location
output: ___location
자세한 내용은 데이터 흐름을 사용하여 데이터 매핑 및 데이터 흐름을 사용하여 데이터 변환을 참조하세요.
제거하십시오
기본적으로 모든 데이터 포인트는 출력 스키마에 포함됩니다. 제거 변환을 사용하여 대상에서 모든 데이터 포인트를 제거할 수 있습니다.
변환(선택 사항)에서 제거를 선택합니다.
출력 스키마에서 제거할 데이터 포인트를 선택합니다.
적용을 선택합니다.
출력 스키마에서 데이터 포인트를 제거하려면 데이터 흐름 구성에서 속성을 사용할 builtInTransformationSettings
수 있습니다. 이 map
속성은 제거할 데이터 포인트를 지정하는 데 사용됩니다.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"*"
],
"output": "*"
},
{
"inputs": [
"weight"
],
"output": ""
}
{
"inputs": [
"weight.SourceTimestamp"
],
"output": ""
},
{
"inputs": [
"weight.Value"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Code"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Symbol"
],
"output": ""
}
]
}
}
builtInTransformationSettings: {
map: [
{
inputs: [
'*'
]
output: '*'
}
{
inputs: [
'weight'
]
output: ''
}
{
inputs: [
'weight.SourceTimestamp'
]
output: ''
}
{
inputs: [
'weight.Value'
]
output: ''
}
{
inputs: [
'weight.StatusCode'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Code'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Symbol'
]
output: ''
}
]
}
builtInTransformationSettings:
map:
- type: PassThrough
inputs:
- "*"
output: "*"
- inputs:
- weight
output: ""
- inputs:
- weight.SourceTimestamp
output: ""
- inputs:
- weight.Value
output: ""
- inputs:
- weight.StatusCode
output: ""
- inputs:
- weight.StatusCode.Code
output: ""
- inputs:
- weight.StatusCode.Symbol
output: ""
자세한 내용은 데이터 흐름을 사용하여 데이터 매핑 및 데이터 흐름을 사용하여 데이터 변환을 참조하세요.
스키마에 따라 데이터 직렬화
대상으로 전송하기 전에 데이터를 직렬화하려면 스키마와 serialization 서식을 지정해야 합니다. 그렇지 않으면 데이터는 유추된 형식을 사용하여 JSON으로 직렬화됩니다. Microsoft Fabric이나 Azure Data Lake와 같은 스토리지 엔드포인트에는 데이터 일관성을 보장하기 위한 스키마가 필요합니다. 지원되는 serialization 형식은 Parquet와 델타입니다.
운영 환경을 위해 데이터 흐름 엔드포인트 세부 정보에서 스키마와 serialization 서식을 지정합니다. serialization 형식을 지원하는 엔드포인트는 Microsoft Fabric OneLake, Azure Data Lake Storage Gen 2, Azure Data Explorer 및 로컬 스토리지입니다. 예를 들어, 델타 형식으로 데이터를 직렬화하려면 스키마 레지스트리에 스키마를 업로드하고 데이터 흐름 대상 엔드포인트 구성에서 이를 참조해야 합니다.
스키마 레지스트리에 스키마를 업로드하면 데이터 흐름 구성에서 해당 스키마를 참조할 수 있습니다.
{
"builtInTransformationSettings": {
"serializationFormat": "Delta",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>"
}
}
스키마 레지스트리에 스키마를 업로드하면 데이터 흐름 구성에서 해당 스키마를 참조할 수 있습니다.
builtInTransformationSettings: {
serializationFormat: 'Delta'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
}
스키마 레지스트리에 스키마를 업로드하면 데이터 흐름 구성에서 해당 스키마를 참조할 수 있습니다.
builtInTransformationSettings:
serializationFormat: Delta
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
스키마 레지스트리에 대한 자세한 내용은 메시지 스키마 이해를 참조하세요.
대상
데이터 흐름에 대한 대상을 구성하려면 엔드포인트 참조와 데이터 대상을 지정합니다. 엔드포인트에 대한 데이터 대상 목록을 지정할 수 있습니다.
로컬 MQTT 브로커가 아닌 다른 대상으로 데이터를 보내려면 데이터 흐름 엔드포인트를 만듭니다. 자세한 내용은 데이터 흐름 엔드포인트 구성을 참조하세요. 대상이 로컬 MQTT 브로커가 아닌 경우 원본으로 사용해야 합니다. 로컬 MQTT broker 엔드포인트를 사용하는 방법에 대한 자세한 내용은 데이터 흐름이 로컬 MQTT broker 엔드포인트를 사용해야 하므로 참조하세요.
중요합니다
스토리지 엔드포인트에는 serialization을 위한 스키마가 필요합니다. Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer 또는 Local Storage에서 데이터 흐름을 사용하려면 스키마 참조를 지정해야 합니다.
대상으로 사용할 데이터 흐름 엔드포인트를 선택합니다.
스토리지 엔드포인트에는 serialization을 위한 스키마가 필요합니다. Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer 또는 Local Storage 대상 엔드포인트를 선택하는 경우 스키마 참조를 지정해야 합니다. 예를 들어, 델타 형식으로 Microsoft Fabric 엔드포인트에 데이터를 직렬화하려면 스키마 레지스트리에 스키마를 업로드하고 데이터 흐름 대상 엔드포인트 구성에서 이를 참조해야 합니다.
계속을 선택하여 대상을 구성합니다.
데이터를 보낼 항목이나 테이블을 포함하여 대상에 필요한 설정을 입력합니다. 자세한 내용은 데이터 대상(항목, 컨테이너 또는 테이블) 구성을 참조하세요.
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
destinationSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataDestination: '<TOPIC_OR_TABLE>' // See section on configuring data destination
}
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE> # See section on configuring data destination
데이터 원본과 마찬가지로 데이터 대상은 여러 데이터 흐름에서 데이터 흐름 엔드포인트를 재사용 가능하게 유지하는 데 사용되는 개념입니다. 기본적으로 이는 데이터 흐름 엔드포인트 구성의 하위 디렉터리를 나타냅니다. 예를 들어, 데이터 흐름 엔드포인트가 스토리지 엔드포인트인 경우 데이터 대상은 스토리지 계정의 테이블입니다. 데이터 흐름 엔드포인트가 Kafka 엔드포인트인 경우, 데이터 대상은 Kafka 항목입니다.
엔드포인트 유형 |
데이터 대상 의미 |
설명 |
MQTT(또는 Event Grid) |
항목 |
데이터가 전송되는 MQTT 항목. 정적 항목만 지원되며 와일드카드는 사용할 수 없습니다. |
Kafka(또는 Event Hubs) |
항목 |
데이터가 전송되는 Kafka 항목. 정적 항목만 지원되며 와일드카드는 사용할 수 없습니다. 엔드포인트가 Event Hubs 네임스페이스인 경우, 데이터 대상은 네임스페이스 내의 개별 이벤트 허브입니다. |
Azure Data Lake Storage |
컨테이너 |
스토리지 계정의 컨테이너. 테이블이 아닙니다. |
Microsoft Fabric OneLake |
테이블 또는 폴더 |
구성된 엔드포인트의 경로 형식에 해당합니다. |
Azure Data Explorer(아주르 데이터 탐색기) |
테이블 |
Azure Data Explorer 데이터베이스의 테이블. |
로컬 스토리지 |
폴더 |
로컬 스토리지 영구 볼륨 탑재의 폴더 또는 디렉터리 이름. Azure Arc 클라우드 수집 에지 볼륨에서 사용하도록 설정된 Azure 컨테이너 스토리지를 사용하는 경우 이는 만든 하위 볼륨의 spec.path 매개 변수와 일치해야 합니다. |
데이터 대상을 구성하려면:
운영 환경을 사용할 때 데이터 대상 필드는 엔드포인트 형식에 따라 자동으로 해석됩니다. 예를 들어, 데이터 흐름 엔드포인트가 스토리지 엔드포인트인 경우 대상 세부 정보 페이지에서 컨테이너 이름을 입력하라는 메시지가 표시됩니다. 데이터 흐름 엔드포인트가 MQTT 엔드포인트인 경우 대상 세부 정보 페이지에서 항목 등을 입력하라는 메시지가 표시됩니다.
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
예를 들어, 로컬 MQTT 브로커에 정적 MQTT 항목으로 데이터를 다시 보내려면 다음 구성을 사용합니다.
{
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "example-topic"
}
}
또는 사용자 지정 이벤트 허브 엔드포인트가 있는 경우 구성은 다음과 같습니다.
{
"destinationSettings": {
"endpointRef": "my-eh-endpoint",
"dataDestination": "individual-event-hub"
}
}
구문은 모든 데이터 흐름 엔드포인트에서 동일합니다.
destinationSettings: {
endpointRef: "<CUSTOM_ENDPOINT_NAME>"
dataDestination: '<TOPIC_OR_TABLE>'
}
예를 들어, 로컬 MQTT 브로커에 정적 MQTT 항목으로 데이터를 다시 보내려면 다음 구성을 사용합니다.
destinationSettings: {
endpointRef: 'default'
dataDestination: 'example-topic'
}
또는 사용자 지정 이벤트 허브 엔드포인트가 있는 경우 구성은 다음과 같습니다.
destinationSettings: {
endpointRef: 'my-eh-endpoint'
dataDestination: 'individual-event-hub'
}
스토리지 엔드포인트를 대상으로 사용하는 또 다른 예:
destinationSettings: {
endpointRef: 'my-adls-endpoint'
dataDestination: 'my-container'
}
구문은 모든 데이터 흐름 엔드포인트에서 동일합니다.
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE>
예를 들어, 로컬 MQTT 브로커에 정적 MQTT 항목으로 데이터를 다시 보내려면 다음 구성을 사용합니다.
destinationSettings:
endpointRef: default
dataDestination: example-topic
또는 사용자 지정 이벤트 허브 엔드포인트가 있는 경우 구성은 다음과 같습니다.
destinationSettings:
endpointRef: my-eh-endpoint
dataDestination: individual-event-hub
스토리지 엔드포인트를 대상으로 사용하는 또 다른 예:
destinationSettings:
endpointRef: my-adls-endpoint
dataDestination: my-container
예제
다음 예는 원본과 대상에 MQTT 엔드포인트를 사용하는 데이터 흐름 구성입니다. 원본은 MQTT 항목 azure-iot-operations/data/thermostat
에서 데이터를 필터링합니다. 이 변환은 온도를 화씨로 변환하고 온도에 습도를 곱한 값이 100000보다 작은 데이터를 필터링합니다. 대상은 MQTT 항목 factory
에 데이터를 보냅니다.
az iot ops dataflow apply 명령을 사용하여 데이터 흐름을 만들거나 변경합니다.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
--config-file
매개 변수는 리소스 속성을 포함하는 JSON 구성 파일의 경로 및 파일 이름입니다.
이 예제에서는 사용자의 홈 디렉터리에 저장된 다음 콘텐츠로 명명 data-flow.json
된 구성 파일을 가정합니다.
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"builtInTransformationSettings": {
"datasets": [],
"filter": [
{
"expression": "$1 * $2 < 100000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value"
],
"type": "Filter"
}
],
"map": [
{
"inputs": [
"*"
],
"output": "*",
"type": "PassThrough"
},
{
"expression": "fToC($1)",
"inputs": [
"Temperature.Value"
],
"output": "TemperatureF",
"type": "Compute"
},
{
"inputs": [
"@\"Tag 10\".Value"
],
"output": "Humidity",
"type": "Rename"
}
],
"serializationFormat": "Json"
},
"operationType": "BuiltInTransformation"
},
{
"destinationSettings": {
"dataDestination": "factory",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
다음은 기본 데이터 흐름 프로필을 사용하여 데이터 흐름을 만들거나 업데이트하는 예제 명령입니다.
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
// Pointer to the default data flow endpoint
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// Use the default MQTT endpoint as the source
endpointRef: defaultDataflowEndpoint.name
// Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources: [
'azure-iot-operations/data/thermostat'
]
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// Filter the data where temperature * "Tag 10" < 100000
filter: [
{
inputs: [
'temperature.Value'
'"Tag 10".Value'
]
expression: '$1 * $2 < 100000'
}
]
map: [
// Passthrough all values by default
{
inputs: [
'*'
]
output: '*'
}
// Convert temperature to Fahrenheit and output it to TemperatureF
{
inputs: [
'temperature.Value'
]
output: 'TemperatureF'
expression: 'cToF($1)'
}
// Extract the "Tag 10" value and output it to Humidity
{
inputs: [
'"Tag 10".Value'
]
output: 'Humidity'
}
]
}
}
{
operationType: 'Destination'
destinationSettings: {
// Use the default MQTT endpoint as the destination
endpointRef: defaultDataflowEndpoint.name
// Send the data to the MQTT topic factory
dataDestination: 'factory'
}
}
]
}
}
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# Use the default MQTT endpoint as the source
endpointRef: default
# Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources:
- azure-iot-operations/data/thermostat
# Transformation optional
- operationType: builtInTransformation
builtInTransformationSettings:
# Filter the data where temperature * "Tag 10" < 100000
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: '$1 * $2 < 100000'
map:
# Passthrough all values by default
- inputs:
- '*'
output: '*'
# Convert temperature to Fahrenheit and output it to TemperatureF
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
# Extract the "Tag 10" value and output it to Humidity
- inputs:
- '"Tag 10".Value'
output: 'Humidity'
- operationType: Destination
destinationSettings:
# Use the default MQTT endpoint as the destination
endpointRef: default
# Send the data to the MQTT topic factory
dataDestination: factory
데이터 흐름 구성의 더 많은 예를 보려면 Azure REST API - 데이터 흐름 및 빠른 시작 Bicep을 참조하세요.
데이터 흐름이 작동하는지 확인
자습서: Azure Event Grid에 대한 양방향 MQTT 브리지에 따라 데이터 흐름이 작동하는지 확인합니다.
데이터 흐름 구성 내보내기
데이터 흐름 구성을 내보내려면 작업 환경을 사용하거나 데이터 흐름 사용자 지정 리소스를 내보낼 수 있습니다.
내보내려는 데이터 흐름을 선택하고 도구 모음에서 내보내기를 선택합니다.
az iot ops dataflow show 명령을 사용하여 데이터 흐름을 내보냅니다.
az iot ops dataflow show --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <DataflowName> --profile <DataflowProfileName> --output json > my-dataflow.json
다음은 이름이 JSON 파일로 명명된 data-flow
데이터 흐름을 내보내는 예제 명령입니다 data-flow.json
.
az iot ops dataflow show --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --output json > data-flow.json
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml
적절한 데이터 흐름 구성
데이터 흐름이 예상대로 작동하는지 확인하려면 다음 사항을 확인합니다.
다음 단계