次の方法で共有


EventProcessorClient クラス

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient は、コンシューマー グループのコンテキストで Event Hub のすべてのパーティションからイベントを使用する便利なメカニズムを提供します。 イベント プロセッサ ベースのアプリケーションは、EventProcessorClient の 1 つ以上のインスタンスで構成され、同じイベント ハブのイベントを使用するように設定されます。コンシューマー グループは、異なるインスタンス間でワークロードのバランスを取り、イベントが処理されたときに進行状況を追跡します。 実行中のインスタンスの数に基づいて、各 EventProcessorClient は、すべてのインスタンス間でワークロードのバランスを取るために、0 個以上のパーティションを所有できます。

サンプル: を構築する EventProcessorClient

次のサンプルではメモリCheckpointStore内を使用していますが、azure-messaging-eventhubs-checkpointstore-blob は、Azure Blob Storageによってサポートされるチェックポイント ストアを提供します。 さらに、 fullyQualifiedNamespace は Event Hubs 名前空間のホスト名です。 Azure Portal を介して Event Hubs 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 は consumerGroup 、Event Hub インスタンスに移動し、[エンティティ] パネルの下にある [コンシューマー グループ] を選択することで見つかります。 consumerGroup は必須です。 使用される資格情報は、 DefaultAzureCredential デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

メソッドの概要

修飾子と型 メソッドと説明
String getIdentifier()

識別子は、このイベント プロセッサ インスタンスに指定された一意の名前です。

synchronized boolean isRunning()

イベント プロセッサが true 実行されている場合は を返します。

synchronized void start()

このイベント プロセッサが所有できる Event Hub のすべてのパーティションに対するイベントの処理を開始し、各パーティションに専用 PartitionProcessor の を割り当てます。

synchronized void stop()

このイベント プロセッサが所有するすべてのパーティションのイベントの処理を停止します。

メソッドの継承元: java.lang.Object

メソッドの詳細

getIdentifier

public String getIdentifier()

識別子は、このイベント プロセッサ インスタンスに指定された一意の名前です。

Returns:

このイベント プロセッサの識別子。

isRunning

public synchronized boolean isRunning()

イベント プロセッサが true 実行されている場合は を返します。 イベント プロセッサが既に実行されている場合、 を呼び出しても start() 効果はありません。

Returns:

true イベント プロセッサが実行されている場合は 。

start

public synchronized void start()

このイベント プロセッサが所有できる Event Hub のすべてのパーティションに対するイベントの処理を開始し、各パーティションに専用 PartitionProcessor の を割り当てます。 イベント ハブ上の同じコンシューマー グループに対してアクティブな他のイベント プロセッサがある場合、パーティションの責任はそれらの間で共有されます。

このイベント プロセッサが既に実行されている場合、以降の開始呼び出しは無視されます。 が呼び出された後 stop() に start を呼び出すと、このイベント プロセッサが再起動されます。

プロセッサを起動して、すべてのパーティションからのイベントを使用する

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

このイベント プロセッサが所有するすべてのパーティションのイベントの処理を停止します。 すべて PartitionProcessor シャットダウンされ、開いているリソースはすべて閉じられます。

イベント プロセッサが実行されていない場合、後続の stop の呼び出しは無視されます。

プロセッサの停止

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

適用対象