次の方法で共有


Spring Cloud Azure での Spring Cloud Stream のサポート

Spring Cloud Stream は、共有メッセージング システムに接続された高度にスケーラブルなイベント ドリブン マイクロサービスを構築するためのフレームワークです。

このフレームワークは、既に確立されており、使い慣れた Spring のイディオムとベスト プラクティスに基づいて構築された柔軟なプログラミング モデルを提供します。 これらのベスト プラクティスには、永続的な pub/sub セマンティクス、コンシューマー グループ、ステートフル パーティションのサポートが含まれます。

現在のバインダーの実装は次のとおりです。

  • - 詳細については、Azure Event Hubs の Spring Cloud Stream Binder の に関するページを参照してください。
  • spring-cloud-azure-stream-binder-servicebus - 詳細については、「Spring Cloud Stream Binder for Azure Service Bus」を参照してください。

Azure Event Hubs 用 Spring Cloud Stream Binder

主な概念

Azure Event Hubs 用の Spring Cloud Stream Binder は、Spring Cloud Stream フレームワークのバインド実装を提供します。 この実装では、Spring Integration Event Hubs チャネル アダプターを基盤として使用します。 デザインの観点からは、Event Hubs は Kafka に似ています。 また、Event Hubs には Kafka API 経由でアクセスできます。 プロジェクトが Kafka API に厳密に依存している場合は、Kafka API サンプル を使用 Events Hub を試すことができます

コンシューマー グループ

Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Event Hubs SDK は、このようなオフセットを Azure Storage 内に格納する関数を提供します。

パーティション分割のサポート

Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka のコンシューマーとパーティション間の自動再調整とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのコンシューマーがどのパーティションを所有するかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、ワークロードバランスを実現するために、最も負荷の高いコンシューマーからいくつかのパーティションを盗もうとします。

負荷分散戦略を指定するために、spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* のプロパティが提供されます。 詳細については、「コンシューマーのプロパティの」セクションを参照してください。

Batch コンシューマーのサポート

Spring Cloud Azure Stream Event Hubs バインダーは、Spring Cloud Stream Batch コンシューマー機能 をサポートします。

バッチ コンシューマー モードを使用するには、spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode プロパティを trueに設定します。 有効にすると、バッチ処理されたイベントの一覧のペイロードを含むメッセージが受信され、Consumer 関数に渡されます。 各メッセージ ヘッダーもリストに変換され、コンテンツは各イベントから解析された関連ヘッダー値です。 イベントのバッチ全体が同じ値を共有するため、パーティション ID、チェックポイント、および最後にエンキューされたプロパティの共同ヘッダーが 1 つの値として表示されます。 詳細については、「Spring Integrationの Spring Cloud Azure サポート Event Hubs メッセージ ヘッダー」セクションを参照してください。

手記

チェックポイント ヘッダーは、MANUAL チェックポイント モードが使用されている場合にのみ存在します。

バッチ コンシューマーのチェックポイント処理では、BATCHMANUALの 2 つのモードがサポートされます。 BATCH モードは、バインダーが受信したイベントのバッチ全体を一緒にチェックポイント処理する自動チェックポイント モードです。 MANUAL モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointer がメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイント処理を実行できます。

バッチ サイズを指定するには、プレフィックスが max-sizemax-wait-time プロパティと spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. プロパティを設定します。 max-size プロパティが必要であり、max-wait-time プロパティは省略可能です。 詳細については、「コンシューマーのプロパティの」セクションを参照してください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

または、Maven の次の例に示すように、Spring Cloud Azure Stream Event Hubs Starter を使用することもできます。

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

構成

バインダーは、構成オプションの次の 3 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれています。

手記

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。

spring-cloud-azure-stream-binder-eventhubs の接続構成可能なプロパティ:

