次の方法で共有


Azure SDK for Java での非同期プログラミング

この記事では、Azure SDK for Java の非同期プログラミング モデルについて説明します。

Azure SDK には、最初は、Azure サービスと対話するための非ブロッキング非同期 API のみが含まれていました。 これらの API を使用すると、Azure SDK を使用して、システム リソースを効率的に使用するスケーラブルなアプリケーションを構築できます。 ただし、 Azure SDK for Java には、より多くのユーザーに対応できる同期クライアントも含まれています。また、非同期プログラミングに慣れていないユーザーがクライアント ライブラリを近付けやすくなります。 (Azure SDK の設計ガイドラインの 「Approachable」 を参照してください)。そのため、Azure SDK for Java のすべての Java クライアント ライブラリには、非同期クライアントと同期クライアントの両方が用意されています。 ただし、システム リソースの使用を最大化するには、運用システムに非同期クライアントを使用することをお勧めします。

リアクティブ ストリーム

Java Azure SDK 設計ガイドライン「Async Service Clients」セクションを見ると、Java 8 で提供されるCompletableFutureを使用する代わりに、非同期 API でリアクティブ型が使用されていることがわかります。 JDK でネイティブに使用できる型よりもリアクティブ型を選択したのはなぜですか?

Java 8 では、 StreamsLambdasCompletableFuture などの機能が導入されました。 これらの機能には多くの機能がありますが、いくつかの制限があります。

CompletableFuture には、コールバック ベースの非ブロッキング機能と、一連の非同期操作を簡単に構成できる CompletionStage インターフェイスが用意されています。 ラムダは、これらのプッシュベースの API をより読みやすくします。 ストリームは、データ要素のコレクションを処理する機能スタイルの操作を提供します。 ただし、ストリームは同期的であり、再利用することはできません。 CompletableFuture では、1 つの要求を行い、コールバックのサポートを提供し、 1 つの 応答を期待できます。 ただし、多くのクラウド サービスでは、たとえば Event Hubs などのデータをストリーミングする機能が必要です。

リアクティブ ストリームは、ソースからサブスクライバーに要素をストリーミングすることで、これらの制限を克服するのに役立ちます。 サブスクライバーがソースからデータを要求すると、ソースは任意の数の結果を返します。 一度に送信する必要はありません。 転送は、ソースが送信するデータを持つたびに、一定期間にわたって行われます。

このモデルでは、サブスクライバーは、受信時にデータを処理するイベント ハンドラーを登録します。 これらのプッシュベースの対話は、個別のシグナルを介してサブスクライバーに通知します。

  • onSubscribe()呼び出しは、データ転送が開始しようとしていることを示します。
  • onError()呼び出しは、データ転送の終了を示すエラーが発生したことを示します。
  • onComplete()呼び出しは、データ転送が正常に完了したことを示します。

Java ストリームとは異なり、リアクティブ ストリームは、エラーをファースト クラスのイベントとして扱います。 リアクティブ ストリームには、ソースがサブスクライバーにエラーを通信するための専用チャネルがあります。 また、リアクティブ ストリームを使用すると、サブスクライバーはデータ転送速度をネゴシエートして、これらのストリームをプッシュ プル モデルに変換できます。

リアクティブ ストリーム仕様では、データの転送を行う方法の標準が提供されます。 大まかに言うと、仕様では次の 4 つのインターフェイスを定義し、これらのインターフェイスを実装する方法に関する規則を指定します。

  • パブリッシャー は、データ ストリームのソースです。
  • サブスクライバー はデータ ストリームのコンシューマーです。
  • サブスクリプション は、パブリッシャーとサブスクライバー間のデータ転送の状態を管理します。
  • プロセッサ はパブリッシャーとサブスクライバーの両方です。

RxJavaAkka StreamsVert.xProject Reactor など、この仕様の実装を提供する既知の Java ライブラリがいくつかあります。

