次の方法で共有


Azure Event Hubs と Kafka データ フロー エンドポイントを構成する

重要

このページには、プレビュー段階にある Kubernetes デプロイ マニフェストを使用して Azure IoT Operations コンポーネントを管理する手順が含まれます。 この機能は、いくつかの制限を設けて提供されているため、運用環境のワークロードには使用しないでください。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加的使用条件」を参照してください。

Azure IoT Operations と Apache Kafka ブローカー間の双方向通信を設定するには、データ フロー エンドポイントを構成します。 この構成では、エンドポイント、トランスポート層セキュリティ (TLS)、認証、およびその他の設定を指定できます。

前提条件

Azure Event Hubs

Azure Event Hubs は Kafka プロトコルと互換であり、いくつかの制限付きでデータ フローとともに使用できます。

Azure Event Hubs 名前空間とイベント ハブを作成する

まず、Kafka 対応の Azure Event Hubs 名前空間を作成します

次に、名前空間にイベント ハブを作成します。 個々のイベント ハブは、Kafka トピックに対応します。 同じ名前空間に複数のイベント ハブを作成して、複数の Kafka トピックを表すことができます。

マネージド ID にアクセス許可を割り当てる

Azure Event Hubs のデータ フロー エンドポイントを構成するには、ユーザー割り当てまたはシステム割り当てのマネージド ID を使用することをお勧めします。 この方法は安全であり、認証情報を手動で管理する必要性をなくします。

Azure Event Hubs 名前空間とイベント ハブが作成されたら、イベント ハブにメッセージを送受信するアクセス許可を付与するロールを Azure IoT Operations マネージド ID に割り当てる必要があります。

システム割り当てマネージド ID を使用する場合は、Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。 Azure IoT Operations Arc 拡張機能の後に一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7 などです。 システム割り当てマネージド ID は、Azure IoT Operations Arc 拡張機能と同じ名前を使用して見つけることができます。

次に、Event Hubs 名前空間 >[アクセス制御 (IAM)]>[ロールの割り当ての追加] の順に移動します。

  1. [ ロール ] タブで、 Azure Event Hubs Data SenderAzure Event Hubs Data Receiverなどの適切なロールを選択します。 これにより、名前空間内のすべてのイベント ハブのメッセージを送受信するために必要なアクセス許可が、マネージド ID に付与されます。 詳細については、「Event Hubs リソースにアクセスするための Microsoft Entra ID を使用しアプリケーションを認証する」を参照してください。
  2. [メンバー] タブで次の操作を行います。
    1. システム割り当てマネージド ID を使用している場合、[アクセスの割り当て先][ユーザー、グループ、またはサービス プリンシパル] オプションを選択し、[+ メンバーの選択] を選択して、Azure IoT Operations Arc 拡張機能の名前を検索します。
    2. ユーザー割り当てマネージド ID を使用している場合、[アクセスの割り当て先][マネージド ID] オプションを選択し、[+ メンバーの選択] を選択して、クラウド接続用に設定されたユーザー割り当てマネージド ID を検索します。

Azure Event Hubs のデータ フロー エンドポイントを作成する

Azure Event Hubs 名前空間とイベント ハブを構成したら、Kafka 対応の Azure Event Hubs 名前空間のデータ フロー エンドポイントを作成できます。

  1. 操作エクスペリエンスで、[データ フロー エンドポイント] タブを選択します。

  2. [新しいデータ フロー エンドポイントの作成] で、[Azure Event Hubs]>[新規] を選択します。

    操作エクスペリエンスを使用して Azure Event Hubs データ フロー エンドポイントを作成しているスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 説明
    名前 データ フロー エンドポイントの名前。
    ホスト Event Hubs ホストのホスト名。 既存の Event Hubs ホストを検索するか、 <NAMESPACE>.servicebus.windows.net形式を使用してホスト名を手動で入力できます。
    港 / ポート Event Hubs ホストのポート。 Event Hubs の場合、ポートは 9093
    認証方法 認証に使用する方式。 [システム割り当てマネージド ID] または [ユーザー割り当てマネージド ID] を選択することをお勧めします。
  4. [適用] を選択してエンドポイントをプロビジョニングします。