財産 種類 形容
spring.cloud.azure.eventhubs.enabled ブーリアン Azure Event Hubs が有効になっているかどうか。
spring.cloud.azure.eventhubs.connection-string Event Hubs 名前空間の接続文字列の値。
spring.cloud.azure.eventhubs.namespace を する Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.eventhubs .___domain-name Azure Event Hubs 名前空間の値のドメイン名。
spring.cloud.azure.eventhubs.custom-endpoint-address を する カスタム エンドポイント アドレス。

先端

一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Event Hubs バインダーでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure. または spring.cloud.azure.eventhubs.のプレフィックスで構成できます。

バインダーでは、Spring Could Azure Resource Manager も既定でサポートされています。 関連ロールで付与されていないセキュリティ プリンシパルを含む接続文字列を取得する方法については、「Spring Could Azure Resource Manager」の「Basic usage」セクション 参照してください。

チェックポイント構成プロパティ

このセクションには、パーティションの所有権とチェックポイント情報を保持するために使用されるストレージ BLOB サービスの構成オプションが含まれています。

手記

バージョン 4.0.0 以降、spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-if-exists のプロパティが手動で有効になっていない場合、spring.cloud.stream.bindings.bindings.binding-name.destination の名前を持つストレージ コンテナーは自動的に作成されません。

spring-cloud-azure-stream-binder-eventhubs の構成可能なプロパティのチェックポイント処理:

財産 種類 形容
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists ブーリアン コンテナーが存在しない場合に作成を許可するかどうか。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name を する ストレージ アカウントの名前。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key ストレージ アカウントのアクセス キー。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name ストレージ コンテナー名。

先端

Azure Service SDK の一般的な構成オプションは、ストレージ BLOB チェックポイント ストアでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure. または spring.cloud.azure.eventhubs.processor.checkpoint-storeのプレフィックスで構成できます。

Azure Event Hubs のバインド構成プロパティ

次のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサーのプロパティ、および高度なプロデューサー構成の 4 つのセクションに分かれています。

コンシューマーのプロパティ

これらのプロパティは、EventHubsConsumerPropertiesを介して公開されます。

手記

繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Event Hubs では、spring.cloud.stream.eventhubs.default.consumer.<property>=<value>の形式ですべてのチャネルの値の設定がサポートされています。

spring-cloud-azure-stream-binder-eventhubs のコンシューマーが構成可能なプロパティ:

財産 種類 形容
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode を する チェックポイントモード コンシューマーがメッセージをチェックポイント処理する方法を決定するときに使用されるチェックポイント モード
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count 整数 1 つのチェックポイントを実行する各パーティションのメッセージの量を決定します。 チェックポイント モード PARTITION_COUNT 使用されている場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval 期間 1 つのチェックポイントを実行する時間間隔を決定します。 チェックポイント モード TIME 使用されている場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings を します。<binding-name.consumer.batch.max-size 整数 バッチ内のイベントの最大数。 バッチ コンシューマー モードに必要です。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time を する 期間 バッチ処理の最大時間。 バッチ コンシューマー モードが有効でオプションの場合にのみ有効になります。
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.update-interval 期間 更新の間隔の期間。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy を する LoadBalancing戦略 負荷分散戦略。
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval 期間 パーティションの所有権が期限切れになるまでの期間。
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.track-last-enqueued-event-properties ブーリアン イベント プロセッサが、関連付けられているパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうか。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count 整数 イベント ハブ コンシューマーがアクティブに受信し、ローカルでキューに登録するイベントの数を制御するためにコンシューマーによって使用される数。
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.initial-partition-event-position キーをパーティション ID としてマップし、StartPositionProperties の値を指定します。 パーティションのチェックポイントがチェックポイント ストアに存在しない場合に、各パーティションに使用するイベント位置を含むマップ。 このマップは、パーティション ID からキーオフされます。

手記