Azure SDK for Java では、非同期 API を提供するために Project Reactor が採用されました。 この決定を推進する主な要因は、Project Reactor も使用する Spring Webflux とのスムーズな統合を提供することです。 RxJavaよりもプロジェクトリアクターを選択するもう1つの要因は、Project ReactorがJava 8を使用していたが、当時のRxJavaはまだJava 7にありました。 Project Reactor には、構成可能な豊富な演算子セットも用意されており、データ処理パイプラインを構築するための宣言型コードを記述できます。 Project Reactor のもう 1 つの良い点は、Project Reactor 型を他の一般的な実装型に変換するためのアダプターがあるということです。

同期操作と非同期操作の API の比較

同期クライアントと非同期クライアントのオプションについて説明しました。 次の表は、これらのオプションを使用して設計された API の外観をまとめたものです。

API の種類 値なし 単一の値 複数の値
標準 Java - 同期 API void T Iterable<T>
標準 Java - 非同期 API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
リアクティブ ストリーム インターフェイス Publisher<Void> Publisher<T> Publisher<T>
リアクティブ ストリームの Project Reactor の実装 Mono<Void> Mono<T> Flux<T>

完全を期す目的で、Java 9 では、4 つのリアクティブ ストリーム インターフェイスを含む Flow クラスが導入されました。 ただし、このクラスには実装は含まれません。

Azure SDK for Java で非同期 API を使用する

リアクティブ ストリームの仕様では、パブリッシャーの種類は区別されません。 リアクティブ ストリームの仕様では、パブリッシャーは単に 0 個以上のデータ要素を生成します。 多くの場合、高々1つのデータ要素を生成するパブリッシャーと、0 個以上のデータ要素を生成するパブリッシャーの間には便利な区別があります。 クラウドベースの API では、この区別は、要求が単一値の応答またはコレクションを返すかどうかを示します。 Project Reactor はこの区別をするために、MonoFlux の 2 つのタイプを提供します。 Monoを返す API には、最大 1 つの値を持つ応答が含まれます。また、Fluxを返す API には、0 個以上の値を持つ応答が含まれます。

たとえば、 ConfigurationAsyncClient を使用して、Azure App Configuration サービスを使用して格納されている構成を取得するとします。 (詳細については、「 Azure App Configuration とは」を参照してください)。)

ConfigurationAsyncClientを作成し、クライアントでgetConfigurationSetting()を呼び出すと、応答に 1 つの値が含まれていることを示すMonoが返されます。 ただし、このメソッドを呼び出すだけでは何も行われません。 クライアントは、Azure App Configuration サービスに対してまだ要求を行っていません。 この段階では、この API によって返される Mono<ConfigurationSetting> は、単なるデータ処理パイプラインの "アセンブリ" です。 これは、データを使用するために必要なセットアップが完了したことを意味します。 データ転送を実際にトリガーするには (つまり、サービスへの要求を行って応答を取得するには)、返された Monoをサブスクライブする必要があります。 そのため、これらのリアクティブ ストリームを処理するときは、subscribe() を呼び出す必要があります。そうしないと、何も起こりません。

次の例は、 Mono をサブスクライブし、構成値をコンソールに出力する方法を示しています。

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

クライアントで getConfigurationSetting() を呼び出した後、サンプル コードは結果をサブスクライブし、3 つの個別のラムダを提供します。 最初のラムダは、サービスから受信したデータを使用します。これは、応答が成功したときにトリガーされます。 2 番目のラムダは、構成の取得中にエラーが発生した場合にトリガーされます。 3 番目のラムダは、データ ストリームが完了したときに呼び出されます。つまり、このストリームからこれ以上データ要素が必要ありません。

すべての非同期プログラミングと同様に、サブスクリプションが作成されると、通常どおり実行が続行されます。 プログラムをアクティブにして実行し続ける必要がない場合は、非同期操作が完了する前に終了する可能性があります。 subscribe()を呼び出したメイン スレッドは、Azure App Configuration へのネットワーク呼び出しを行い、応答を受信するまで待機しません。 運用システムでは、他の処理を続けることができますが、この例では、 Thread.sleep() を呼び出して少し遅延を追加したり、 CountDownLatch を使用して非同期操作を完了したりできます。