Kafka トピックまたは個々のイベント ハブは、後でデータ フローの作成時に構成されます。 Kafka トピックは、データ フロー メッセージの宛先です。

Event Hubs への認証に接続文字列を使用する

重要

操作エクスペリエンス Web UI を使用してシークレットを管理するには、まず、安全な設定で Azure IoT Operations を有効にする必要があります。それには Azure Key Vault を構成し、ワークロード ID を有効にします。 詳細については、「Azure IoT Operations デプロイでの安全な設定を有効にする」を参照してください。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [基本] タブを選択してから、[認証方法]>[SASL] の順に選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 [Plain] を選択します。
同期済みシークレットの名前 接続文字列を含む Kubernetes シークレットの名前を入力します。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。 Key Vault の一覧から選択するか、新しく作成します。 値は $ConnectionString である必要があります。
トークン シークレットのパスワード参照 SASL 認証に使用されるパスワードへの参照またはトークン シークレット。 Key Vault の一覧から選択するか、新しく作成します。 値は Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> の形式でなければなりません。

[参照の追加] を選択した後、[新規作成] を選択した場合は、次の設定を入力します。

設定 説明
シークレット名 Azure Key Vault 内のシークレットの名前。 覚えやすい名前にして、後でリストからそのシークレットを選択できるようにしてください。
シークレット値 ユーザー名には「$ConnectionString」を入力します。 パスワードには接続文字列を Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> の形式で入力します。
アクティブ化する日を設定する オンにした場合、シークレットがアクティブになる日付。
有効期限を設定する オンにした場合、シークレットの有効期限が切れる日付。

シークレットの詳細については、「Azure IoT Operations でのシークレットの作成と管理」を参照してください。

制限事項

Azure Event Hubs は、Kafka がサポートするすべての圧縮タイプをサポートするわけではありません。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。 他の圧縮の種類を使用すると、エラーが発生する可能性があります。

カスタム Kafka ブローカー

Event Hubs 以外の Kafka ブローカーのデータ フロー エンドポイントを構成するには、必要に応じてホスト、TLS、認証、その他を設定します。

  1. 操作エクスペリエンスで、[データ フロー エンドポイント] タブを選択します。

  2. [新しいデータ フロー エンドポイントの作成] で、[カスタム Kafka ブローカー]>[新規] を選択します。

    操作エクスペリエンスを使用して Kafka データ フロー エンドポイントを作成しているスクリーンショット。

  3. エンドポイントに関する次の設定を入力します。

    設定 説明
    名前 データ フロー エンドポイントの名前。
    ホスト 形式 <Kafka-broker-host>:xxxx の Kafka ブローカーのホスト名。 ホスト設定にポート番号を含めます。
    認証方法 認証に使用する方式。 [SASL] を選択します。
    SASL の種類 SASL 認証の種類。 [Plain]、[ScramSha256]、または [ScramSha512] を選びます。 [SASL] を使用する場合は必須です。
    同期済みシークレットの名前 シークレットの名前。 [SASL] を使用する場合は必須です。
    トークン シークレットのユーザー名参照 SASL トークン シークレット内のユーザー名への参照。 [SASL] を使用する場合は必須です。
  4. [適用] を選択してエンドポイントをプロビジョニングします。

現在、操作エクスペリエンスでは、ソースとしての Kafka データ フロー エンドポイントの使用がサポートされていません。 Kafka データ フロー エンドポイントをソースに使用するデータ フローは、Kubernetes または Bicep を使用して作成できます。

エンドポイント設定をカスタマイズする場合は、次のセクションで詳細を確認してください。

使用可能な認証方法

Kafka ブローカー データ フロー エンドポイントで使用できる認証方法は次のとおりです。

システム割り当てマネージド ID