initial-partition-event-position 構成では、各イベント ハブの初期位置を指定する map を受け入れます。 したがって、そのキーはパーティション ID であり、値は StartPositionPropertiesです。これには、オフセット、シーケンス番号、エンキューされた日付時刻、包括性のプロパティが含まれます。 たとえば、次のように設定できます。

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
高度なコンシューマー構成

上記の 接続チェックポイント、および一般的な Azure SDK クライアント 構成では、各バインダー コンシューマーのカスタマイズがサポート 。これはプレフィックス で構成できます。

プロデューサーのプロパティ

これらのプロパティは、EventHubsProducerPropertiesを介して公開されます。

手記

繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Event Hubs では、spring.cloud.stream.eventhubs.default.producer.<property>=<value>の形式ですべてのチャネルの値の設定がサポートされています。

spring-cloud-azure-stream-binder-eventhubs のプロデューサー構成可能なプロパティ:

財産 種類 形容
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync ブーリアン プロデューサーの同期用のスイッチ フラグ。 true の場合、プロデューサーは送信操作の後に応答を待機します。
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout 長い 送信操作の後に応答を待機する時間。 同期プロデューサーが有効になっている場合にのみ有効になります。
高度なプロデューサー構成

上記の 接続一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされています。これは、プレフィックス spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.で構成できます。

基本的な使用方法

Event Hubs との間でのメッセージの送受信

  1. 構成オプションに資格情報を入力します。

    • 接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

      手記

      Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。

    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. サプライヤーとコンシューマーを定義します。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

パーティション分割のサポート

送信するメッセージに関するパーティション情報を構成するために、ユーザー指定のパーティション情報を含む PartitionSupplier が作成されます。 次のフローチャートは、パーティション ID とキーのさまざまな優先順位を取得するプロセスを示しています。

パーティション分割サポート プロセスのフローチャートを示す図。

Batch コンシューマーのサポート

  1. 次の例に示すように、バッチ構成オプションを指定します。

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. サプライヤーとコンシューマーを定義します。

    BATCHとしてのチェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用できます。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    MANUALとしてのチェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用/チェックポイント処理できます。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

手記

バッチ使用モードでは、Spring Cloud Stream バインダーの既定のコンテンツ タイプが application/jsonされるため、メッセージ ペイロードがコンテンツ タイプと一致していることを確認します。 たとえば、application/json の既定のコンテンツ タイプを使用して String ペイロードを持つメッセージを受信する場合、ペイロードは元の JSON String テキストの二重引用符で囲まれた Stringする必要があります。 text/plain コンテンツ タイプの場合は、String オブジェクトを直接指定できます。 詳細については、「Spring Cloud Stream コンテンツ タイプ ネゴシエーションを する」を参照してください。

エラー メッセージの処理

  • 送信バインドのエラー メッセージを処理する

    既定では、Spring Integration は errorChannelというグローバル エラー チャネルを作成します。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 受信バインディング のエラー メッセージを処理する

    Spring Cloud Stream Event Hubs Binder では、受信メッセージ バインドのエラーを処理する 1 つのソリューションであるエラー ハンドラーがサポートされています。

    エラー ハンドラーの:

    Spring Cloud Stream では、Consumer インスタンスを受け入れる ErrorMessage を追加することで、カスタム エラー ハンドラーを提供するためのメカニズムが公開されています。 詳細については、「Spring Cloud Stream のドキュメント エラー メッセージ を処理する」を参照してください。

    • バインドの既定のエラー ハンドラー

      1 つの Consumer Bean を構成して、すべてのインバウンド・バインディング・エラー・メッセージを使用します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、spring.cloud.stream.default.error-handler-definition プロパティを関数名に設定する必要もあります。

    • バインド固有のエラー ハンドラー

      特定のインバウンド・バインディング・エラー・メッセージを使用するように Consumer Bean を構成します。 次の関数は、特定の受信バインディング エラー チャネルをサブスクライブし、バインディングの既定のエラー ハンドラーよりも高い優先順位を持ちます。

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition プロパティを関数名に設定する必要もあります。