次の例に示すように、 Flux を返す API も同様のパターンに従います。 違いは、 subscribe() メソッドに提供される最初のコールバックが、応答内の各データ要素に対して複数回呼び出されるということです。 エラーまたは完了コールバックは 1 回だけ呼び出され、ターミナル シグナルと見なされます。 これらのシグナルのいずれかがパブリッシャーから受信された場合、他のコールバックは呼び出されません。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

バックプレッシャ

ソースがサブスクライバーが処理できる速度よりも速い速度でデータを生成するとどうなりますか。 サブスクライバーはデータに圧倒され、メモリ不足エラーが発生する可能性があります。 サブスクライバーは、パブリッシャーに通信して、追従できない場合に速度を落としてもらう必要があります。 既定では、上記の例に示すようにsubscribe()Fluxを呼び出すと、サブスクライバーは無制限のデータ ストリームを要求し、パブリッシャーにできるだけ早くデータを送信するよう指示します。 この動作は常に望ましいとは限らないので、サブスクライバーは "バックプレッシャ" を使用して発行速度を制御する必要がある場合があります。 バックプレッシャにより、サブスクライバーはデータ要素のフローを制御できます。 サブスクライバーは、処理できる限られた数のデータ要素を要求します。 サブスクライバーがこれらの要素の処理を完了すると、サブスクライバーはより多くを要求できます。 バックプレッシャを使用すると、データ転送用のプッシュ モデルをプッシュ プル モデルに変換できます。

次の例は、Event Hubs コンシューマーがイベントを受信する速度を制御する方法を示しています。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

サブスクライバーが最初にパブリッシャーに "接続" すると、パブリッシャーはサブスクライバーに Subscription インスタンスを提供し、データ転送の状態を管理します。 この Subscription は、サブスクライバーが処理できるデータ要素の数を指定する request() を呼び出すことによってバックプレッシャを適用できるメディアです。

たとえば、サブスクライバーが onNext()を呼び出すたびに複数のデータ要素を要求した場合、 request(10) パブリッシャーは、次の 10 個の要素が使用可能になった場合、または使用可能になったときにすぐに送信します。 これらの要素はサブスクライバー側のバッファーに蓄積され、各 onNext() 呼び出しではさらに 10 個が要求されるため、パブリッシャーが送信するデータ要素がなくなったか、サブスクライバーのバッファー オーバーフローが発生してメモリ不足エラーが発生するまでバックログは増加し続けます。

サブスクリプションを取り消す

サブスクリプションは、パブリッシャーとサブスクライバー間のデータ転送の状態を管理します。 パブリッシャーがサブスクライバーへのすべてのデータの転送を完了するか、サブスクライバーがデータの受信に関心を持たなくなるまで、サブスクリプションはアクティブです。 次に示すように、サブスクリプションを取り消す方法はいくつかあります。

次の例では、サブスクライバーを破棄してサブスクリプションを取り消します。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

次の例では、cancel()Subscription メソッドを呼び出してサブスクリプションを取り消します。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

結論

スレッドは高価なリソースであり、リモート サービス呼び出しからの応答を待つ際に無駄にしないでください。 マイクロサービス アーキテクチャの導入が増えるにつれて、リソースを効率的にスケーリングして使用する必要性が不可欠になります。 非同期 API は、ネットワーク バインド操作がある場合に便利です。 Azure SDK for Java には、システム リソースの最大化に役立つ非同期操作用の豊富な API セットが用意されています。 非同期クライアントを試してみることを強くお勧めします。

特定のタスクに最適な演算子の詳細については、「Reactor 3 リファレンス ガイド」の「必要なオペレーター」を参照してください。

次のステップ

さまざまな非同期プログラミングの概念をよりよく理解したので、結果を反復処理する方法を学習することが重要です。 最適なイテレーション戦略の詳細と改ページの動作の詳細については、 Azure SDK for Java での改ページとイテレーションに関するページを参照してください。