データ フロー エンドポイントを構成する前に、Kafka ブローカーに接続するためのアクセス許可を付与する Azure IoT Operations マネージド ID に、ロールを割り当てます。

  1. Azure portal で、Azure IoT Operations インスタンスに移動し、[概要] を選択します。
  2. Azure IoT Operations Arc 拡張機能の後に一覧表示されている拡張機能の名前をコピーします。 たとえば、azure-iot-operations-xxxx7 などです。
  3. アクセス許可を付与する必要があるクラウド リソースに移動します。 たとえば、Event Hubs 名前空間 >[アクセス制御 (IAM)]>[ロールの割り当てを追加] の順に移動します。
  4. [ ロール ] タブで、適切なロールを選択します。
  5. [メンバー] タブの [アクセスの割り当て先][ユーザー、グループ、またはサービス プリンシパル] オプションを選択し、[+ メンバーの選択] を選択して、Azure IoT Operations マネージド ID を検索します。 たとえば、azure-iot-operations-xxxx7 などです。

次に、システム割り当てマネージド ID の設定を使用して、データ フロー エンドポイントを構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [基本] タブを選択してから、[認証方法]>[システム割り当てマネージド ID] の順に選択します。

この構成では、既定の対象ユーザーを使用してマネージド ID を作成します。これは、https://<NAMESPACE>.servicebus.windows.net の形式の Event Hubs 名前空間のホスト値と同じです。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、audience フィールドを目的の値に設定できます。

操作エクスペリエンスではサポートされていません。

ユーザー割り当てマネージド ID

認証にユーザー割り当てマネージド ID を使用するには、まず、セキュリティで保護された設定を有効にして Azure IoT Operations を展開する必要があります。 次に、クラウド接続用にユーザー割り当てマネージド ID を設定する必要があります。 詳細については、「Azure IoT Operations デプロイでの安全な設定を有効にする」を参照してください。

データ フロー エンドポイントを構成する前に、Kafka ブローカーに接続するためのアクセス許可を付与するユーザー割り当てマネージド ID に、ロールを割り当てます。

  1. Azure portal で、アクセス許可を付与する必要があるクラウド リソースに移動します。 たとえば、[Event Grid 名前空間] >[アクセス制御 (IAM)]>[ロールの割り当てを追加] の順に移動します。
  2. [ ロール ] タブで、適切なロールを選択します。
  3. [メンバー] タブの [アクセスの割り当て先][マネージド ID] オプションを選択してから、[+ メンバーの選択] を選択して、ユーザー割り当てマネージド ID を検索します。

次に、ユーザー割り当てマネージド ID の設定を使用してデータ フロー エンドポイントを構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[基本] タブを選択してから、[認証方法]>[ユーザー割り当てマネージド ID] の順に選択します。

ここでは、マネージド ID の対象ユーザーがスコープされています。 既定値は、Event Hubs 名前空間のホスト値と同じで、形式は https://<NAMESPACE>.servicebus.windows.net です。 ただし、既定の対象ユーザーをオーバーライドする必要がある場合は、Bicep または Kubernetes を使用してスコープ フィールドを目的の値に設定できます。

SASL

認証に SASL を使用するには、SASL 認証方法を指定し、SASL の種類と、SASL トークンを含むシークレットの名前を持つシークレット参照を構成します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [基本] タブを選択してから、[認証方法]>[SASL] の順に選択します。

エンドポイントに関する次の設定を入力します。

設定 説明
SASL の種類 使用する SASL 認証の種類。 サポートされている型は、PlainScramSha256ScramSha512 です。
同期済みシークレットの名前 SASL トークンを含む Kubernetes シークレットの名前。
ユーザー名参照またはトークン シークレット SASL 認証に使用されるユーザー名への参照またはトークン シークレット。
トークン シークレットのパスワード参照 SASL 認証に使用されるパスワードへの参照またはトークン シークレット。

サポートされている SASL の種類は次のとおりです。

  • Plain
  • ScramSha256
  • ScramSha512

このシークレットは、Kafka データ フロー エンドポイントと同じ名前空間に存在する必要があります。 シークレットには、SASL トークンがキーと値のペアの形式で含まれている必要があります。

アノニマス

