Spring Cloud Stream は、共有メッセージング システムに接続された高度にスケーラブルなイベント ドリブン マイクロサービスを構築するためのフレームワークです。
このフレームワークは、既に確立されており、使い慣れた Spring のイディオムとベスト プラクティスに基づいて構築された柔軟なプログラミング モデルを提供します。 これらのベスト プラクティスには、永続的な pub/sub セマンティクス、コンシューマー グループ、ステートフル パーティションのサポートが含まれます。
現在のバインダーの実装は次のとおりです。
- 詳細については、Azure Event Hubs の Spring Cloud Stream Binder の に関するページを参照してください。 -
spring-cloud-azure-stream-binder-servicebus
- 詳細については、「Spring Cloud Stream Binder for Azure Service Bus」を参照してください。
Azure Event Hubs 用 Spring Cloud Stream Binder
主な概念
Azure Event Hubs 用の Spring Cloud Stream Binder は、Spring Cloud Stream フレームワークのバインド実装を提供します。
この実装では、Spring Integration Event Hubs チャネル アダプターを基盤として使用します。 デザインの観点からは、Event Hubs は Kafka に似ています。 また、Event Hubs には Kafka API 経由でアクセスできます。 プロジェクトが Kafka API に厳密に依存している場合は、Kafka API サンプル を使用
コンシューマー グループ
Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Event Hubs SDK は、このようなオフセットを Azure Storage 内に格納する関数を提供します。
パーティション分割のサポート
Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka のコンシューマーとパーティション間の自動再調整とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのコンシューマーがどのパーティションを所有するかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、ワークロードバランスを実現するために、最も負荷の高いコンシューマーからいくつかのパーティションを盗もうとします。
負荷分散戦略を指定するために、spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
のプロパティが提供されます。 詳細については、「コンシューマーのプロパティの」セクションを参照してください。
Batch コンシューマーのサポート
Spring Cloud Azure Stream Event Hubs バインダーは、Spring Cloud Stream Batch コンシューマー機能 をサポートします。
バッチ コンシューマー モードを使用するには、spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
プロパティを true
に設定します。 有効にすると、バッチ処理されたイベントの一覧のペイロードを含むメッセージが受信され、Consumer
関数に渡されます。 各メッセージ ヘッダーもリストに変換され、コンテンツは各イベントから解析された関連ヘッダー値です。 イベントのバッチ全体が同じ値を共有するため、パーティション ID、チェックポイント、および最後にエンキューされたプロパティの共同ヘッダーが 1 つの値として表示されます。 詳細については、「Spring Integrationの Spring Cloud Azure サポート
手記
チェックポイント ヘッダーは、MANUAL
チェックポイント モードが使用されている場合にのみ存在します。
バッチ コンシューマーのチェックポイント処理では、BATCH
と MANUAL
の 2 つのモードがサポートされます。
BATCH
モードは、バインダーが受信したイベントのバッチ全体を一緒にチェックポイント処理する自動チェックポイント モードです。
MANUAL
モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointer
がメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイント処理を実行できます。
バッチ サイズを指定するには、プレフィックスが max-size
の max-wait-time
プロパティと spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
プロパティを設定します。
max-size
プロパティが必要であり、max-wait-time
プロパティは省略可能です。 詳細については、「コンシューマーのプロパティの」セクションを参照してください。
依存関係のセットアップ
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
または、Maven の次の例に示すように、Spring Cloud Azure Stream Event Hubs Starter を使用することもできます。
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
構成
バインダーは、構成オプションの次の 3 つの部分を提供します。
接続構成プロパティ
このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれています。
手記
セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。
spring-cloud-azure-stream-binder-eventhubs の接続構成可能なプロパティ:
財産 | 種類 | 形容 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | ブーリアン | Azure Event Hubs が有効になっているかどうか。 |
spring.cloud.azure.eventhubs.connection-string | 糸 | Event Hubs 名前空間の接続文字列の値。 |
spring.cloud.azure.eventhubs.namespace を |
糸 | Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります |
spring.cloud.azure.eventhubs .___domain-name | 糸 | Azure Event Hubs 名前空間の値のドメイン名。 |
spring.cloud.azure.eventhubs.custom-endpoint-address を |
糸 | カスタム エンドポイント アドレス。 |
先端
一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Event Hubs バインダーでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure.
または spring.cloud.azure.eventhubs.
のプレフィックスで構成できます。
バインダーでは、Spring Could Azure Resource Manager
チェックポイント構成プロパティ
このセクションには、パーティションの所有権とチェックポイント情報を保持するために使用されるストレージ BLOB サービスの構成オプションが含まれています。
手記
バージョン 4.0.0 以降、
spring-cloud-azure-stream-binder-eventhubs の構成可能なプロパティのチェックポイント処理:
財産 | 種類 | 形容 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists |
ブーリアン | コンテナーが存在しない場合に作成を許可するかどうか。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name を |
糸 | ストレージ アカウントの名前。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 糸 | ストレージ アカウントのアクセス キー。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 糸 | ストレージ コンテナー名。 |
先端
Azure Service SDK の一般的な構成オプションは、ストレージ BLOB チェックポイント ストアでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure.
または spring.cloud.azure.eventhubs.processor.checkpoint-store
のプレフィックスで構成できます。
Azure Event Hubs のバインド構成プロパティ
次のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサーのプロパティ、および高度なプロデューサー構成の 4 つのセクションに分かれています。
コンシューマーのプロパティ
これらのプロパティは、EventHubsConsumerProperties
を介して公開されます。
手記
繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Event Hubs では、spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
の形式ですべてのチャネルの値の設定がサポートされています。
spring-cloud-azure-stream-binder-eventhubs のコンシューマーが構成可能なプロパティ:
財産 | 種類 | 形容 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode を |
チェックポイントモード | コンシューマーがメッセージをチェックポイント処理する方法を決定するときに使用されるチェックポイント モード |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count |
整数 | 1 つのチェックポイントを実行する各パーティションのメッセージの量を決定します。 チェックポイント モード PARTITION_COUNT 使用されている場合にのみ有効になります。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | 期間 | 1 つのチェックポイントを実行する時間間隔を決定します。 チェックポイント モード TIME 使用されている場合にのみ有効になります。 |
spring.cloud.stream.eventhubs.bindings を します。<binding-name.consumer.batch.max-size | 整数 | バッチ内のイベントの最大数。 バッチ コンシューマー モードに必要です。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time を |
期間 | バッチ処理の最大時間。 バッチ コンシューマー モードが有効でオプションの場合にのみ有効になります。 |
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.update-interval | 期間 | 更新の間隔の期間。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy を |
LoadBalancing戦略 | 負荷分散戦略。 |
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | 期間 | パーティションの所有権が期限切れになるまでの期間。 |
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.track-last-enqueued-event-properties | ブーリアン | イベント プロセッサが、関連付けられているパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうか。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | 整数 | イベント ハブ コンシューマーがアクティブに受信し、ローカルでキューに登録するイベントの数を制御するためにコンシューマーによって使用される数。 |
spring.cloud.stream.eventhubs.bindings.bindings.binding-name.consumer.initial-partition-event-position | キーをパーティション ID としてマップし、StartPositionProperties の値を指定します。 |
パーティションのチェックポイントがチェックポイント ストアに存在しない場合に、各パーティションに使用するイベント位置を含むマップ。 このマップは、パーティション ID からキーオフされます。 |
手記
initial-partition-event-position
構成では、各イベント ハブの初期位置を指定する map
を受け入れます。 したがって、そのキーはパーティション ID であり、値は StartPositionProperties
です。これには、オフセット、シーケンス番号、エンキューされた日付時刻、包括性のプロパティが含まれます。 たとえば、次のように設定できます。
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
高度なコンシューマー構成
上記の
プロデューサーのプロパティ
これらのプロパティは、EventHubsProducerProperties
を介して公開されます。
手記
繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Event Hubs では、spring.cloud.stream.eventhubs.default.producer.<property>=<value>
の形式ですべてのチャネルの値の設定がサポートされています。
spring-cloud-azure-stream-binder-eventhubs のプロデューサー構成可能なプロパティ:
財産 | 種類 | 形容 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | ブーリアン | プロデューサーの同期用のスイッチ フラグ。 true の場合、プロデューサーは送信操作の後に応答を待機します。 |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | 長い | 送信操作の後に応答を待機する時間。 同期プロデューサーが有効になっている場合にのみ有効になります。 |
高度なプロデューサー構成
上記の 接続 と 一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされています。これは、プレフィックス spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
で構成できます。
基本的な使用方法
Event Hubs との間でのメッセージの送受信
構成オプションに資格情報を入力します。
接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
手記
Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。
サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
手記
tenant-id
に使用できる値は、common
、organizations
、consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。
資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
サプライヤーとコンシューマーを定義します。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
パーティション分割のサポート
送信するメッセージに関するパーティション情報を構成するために、ユーザー指定のパーティション情報を含む PartitionSupplier
が作成されます。 次のフローチャートは、パーティション ID とキーのさまざまな優先順位を取得するプロセスを示しています。
Batch コンシューマーのサポート
次の例に示すように、バッチ構成オプションを指定します。
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
サプライヤーとコンシューマーを定義します。
BATCH
としてのチェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用できます。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
MANUAL
としてのチェックポイント モードの場合は、次のコードを使用してメッセージを送信し、バッチで使用/チェックポイント処理できます。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
手記
バッチ使用モードでは、Spring Cloud Stream バインダーの既定のコンテンツ タイプが application/json
されるため、メッセージ ペイロードがコンテンツ タイプと一致していることを確認します。 たとえば、application/json
の既定のコンテンツ タイプを使用して String
ペイロードを持つメッセージを受信する場合、ペイロードは元の JSON String
テキストの二重引用符で囲まれた String
する必要があります。
text/plain
コンテンツ タイプの場合は、String
オブジェクトを直接指定できます。 詳細については、「Spring Cloud Stream コンテンツ タイプ ネゴシエーションを
エラー メッセージの処理
送信バインドのエラー メッセージを処理する
既定では、Spring Integration は
errorChannel
というグローバル エラー チャネルを作成します。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
受信バインディング のエラー メッセージを処理する
Spring Cloud Stream Event Hubs Binder では、受信メッセージ バインドのエラーを処理する 1 つのソリューションであるエラー ハンドラーがサポートされています。
エラー ハンドラーの:
Spring Cloud Stream では、
Consumer
インスタンスを受け入れるErrorMessage
を追加することで、カスタム エラー ハンドラーを提供するためのメカニズムが公開されています。 詳細については、「Spring Cloud Stream のドキュメント エラー メッセージ を処理する」を参照してください。バインドの既定のエラー ハンドラー
1 つの
Consumer
Bean を構成して、すべてのインバウンド・バインディング・エラー・メッセージを使用します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
また、
spring.cloud.stream.default.error-handler-definition
プロパティを関数名に設定する必要もあります。バインド固有のエラー ハンドラー
特定のインバウンド・バインディング・エラー・メッセージを使用するように
Consumer
Bean を構成します。 次の関数は、特定の受信バインディング エラー チャネルをサブスクライブし、バインディングの既定のエラー ハンドラーよりも高い優先順位を持ちます。@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
また、
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
プロパティを関数名に設定する必要もあります。
Event Hubs メッセージ ヘッダー
サポートされる基本的なメッセージ ヘッダーについては、「Spring Integrationの Spring Cloud Azure サポート
複数バインダーのサポート
複数のバインダーを使用して、複数の Event Hubs 名前空間への接続もサポートされます。 このサンプルでは、接続文字列を例として受け取ります。 サービス プリンシパルとマネージド ID の資格情報もサポートされています。 各バインダーの環境設定で、関連するプロパティを設定できます。
Event Hubs で複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
手記
前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
などの構成を使用できます。手記
Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。
2 つのサプライヤーと 2 つのコンシューマーを定義する必要があります。
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
リソースのプロビジョニング
Event Hubs バインダーは、イベント ハブとコンシューマー グループのプロビジョニングをサポートしています。ユーザーは次のプロパティを使用してプロビジョニングを有効にすることができます。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
手記
tenant-id
に使用できる値は、common
、organizations
、consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。
サンプル
詳細については、GitHub の azure-spring-boot-samples リポジトリ
Azure Service Bus 用 Spring Cloud Stream Binder
主な概念
Azure Service Bus 用の Spring Cloud Stream Binder は、Spring Cloud Stream Framework のバインド実装を提供します。 この実装では、Spring Integration Service Bus チャネル アダプターを基盤として使用します。
スケジュールされたメッセージ
このバインダーは、遅延処理のためにトピックにメッセージを送信することをサポートします。 ユーザーは、ヘッダー x-delay
メッセージの遅延時間をミリ秒単位で表すスケジュールされたメッセージを送信できます。 メッセージは、x-delay
ミリ秒後にそれぞれのトピックに配信されます。
コンシューマー グループ
Service Bus トピックでは、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。
このバインダーは、コンシューマー グループとして機能するトピックの Subscription
に依存します。
依存関係のセットアップ
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
または、Maven の次の例に示すように、Spring Cloud Azure Stream Service Bus Starter を使用することもできます。
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
構成
バインダーは、構成オプションの次の 2 つの部分を提供します。
接続構成プロパティ
このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれています。
手記
セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID を使用してアクセスを承認する」 を参照して、セキュリティ プリンシパルに Azure リソースにアクセスするための十分なアクセス許可が付与されていることを確認してください。
spring-cloud-azure-stream-binder-servicebus の接続構成可能なプロパティ:
財産 | 種類 | 形容 |
---|---|---|
spring.cloud.azure.servicebus.enabled | ブーリアン | Azure Service Bus が有効になっているかどうか。 |
spring.cloud.azure.servicebus.connection-string を |
糸 | Service Bus 名前空間の接続文字列の値。 |
spring.cloud.azure.servicebus.custom-endpoint-address を |
糸 | Service Bus に接続するときに使用するカスタム エンドポイント アドレス。 |
spring.cloud.azure.servicebus.namespace を |
糸 | Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります |
spring.cloud.azure.servicebus.___domain-name を |
糸 | Azure Service Bus 名前空間の値のドメイン名。 |
手記
一般的な Azure Service SDK 構成オプションは、Spring Cloud Azure Stream Service Bus バインダーでも構成できます。 サポートされている構成オプションは、Spring Cloud Azure 構成で導入され、統合プレフィックス spring.cloud.azure.
または spring.cloud.azure.servicebus.
のプレフィックスで構成できます。
バインダーでは、Spring Could Azure Resource Manager
Azure Service Bus のバインド構成プロパティ
次のオプションは、コンシューマー プロパティ、高度なコンシューマー構成、プロデューサーのプロパティ、および高度なプロデューサー構成の 4 つのセクションに分かれています。
コンシューマーのプロパティ
これらのプロパティは、ServiceBusConsumerProperties
を介して公開されます。
手記
繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Service Bus では、spring.cloud.stream.servicebus.default.consumer.<property>=<value>
の形式ですべてのチャネルの値の設定がサポートされています。
spring-cloud-azure-stream-binder-servicebus のコンシューマーが構成可能なプロパティ:
財産 | 種類 | デフォルト | 形容 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | ブーリアン | 偽 | 失敗したメッセージが DLQ にルーティングされる場合。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls を |
整数 | 1 | Service Bus プロセッサ クライアントが処理する必要がある最大同時メッセージ数。 セッションを有効にすると、各セッションに適用されます。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions を |
整数 | ヌル | 任意の時点で処理する同時セッションの最大数。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled の |
ブーリアン | ヌル | セッションが有効かどうか。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer .session-idle-timeout をする | 期間 | ヌル | 現在アクティブなセッションでメッセージが受信されるまで待機する最大時間 (期間) を設定します。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count を |
整数 | 0 | Service Bus プロセッサ クライアントのプリフェッチ数。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue の |
サブキュー | 何一つ | 接続先のサブ キューの種類。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | 期間 | 5メートル | ロックの自動更新を続行する時間。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode の |
サービスバス受信モード | peek_lock | Service Bus プロセッサ クライアントの受信モード。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete を |
ブーリアン | 真 | メッセージを自動的に決済するかどうか。 false に設定すると、開発者が手動でメッセージを決済できるように、Checkpointer のメッセージ ヘッダーが追加されます。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-メガバイト | 長い | 1024 | キュー/トピックの最大サイズ (メガバイト単位)。 これは、キュー/トピックに割り当てられたメモリのサイズです。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live の |
期間 | P10675199DT2H48M5.4775807S. (10675199日、2 時間、48 分、5 秒、477 ミリ秒) | メッセージが Service Bus に送信されてからメッセージが期限切れになるまでの期間。 |
大事な
Azure Resource Manager (ARM) を使用する場合は、spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
プロパティを構成する必要があります。 詳細については、GitHub の servicebus-queue-binder-arm サンプル
高度なコンシューマー構成
上記の 接続 と 一般的な Azure SDK クライアント 構成では、各バインダー コンシューマーのカスタマイズがサポートされています。これはプレフィックス spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
で構成できます。
プロデューサーのプロパティ
これらのプロパティは、ServiceBusProducerProperties
を介して公開されます。
手記
繰り返しを回避するために、バージョン 4.17.0 と 5.11.0 以降、Spring Cloud Azure Stream Binder Service Bus では、spring.cloud.stream.servicebus.default.producer.<property>=<value>
の形式ですべてのチャネルの値の設定がサポートされています。
spring-cloud-azure-stream-binder-servicebus のプロデューサーで構成可能なプロパティ:
財産 | 種類 | デフォルト | 形容 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync を |
ブーリアン | 偽 | プロデューサーの同期用にフラグを切り替えます。 |
spring.cloud.stream.servicebus.bindings.bindings.binding-name.producer.send-timeout | 長い | 1万 | プロデューサーの送信のタイムアウト値。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusエンティティタイプ | ヌル | バインディング プロデューサーに必要な、プロデューサーの Service Bus エンティティ型。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-メガバイト | 長い | 1024 | キュー/トピックの最大サイズ (メガバイト単位)。 これは、キュー/トピックに割り当てられたメモリのサイズです。 |
spring.cloud.stream.servicebus.bindings.bindings.binding-name.producer.default-message-time-to-live (英語) | 期間 | P10675199DT2H48M5.4775807S. (10675199日、2 時間、48 分、5 秒、477 ミリ秒) | メッセージが Service Bus に送信されてからメッセージが期限切れになるまでの期間。 |
大事な
バインディング プロデューサーを使用する場合は、spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
のプロパティを構成する必要があります。
高度なプロデューサー構成
上記の 接続 と 一般的な Azure SDK クライアント 構成では、各バインダー プロデューサーのカスタマイズがサポートされています。これは、プレフィックス spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
で構成できます。
基本的な使用方法
Service Bus との間でのメッセージの送受信
構成オプションに資格情報を入力します。
接続文字列としての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
手記
Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。
サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
手記
tenant-id
に使用できる値は、common
、organizations
、consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。
資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
サプライヤーとコンシューマーを定義します。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
パーティション キーのサポート
バインダーは、メッセージ ヘッダーでパーティション キーとセッション ID を設定できるようにすることで、Service Bus のパーティション分割 をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。
Spring Cloud Stream には、パーティション キー SpEL 式プロパティ spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
が用意されています。 たとえば、このプロパティを "'partitionKey-' + headers[<message-header-key>]"
として設定し、message-header-key というヘッダーを追加します。 Spring Cloud Stream は、パーティション キーを割り当てる式を評価するときに、このヘッダーの値を使用します。 次のコードは、プロデューサーの例を示しています。
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
セッションのサポート
バインダーは、Service Bus
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
手記
Service Bus のパーティション分割によると、セッション ID はパーティション キーよりも優先度が高くなります。 そのため、ServiceBusMessageHeaders#SESSION_ID
ヘッダーと ServiceBusMessageHeaders#PARTITION_KEY
ヘッダーの両方が設定されると、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。
エラー メッセージの処理
送信バインドのエラー メッセージを処理する
既定では、Spring Integration は
errorChannel
というグローバル エラー チャネルを作成します。 送信バインディング エラー メッセージを処理するように、次のメッセージ エンドポイントを構成します。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
受信バインディング のエラー メッセージを処理する
Spring Cloud Stream Service Bus Binder では、受信メッセージ バインドのエラーを処理するための 2 つのソリューション (バインダー エラー ハンドラーとハンドラー) がサポートされています。
Binder エラー ハンドラーの:
既定のバインダー エラー ハンドラーは、受信バインディングを処理します。 このハンドラーを使用して、
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
が有効になっている場合に、失敗したメッセージを配信不能キューに送信します。 それ以外の場合、失敗したメッセージは破棄されます。 バインダー エラー ハンドラーは、他の指定されたエラー ハンドラーと相互に排他的です。エラー ハンドラーの:
Spring Cloud Stream では、
Consumer
インスタンスを受け入れるErrorMessage
を追加することで、カスタム エラー ハンドラーを提供するためのメカニズムが公開されています。 詳細については、「Spring Cloud Stream のドキュメント エラー メッセージ を処理する」を参照してください。バインドの既定のエラー ハンドラー
1 つの
Consumer
Bean を構成して、すべてのインバウンド・バインディング・エラー・メッセージを使用します。 次の既定の関数は、各受信バインディング エラー チャネルをサブスクライブします。@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
また、
spring.cloud.stream.default.error-handler-definition
プロパティを関数名に設定する必要もあります。バインド固有のエラー ハンドラー
特定のインバウンド・バインディング・エラー・メッセージを使用するように
Consumer
Bean を構成します。 次の関数は、バインディングの既定のエラー ハンドラーよりも優先順位の高い特定の受信バインド エラー チャネルをサブスクライブします。@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
また、
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
プロパティを関数名に設定する必要もあります。
Service Bus メッセージ ヘッダー
サポートされる基本的なメッセージ ヘッダーについては、Spring Integrationの Spring Cloud Azure サポートの
手記
パーティション キーを設定する場合、メッセージ ヘッダーの優先度は Spring Cloud Stream プロパティよりも高くなります。 そのため、spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
は、ServiceBusMessageHeaders#SESSION_ID
ヘッダーと ServiceBusMessageHeaders#PARTITION_KEY
ヘッダーが構成されていない場合にのみ有効になります。
複数バインダーのサポート
複数の Service Bus 名前空間への接続も、複数のバインダーを使用してサポートされます。 このサンプルでは、接続文字列を例として受け取ります。 サービス プリンシパルとマネージド ID の資格情報もサポートされており、ユーザーは各バインダーの環境設定で関連プロパティを設定できます。
ServiceBus の複数のバインダーを使用するには、application.yml ファイルで次のプロパティを構成します。
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
手記
前のアプリケーション ファイルは、すべてのバインドに対してアプリケーションの 1 つの既定のポーリングツールを構成する方法を示しています。 特定のバインディングに対して poller を構成する場合は、
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
などの構成を使用できます。手記
Microsoft では、使用可能な最も安全な認証フローを使用することをお勧めします。 この手順で説明されている認証フロー (データベース、キャッシュ、メッセージング、AI サービスなど) には、アプリケーションで非常に高い信頼度が要求されるため、他のフローには存在しないリスクが伴います。 このフローは、パスワードレス接続またはキーレス接続のマネージド ID など、より安全なオプションが有効でない場合にのみ使用します。 ローカル コンピューターの操作では、パスワードレス接続またはキーレス接続にユーザー ID を使用します。
2 つのサプライヤーと 2 人の消費者を定義する必要がある
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
リソースのプロビジョニング
Service Bus バインダーは、キュー、トピック、サブスクリプションのプロビジョニングをサポートしています。ユーザーは次のプロパティを使用してプロビジョニングを有効にすることができます。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
手記
tenant-id
に使用できる値は、common
、organizations
、consumers
、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人用アカウントと組織アカウント) セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra IDでのシングルテナント アプリをマルチテナントに変換する」を参照してください。
Service Bus クライアントのプロパティをカスタマイズする
開発者は、AzureServiceClientBuilderCustomizer
を使用して Service Bus クライアントのプロパティをカスタマイズできます。 次の例では、sessionIdleTimeout
の ServiceBusClientBuilder
プロパティをカスタマイズします。
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
サンプル
詳細については、GitHub の azure-spring-boot-samples リポジトリ