Event Hubs メッセージ ヘッダー

サポートされる基本的なメッセージ ヘッダーについては、「Spring Integrationの Spring Cloud Azure サポート Event Hubs メッセージ ヘッダー 」セクションを参照してください。

複数バインダーのサポート

複数のバインダーを使用して、複数の Event Hubs 名前空間への接続もサポートされます。 このサンプルでは、接続文字列を例として受け取ります。 サービス プリンシパルとマネージド ID の資格情報もサポートされています。 各バインダーの環境設定で、関連するプロパティを設定できます。

  1. Event Hubs で複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    手記

    前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000などの構成を使用できます。

    手記

    Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。

  2. 2 つのサプライヤーと 2 つのコンシューマーを定義する必要があります。

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

リソースのプロビジョニング

Event Hubs バインダーは、イベント ハブとコンシューマー グループのプロビジョニングをサポートしています。ユーザーは次のプロパティを使用してプロビジョニングを有効にすることができます。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリ を参照してください。

Azure Service Bus 用 Spring Cloud Stream Binder

主な概念

Azure Service Bus 用の Spring Cloud Stream Binder は、Spring Cloud Stream Framework のバインド実装を提供します。 この実装では、Spring Integration Service Bus チャネル アダプターを基盤として使用します。

スケジュールされたメッセージ

このバインダーは、遅延処理のためにトピックにメッセージを送信することをサポートします。 ユーザーは、ヘッダー x-delay メッセージの遅延時間をミリ秒単位で表すスケジュールされたメッセージを送信できます。 メッセージは、x-delay ミリ秒後にそれぞれのトピックに配信されます。

コンシューマー グループ

Service Bus トピックでは、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 このバインダーは、コンシューマー グループとして機能するトピックの Subscription に依存します。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

または、Maven の次の例に示すように、Spring Cloud Azure Stream Service Bus Starter を使用することもできます。

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

構成

バインダーは、構成オプションの次の 2 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれています。

手記

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。

spring-cloud-azure-stream-binder-servicebus の接続構成可能なプロパティ:

財産 種類 形容
spring.cloud.azure.servicebus.enabled ブーリアン Azure Service Bus が有効になっているかどうか。
spring.cloud.azure.servicebus.connection-string を する Service Bus 名前空間の接続文字列の値。
spring.cloud.azure.servicebus.custom-endpoint-address を する Service Bus に接続するときに使用するカスタム エンドポイント アドレス。
spring.cloud.azure.servicebus.namespace を する Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.servicebus.___domain-name を する Azure Service Bus 名前空間の値のドメイン名。

手記

一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Service Bus バインダーでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure. または spring.cloud.azure.servicebus.のプレフィックスで構成できます。

バインダーでは、Spring Could Azure Resource Manager も既定でサポートされています。 関連ロールで付与されていないセキュリティ プリンシパルを含む接続文字列を取得する方法については、「Spring Could Azure Resource Manager」の「Basic usage」セクション 参照してください。

Azure Service Bus のバインド構成プロパティ

次のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサーのプロパティ、および高度なプロデューサー構成の 4 つのセクションに分かれています。

コンシューマーのプロパティ

これらのプロパティは、ServiceBusConsumerPropertiesを介して公開されます。

手記

繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Service Bus では、spring.cloud.stream.servicebus.default.consumer.<property>=<value>の形式ですべてのチャネルの値の設定がサポートされています。

spring-cloud-azure-stream-binder-servicebus のコンシューマーが構成可能なプロパティ:

財産 種類 デフォルト 形容
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected ブーリアン 失敗したメッセージが DLQ にルーティングされる場合。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls を する 整数 1 Service Bus プロセッサ クライアントが処理する必要がある最大同時メッセージ数。 セッションを有効にすると、各セッションに適用されます。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions を する 整数 ヌル 任意の時点で処理する同時セッションの最大数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled の ブーリアン ヌル セッションが有効かどうか。
spring.cloud.stream.servicebus.bindings.binding-name.consumer .session-idle-timeout をする 期間 ヌル 現在アクティブなセッションでメッセージが受信されるまで待機する最大時間 (期間) を設定します。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count を する 整数 0 Service Bus プロセッサ クライアントのプリフェッチ数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue の サブキュー 何一つ 接続先のサブ キューの種類。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration 期間 5メートル ロックの自動更新を続行する時間。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode の サービスバス受信モード peek_lock Service Bus プロセッサ クライアントの受信モード。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete を する ブーリアン メッセージを自動的に決済するかどうか。 false に設定すると、開発者が手動でメッセージを決済できるように、Checkpointer のメッセージ ヘッダーが追加されます。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-メガバイト 長い 1024 キュー/トピックの最大サイズ (メガバイト単位)。 これは、キュー/トピックに割り当てられたメモリのサイズです。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live の 期間 P10675199DT2H48M5.4775807S. (10675199日、2 時間、48 分、5 秒、477 ミリ秒) メッセージが Service Bus に送信されてからメッセージが期限切れになるまでの期間。

大事な

Azure Resource Manager (ARM) を使用する場合は、spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type プロパティを構成する必要があります。 詳細については、GitHub の servicebus-queue-binder-arm サンプル を参照してください。

高度なコンシューマー構成

上記の 接続一般的な Azure SDK クライアント 構成では、各バインダー コンシューマーのカスタマイズがサポートされています。これはプレフィックス spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.で構成できます。

プロデューサーのプロパティ

これらのプロパティは、ServiceBusProducerPropertiesを介して公開されます。

手記

繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Service Bus では、spring.cloud.stream.servicebus.default.producer.<property>=<value>の形式ですべてのチャネルの値の設定がサポートされています。

spring-cloud-azure-stream-binder-servicebus のプロデューサーで構成可能なプロパティ:

財産 種類 デフォルト 形容
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync を する ブーリアン プロデューサーの同期用にフラグを切り替えます。
spring.cloud.stream.servicebus.bindings.bindings.binding-name.producer.send-timeout 長い 1万 プロデューサーの送信のタイムアウト値。
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusエンティティタイプ ヌル バインディング プロデューサーに必要な、プロデューサーの Service Bus エンティティ型。
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-メガバイト 長い 1024 キュー/トピックの最大サイズ (メガバイト単位)。 これは、キュー/トピックに割り当てられたメモリのサイズです。
spring.cloud.stream.servicebus.bindings.bindings.binding-name.producer.default-message-time-to-live (英語) 期間 P10675199DT2H48M5.4775807S. (10675199日、2 時間、48 分、5 秒、477 ミリ秒) メッセージが Service Bus に送信されてからメッセージが期限切れになるまでの期間。

大事な

バインディング プロデューサーを使用する場合は、spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type のプロパティを構成する必要があります。

高度なプロデューサー構成

上記の 接続一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされています。これは、プレフィックス spring.cloud.stream.servicebus.bindings.<binding-name>.producer.で構成できます。

基本的な使用方法

Service Bus との間でのメッセージの送受信

  1. 構成オプションに資格情報を入力します。

    • 接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

      手記

      Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。

    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

  • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. サプライヤーとコンシューマーを定義します。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

パーティション キーのサポート

バインダーは、メッセージ ヘッダーでパーティション キーとセッション ID を設定できるようにすることで、Service Bus のパーティション分割 をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。

Spring Cloud Stream には、パーティション キー SpEL 式プロパティ spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionが用意されています。 たとえば、このプロパティを "'partitionKey-' + headers[<message-header-key>]" として設定し、message-header-key というヘッダーを追加します。 Spring Cloud Stream は、パーティション キーを割り当てる式を評価するときに、このヘッダーの値を使用します。 次のコードは、プロデューサーの例を示しています。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