匿名認証を使用するには、Anonymous メソッドを使用するように Kafka 設定の認証セクションを更新します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [基本] タブを選択してから、[認証方法]>[なし] の順に選択します。

詳細設定

Kafka データ フロー エンドポイントでは、TLS、信頼されたCA 証明書、Kafka メッセージング設定、バッチ処理、CloudEvents など、の詳細な設定ができます。 これらは、データ フロー エンドポイントの [詳細] ポータル タブ、またはデータ フロー エンドポイント リソースの中で設定できます。

操作エクスペリエンスで、データ フロー エンドポイントの [詳細] タブを選択します。

操作エクスペリエンスを使用して Kafka データ フロー エンドポイントを詳細設定しているスクリーンショット。

TLS の設定

TLS のモード

Kafka エンドポイントの TLS を有効または無効にするには、TLS 設定の mode 設定を更新します。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択してから、[TLS モードが有効] の横にあるチェックボックスを設定します。

TLS モードは、Enabled または Disabled に設定できます。 モードが Enabled に設定されている場合、データ フローは Kafka ブローカーに対し安全な接続を使用します。 モードが Disabled に設定されている場合は、データ フローは Kafka ブローカーに対して安全ではない接続を使用します。

信頼された CA 証明書

Kafka ブローカーへの安全な接続を確立するように、Kafka エンドポイントのための信頼された CA 証明書を構成します。 Kafka ブローカーが自己署名証明書、または、既定では信頼されていないカスタムの CA によって署名された証明書を使用する場合には、この設定が重要です。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[信頼された CA 証明書構成マップ] フィールドを使用して、信頼された CA 証明書を含む ConfigMap を指定します。

この ConfigMap には、PEM 形式の CA 証明書が含まれている必要があります。 この ConfigMap は、Kafka データ フロー リソースと同じ名前空間に存在する必要があります。 次に例を示します。

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

ヒント

Azure Event Hubs に接続する際には CA 証明書は必要ありません。Event Hubs サービスでは、既定で信頼されているパブリック CA によって署名された証明書を使用するためです。

コンシューマー グループ ID

コンシューマー グループ ID は、Kafka トピックからのメッセージの読み取りにデータ フローが使用する、コンシューマー グループを識別するために使用されます。 コンシューマー グループ ID は、Kafka ブローカー内で一意である必要があります。

重要

Kafka エンドポイントをソースとして使用する場合は、コンシューマー グループ ID が必要です。 それ以外の場合、データ フローでは Kafka トピックからのメッセージを読み取ることができず、"Kafka 型のソース エンドポイントには consumerGroupId が定義されている必要があります" というエラーが表示されます。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択してから、[コンシューマー グループ ID] フィールドを使用してコンシューマー グループ ID を指定します。

この設定は、エンドポイントがソース (つまり、データ フローがコンシューマー) として使用される場合にのみ有効になります。

圧縮

圧縮フィールドを使用すると、Kafka トピックに送信されるメッセージの圧縮が可能になります。 圧縮は、データ転送に必要なネットワーク帯域幅とストレージ領域を減らすのに役立ちます。 ただし、圧縮により、オーバーヘッドと待機時間もプロセスに追加されます。 サポートされている圧縮の種類を次の表に示します。

説明
None 圧縮またはバッチ処理は適用されません。 compression が指定されていない場合、既定値は None です。
Gzip GZIP 圧縮とバッチ処理が適用されます。 GZIP は、圧縮率と速度のバランスが良い汎用的な圧縮アルゴリズムです。 現在、Azure Event Hubs の Premium および Dedicated レベルでサポートされているのは GZIP 圧縮だけです。
Snappy Snappy 圧縮とバッチ処理が適用されます。 Snappy は、圧縮率が中程度で速度も良好な、高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。
Lz4 LZ4 圧縮とバッチ処理が適用されます。 LZ4は、低圧縮で高いスピードを提供する高速圧縮アルゴリズムです。 この圧縮モードは、Azure Event Hubs ではサポートされていません。

圧縮を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで、[詳細] タブを選択してから、[圧縮] フィールドを使用して圧縮の種類を指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

バッチ処理

圧縮とは別に、Kafka トピックに送信する前の、メッセージのバッチ処理を構成することもできます。 バッチ処理を使用すると、複数のメッセージをグループ化して 1 つの単位として圧縮できるため、圧縮効率が向上し、ネットワークのオーバーヘッドが削減されます。

フィールド 説明 必須
mode Enabled または Disabled を指定できます。 Kafka には "バッチ処理されない" メッセージングの概念がないため、既定値は Enabled です。 Disabled に設定すると、バッチ処理が最小化され、毎回 1 つのメッセージでバッチが作成されます。 いいえ
latencyMs 送信前にメッセージをバッファリングできる最大の時間間隔 (ミリ秒単位)。 この間隔に達した場合、バッファー内のすべてのメッセージは、メッセージの数や大きさに関係なく、バッチとして送信されます。 設定しない場合、既定値は 5 です。 いいえ
maxMessages 送信前にバッファーに格納できるメッセージの最大数。 この数に達すると、バッファーに格納されているメッセージの大きさやバッファーの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 設定しない場合の既定値は 100,000 です。 いいえ
maxBytes 送信前にバッファーに格納できる最大サイズ (バイト単位)。 このサイズに達すると、バッファーに格納されているメッセージの数やバッファリングの時間に関係なく、バッファー内のすべてのメッセージがバッチとして送信されます。 既定値は 1,000,000 (1 MB) です。 いいえ

たとえば、latencyMs を 1000 に、maxMessages を 100 に、maxBytes を 1024 に設定した場合、バッファーのメッセージが 100 件になるか、バッファーのバイト数が 1,024 になるか、最後の送信から 1,000 ミリ秒が経過するかのいずれかが最初に発生したタイミングで、メッセージが送信されます。

バッチ処理を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[バッチ処理が有効] フィールドを使用してバッチ処理を有効にします。 [バッチ処理の待機時間][最大バイト数][メッセージ数] フィールドを使用してバッチ処理の設定を指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

パーティション処理戦略

パーティション処理戦略は、Kafka トピックに送信する際にメッセージをどのように Kafka パーティションに割り当てるかを制御します。 Kafka パーティションは、並列処理とフォールト トレランスを可能にする Kafka トピックの論理セグメントです。 Kafka トピック内の各メッセージには、メッセージの識別と順序付けに使用されるパーティションとオフセットがあります。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

既定では、データ フローはラウンド ロビン アルゴリズムを使用して、ランダムなパーティションにメッセージを割り当てます。 ただし、MQTT トピック名や MQTT メッセージ プロパティなど、いくつかの条件に基づいてパーティションにメッセージを割り当てるために使用できるさまざまな戦略があります。 これは、負荷分散、データの局所性、またはメッセージの順序付けを改善するのに役立ちます。

説明
Default ラウンド ロビン アルゴリズムを使用して、ランダム パーティションにメッセージを割り当てます。 これは、戦略が指定されていない場合の既定値です。
Static データ フローのインスタンス ID から派生した固定のパーティション番号にメッセージを割り当てます。 これは、各データ フロー インスタンスが異なるパーティションにメッセージを送信することを意味します。 これにより、負荷分散とデータの局所性が向上します。
Topic パーティション分割のキーとして、データ フロー ソースからの MQTT トピック名を使用します。 これは、MQTT トピック名が同じメッセージは、同じパーティションに送信されることを意味します。 これは、メッセージの順序付けとデータの局所性の向上に役立ちます。
Property パーティション分割のキーとして、データ フロー ソースからの MQTT メッセージ プロパティを使用します。 プロパティの名前は、partitionKeyProperty フィールドで指定します。 これは、同じプロパティ値を持つメッセージが同じパーティションに送信されることを意味します。 これにより、カスタムの条件に基づいてメッセージの順序付けとデータの局所性を向上させることができます。

たとえば、パーティション処理戦略を Property に設定し、パーティション キー プロパティを device-id に設定した場合、同じ device-id プロパティを持つメッセージは同じパーティションに送信されます。