セッションのサポート

バインダーは、Service Bus メッセージ セッションをサポートします。 メッセージのセッション ID は、メッセージ ヘッダーを使用して設定できます。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

手記

Service Bus のパーティション分割によると、セッション ID はパーティション キーよりも優先度が高くなります。 そのため、ServiceBusMessageHeaders#SESSION_ID ヘッダーと ServiceBusMessageHeaders#PARTITION_KEY ヘッダーの両方が設定されると、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。

エラー メッセージの処理

  • 送信バインドのエラー メッセージを処理する

    既定では、Spring Integration は errorChannelというグローバル エラー チャネルを作成します。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 受信バインディング のエラー メッセージを処理する

    Spring Cloud Stream Service Bus Binder では、受信メッセージ バインドのエラーを処理するための 2 つのソリューション (バインダー エラー ハンドラーとハンドラー) がサポートされています。

    Binder エラー ハンドラーの:

    既定のバインダー エラー ハンドラーは、受信バインディングを処理します。 このハンドラーを使用して、spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected が有効になっている場合に、失敗したメッセージを配信不能キューに送信します。 それ以外の場合、失敗したメッセージは破棄されます。 バインダー エラー ハンドラーは、他の指定されたエラー ハンドラーと相互に排他的です。

    エラー ハンドラーの:

    Spring Cloud Stream では、Consumer インスタンスを受け入れる ErrorMessage を追加することで、カスタム エラー ハンドラーを提供するためのメカニズムが公開されています。 詳細については、「Spring Cloud Stream のドキュメント エラー メッセージ を処理する」を参照してください。

    • バインドの既定のエラー ハンドラー

      1 つの Consumer Bean を構成して、すべてのインバウンド・バインディング・エラー・メッセージを使用します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、spring.cloud.stream.default.error-handler-definition プロパティを関数名に設定する必要もあります。

    • バインド固有のエラー ハンドラー

      特定のインバウンド・バインディング・エラー・メッセージを使用するように Consumer Bean を構成します。 次の関数は、バインディングの既定のエラー ハンドラーよりも優先順位の高い特定の受信バインド エラー チャネルをサブスクライブします。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      また、spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition プロパティを関数名に設定する必要もあります。

Service Bus メッセージ ヘッダー

サポートされる基本的なメッセージ ヘッダーについては、Spring Integrationの Spring Cloud Azure サポートの Service Bus メッセージ ヘッダー セクションを参照してください。

手記

パーティション キーを設定する場合、メッセージ ヘッダーの優先度は Spring Cloud Stream プロパティよりも高くなります。 そのため、spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression は、ServiceBusMessageHeaders#SESSION_ID ヘッダーと ServiceBusMessageHeaders#PARTITION_KEY ヘッダーが構成されていない場合にのみ有効になります。

複数バインダーのサポート

複数の Service Bus 名前空間への接続も、複数のバインダーを使用してサポートされます。 このサンプルでは、接続文字列を例として受け取ります。 サービス プリンシパルとマネージド ID の資格情報もサポートされており、ユーザーは各バインダーの環境設定で関連プロパティを設定できます。

  1. ServiceBus の複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    手記

    前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000などの構成を使用できます。

    手記

    Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。

  2. 2 つのサプライヤーと 2 人の消費者を定義する必要がある

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

リソースのプロビジョニング

Service Bus バインダーは、キュー、トピック、サブスクリプションのプロビジョニングをサポートしています。ユーザーは次のプロパティを使用してプロビジョニングを有効にすることができます。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

手記

tenant-id に使用できる値は、commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。

Service Bus クライアントのプロパティをカスタマイズする

開発者は、AzureServiceClientBuilderCustomizer を使用して Service Bus クライアントのプロパティをカスタマイズできます。 次の例では、sessionIdleTimeoutServiceBusClientBuilder プロパティをカスタマイズします。

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリ を参照してください。