パーティション処理戦略を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[パーティション処理戦略] フィールドを使用してパーティション処理戦略を指定します。 戦略が に設定されている場合は、Property フィールドを使用して、パーティション化に使用するプロパティを指定します。

Kafka の受信確認

Kafka 受信確認 (ACK) は、Kafka トピックに送信されるメッセージの持続性と一貫性を制御するために使用されます。 プロデューサーは、Kafka トピックにメッセージを送信するときに、メッセージがトピックに正常に書き込まれ、Kafka クラスター全体にレプリケートされたことを確認するために、Kafka ブローカーにさまざまなレベルの受信確認を要求できます。

この設定は、エンドポイントが宛先 (つまり、データ フローがプロデューサー) として使用される場合にのみ有効になります。

説明
None データ フローは、Kafka ブローカーからの受信確認を待機しません。 この設定は最も高速ですが、最も持続的ではないオプションです。
All データ フローは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるのを待機します。 この設定は最も低速ですが、最も持続的なオプションです。 この設定は既定のオプションでもあります
One データ フローは、メッセージがリーダー パーティションと少なくとも 1 個のフォロワー パーティションに書き込まれるのを待機します。
Zero データ フローは、メッセージがリーダー パーティションに書き込まれるのを待機しますが、フォロワーからの受信確認を待機しません。 これは One よりも高速ですが、持続性は低くなります。

たとえば、Kafka の受信確認を All に設定した場合、データ フローは、メッセージがリーダー パーティションとすべてのフォロワー パーティションに書き込まれるまで待機してから、次のメッセージを送信します。

Kafka の受信確認を構成するには:

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[Kafka 受信確認] フィールドを使用して Kafka 受信確認レベルを指定します。

この設定は、エンドポイントが宛先 (データ フローはプロデューサー) として使用される場合にのみ有効になります。

MQTT プロパティのコピー

既定では、MQTT プロパティのコピー設定は有効になっています。 これらのユーザー プロパティには、メッセージを送信する資産の名前を格納する subject などの値が含まれます。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[MQTT プロパティのコピー] フィールドの横にあるチェックボックスを使用して、MQTT プロパティのコピーを有効または無効にします。

以降のセクションでは、設定が有効になっている場合に、MQTT プロパティを Kafka ユーザー ヘッダーに変換する方法と、その逆を行う方法について説明します。

Kafka エンドポイントが宛先である

Kafka エンドポイントがデータ フローの宛先である場合には、MQTT v5 仕様に定義されているすべてのプロパティが、Kafka ユーザー ヘッダーに変換されます。 たとえば、Kafka に転送される MQTT v5 メッセージに "Content Type" が指定されている場合、Kafka のユーザー ヘッダー"Content Type":{specifiedValue} に変換されます。 次の表に定義されている他の組み込み MQTT プロパティにも、同様の規則が適用されます。

MQTT プロパティ 変換動作
ペイロード形式インジケーター キー: "Payload Format Indicator"
値: "0" (ペイロードはバイト) または "1" (ペイロードは UTF-8)
応答トピック キー: "Response Topic"
値: 元のメッセージからの応答トピックのコピー。
メッセージの有効期限の間隔 キー: "Message Expiry Interval"
値: メッセージの有効期限が切れるまでの秒数の UTF-8 表現。 詳細については、「Message Expiry Interval プロパティ」を参照してください。
関連付けデータ: キー: "Correlation Data"
値: 元のメッセージからの相関関係データのコピー。 UTF-8 でエンコードされた多くの MQTT v5 プロパティとは異なり、相関関係データは無作為のデータになる可能性があります。
コンテンツ タイプ: キー: "Content Type"
値: 元のメッセージからの Content Type のコピー。

MQTT v5 ユーザー プロパティのキーと値のペアは、Kafka のユーザー ヘッダーに直接変換されます。 メッセージ内のユーザー ヘッダーに、組み込みの MQTT プロパティと同じ名前 (たとえば、"Correlation Data" という名前のユーザー ヘッダー) がある場合、MQTT v5 仕様プロパティ値を転送するか、それ以外ではユーザー プロパティが未定義になります。

データ フローが、MQTT ブローカーからこれらのプロパティを受け取ることはありません。 このため、データ フローで次の項目は転送されません。

  • トピックの別名
  • サブスクリプション識別子
Message Expiry Interval プロパティ

Message Expiry Interval は、メッセージが破棄されるまで MQTT ブローカー内に留まれる長さを指定します。

Message Expiry Interval が指定された MQTT メッセージをデータ フローが受信すると、次のようになります。

  • メッセージが受信された日時を記録します。
  • メッセージが宛先に出力される前に、元の有効期限間隔の時間から、メッセージがキューに入れられてからの時間が減算されます。
  • メッセージの有効期限が切れていない (上記の演算結果が > 0 である) 場合、メッセージは、更新された Message Expiry Time を含めて宛先に出力されます。
  • メッセージの有効期限が切れている (上記の演算結果が <= 0 である) 場合、メッセージはターゲットによって出力されません。

例 :

  • データ フローは、Message Expiry Interval = 3,600 秒の MQTT メッセージをを受信します。 対応する宛先は一時的に切断されていますが、再接続が可能です。 この MQTT メッセージがターゲットに送信されるまでに 1,000 秒が経過します。 この場合、宛先のメッセージの Message Expiry Interval は 2,600 (3,600 - 1,000) 秒に設定されます。
  • データ フローは、Message Expiry Interval = 3,600 秒の MQTT メッセージを受信します。 対応する宛先は一時的に切断されていますが、再接続が可能です。 ただし、この場合、再接続に 4,000 秒かかります。 メッセージの有効期限が切れるので、データ フローはこのメッセージを宛先に転送しません。

Kafka エンドポイントがデータ フロー ソースである

Event Hubs エンドポイントをデータ フロー ソースとして使用する場合に、MQTT に変換された Kafka ヘッダーが破損するという既知の問題があります。 これは、内部で AMQP を使用する Event Hubs クライアント経由で Event Hubs を使用する場合にのみ発生します。 たとえば、"foo"="bar" の場合、"foo" は変換されますが、値は "\xa1\x03bar" になります。

Kafka エンドポイントがデータ フロー ソースである場合は、Kafka のユーザー ヘッダーは MQTT v5 プロパティに変換されます。 次の表で、Kafka のユーザー ヘッダーが MQTT v5 プロパティにどのように変換されるかを説明します。

Kafka ヘッダー 変換動作
キー キー: "Key"
値: 元のメッセージからの Key のコピー。
タイムスタンプ キー: "Timestamp"
値: Kafka Timestamp の UTF-8 エンコード。これは、Unix エポック以降のミリ秒数です。

Kafka のユーザー ヘッダーのキーと値のペア (すべて UTF-8 でエンコードされている場合) は、MQTT のユーザー キーと値のプロパティに直接変換されます。

UTF-8/バイナリの不一致

MQTT v5 では、UTF-8 ベースのプロパティのみをサポートできます。 データ フローが UTF-8 以外のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、そのデータ フローは次を行います。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。

Kafka のソース ヘッダーをバイナリ転送する必要があるアプリケーション => MQTT ターゲット プロパティは、それらを最初に UTF-8 で (たとえば、Base64 経由で) エンコードする必要があります。

64KB 以上のプロパティという不一致

MQTT v5 プロパティは 64 KB 未満である必要があります。 データ フローが> = 64KB のヘッダーを 1 つ以上含む Kafka メッセージを受信した場合、そのデータ フローは次を行います。

  • 問題のあるプロパティを削除します。
  • 前の規則に従って、メッセージの残りの部分を転送します。
Event Hubs と AMQP を使用するプロデューサーを使用する場合のプロパティ変換

次のいずれかのアクションを実行している Kafka データ フロー ソース エンドポイントに、クライアントがメッセージを転送している場合。

  • Azure.Messaging.EventHubs などのクライアント ライブラリを使用した Event Hubs へのメッセージ送信
  • AMQP を直接使用

注意する必要があるプロパティ変換の微妙な違いがあります。

次のいずれかを実行する必要があります。

  • プロパティの送信を避ける。
  • プロパティを送信する必要がある場合は、UTF-8 としてエンコードされた値を送信する。

Event Hubs は、プロパティを AMQP から Kafka に変換するときに、基盤の AMQP エンコード型をメッセージに含めます。 動作の詳細については、「異なるプロトコルを使用してコンシューマーとプロデューサー間でイベントを交換する」を参照してください。

次のコード例では、データ フロー エンドポイントは値 "foo":"bar" を受け取る際に、<0xA1 0x03 "bar"> としてプロパティを受け取ります。

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

データが UTF-8 ではないため、データ フロー エンドポイントは、ペイロード プロパティ <0xA1 0x03 "bar"> を MQTT メッセージに転送できません。 ただし、UTF-8 文字列が指定されていれば、データ フロー エンドポイントは MQTT に送信する前に文字列を変換します。 UTF-8 文字列を使用すると、MQTT メッセージにユーザー プロパティとして "foo":"bar" が含められます。

UTF-8 ヘッダーのみが変換されます。 たとえば、プロパティが float として設定されている次のシナリオを考えます。

Properties = 
{
  {"float-value", 11.9 },
}

データ フロー エンドポイントは、"float-value" フィールドを含むパケットを破棄します。

propertyEventData.correlationId を含むすべてのイベント データ プロパティが、転送されないわけではありません。 詳細については、「イベントのユーザー プロパティ」を参照してください。

CloudEvents

CloudEvents は、イベント データを一般的な方法で記述する方法です。 CloudEvents の設定は、CloudEvents 形式でメッセージを送受信するために使われます。 CloudEvents は、同じ、または異なるクラウド プロバイダー内にある異なるサービスが相互に通信する必要があるイベント駆動型アーキテクチャに使用できます。

CloudEventAttributes オプションは Propagate または CreateOrRemap です。

操作エクスペリエンスのデータ フロー エンドポイント設定ページで [詳細] タブを選択してから、[クラウド イベント属性] フィールドを使用して CloudEvents 設定を指定します。

以下のセクションでは、CloudEvent プロパティがどのように伝達または作成され、再マップされるかについて説明します。

設定の伝達

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合、メッセージはそのまま通過します。 必要なプロパティが存在する場合は、ce_ プレフィックスが CloudEvent プロパティ名に追加されます。

名前 必須 サンプル値 出力名 出力値
specversion はい 1.0 ce-specversion そのまま渡されます
type はい ms.aio.telemetry ce-type そのまま渡されます
source はい aio://mycluster/myoven ce-source そのまま渡されます
id はい A234-1234-1234 ce-id そのまま渡されます
subject いいえ aio/myoven/sensor/temperature ce-subject そのまま渡されます
time いいえ 2018-04-05T17:31:00Z ce-time そのまま渡されます。 再スタンプされません。
datacontenttype いいえ application/json ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更されます。
dataschema いいえ sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 変換構成に出力データ変換スキーマが指定されている場合、dataschema は出力スキーマに変更されます。

CreateOrRemap の設定

CloudEvent プロパティは、必要なプロパティを含むメッセージではパススルーされます。 メッセージに必要なプロパティが含まれていない場合、プロパティが生成されます。

名前 必須 出力名 不足している場合に生成される値
specversion はい ce-specversion 1.0
type はい ce-type ms.aio-dataflow.telemetry
source はい ce-source aio://<target-name>
id はい ce-id ターゲット クライアントで生成された UUID
subject いいえ ce-subject メッセージが送信される出力トピック
time いいえ ce-time ターゲット クライアントで RFC 3339 として生成
datacontenttype いいえ ce-datacontenttype オプションの変換ステージの後で、出力データのコンテンツ タイプに変更
dataschema いいえ ce-dataschema スキーマ レジストリで定義されているスキーマ

次のステップ

データ フローの詳細については、「データ フローの作成」を参照してください。