次の方法で共有


イベントに登録する

ヒント

このコンテンツは、.NET Docs で入手できる、またはオフラインで読み取ることができる無料のダウンロード可能な PDF として入手できる、コンテナー化された .NET アプリケーションの電子ブックである .NET マイクロサービス アーキテクチャからの抜粋です。

コンテナー化された .NET アプリケーションの .NET マイクロサービス アーキテクチャの電子ブックの表紙サムネイル。

イベント バスを使用する最初の手順は、受信するイベントにマイクロサービスをサブスクライブすることです。 その機能は、受信側マイクロサービスで行う必要があります。

次の単純なコードは、サービスの開始時 (つまり、 Startup クラス) で必要なイベントをサブスクライブするために、各受信側マイクロサービスが実装する必要があることを示しています。 この場合、 basket-api マイクロサービスは、 ProductPriceChangedIntegrationEventOrderStartedIntegrationEvent メッセージをサブスクライブする必要があります。

たとえば、 ProductPriceChangedIntegrationEvent イベントをサブスクライブすると、バスケット マイクロサービスは製品価格の変更を認識し、その製品がユーザーのバスケット内にある場合に変更についてユーザーに警告できます。

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

このコードが実行されると、サブスクライバー マイクロサービスは RabbitMQ チャネルを介してリッスンします。 ProductPriceChangedIntegrationEvent 型のメッセージが到着すると、コードはそれに渡されるイベント ハンドラーを呼び出し、イベントを処理します。

イベント バスを介したイベントの発行

最後に、メッセージ送信者 (配信元マイクロサービス) は、次の例のようなコードを使用して統合イベントを発行します。 (このアプローチは、原子性を考慮しない簡略化された例です)。イベントを複数のマイクロサービスに伝達する必要がある場合は常に、同様のコードを実装します。通常は、配信元マイクロサービスからデータまたはトランザクションをコミットした直後です。

まず、次のコードのように、イベント バス実装オブジェクト (RabbitMQ に基づくか、サービス バスに基づく) がコントローラー コンストラクターに挿入されます。

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

次に、UpdateProduct メソッドのように、コントローラーのメソッドから使用します。

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

この場合、配信元マイクロサービスは単純な CRUD マイクロサービスであるため、そのコードは Web API コントローラーに直接配置されます。

CQRS アプローチを使用する場合のように、より高度なマイクロサービスでは、CommandHandler メソッド内の Handle() クラスに実装できます。

イベント バスに発行するときの原子性と回復性の設計

イベント バスなどの分散メッセージング システムを介して統合イベントを発行する場合は、元のデータベースをアトミックに更新し、イベントを発行する (つまり、両方の操作が完了するか、いずれも完了しない) という問題が発生します。 たとえば、前に示した簡略化された例では、製品価格が変更されたときにコードによってデータがデータベースにコミットされ、ProductPriceChangedIntegrationEvent メッセージが発行されます。 最初は、これら 2 つの操作をアトミックに実行することが不可欠な場合があります。 ただし、 Microsoft Message Queuing (MSMQ) などの古いシステムの場合と同様に、データベースとメッセージ ブローカーを含む分散トランザクションを使用している場合、 CAP 定理で説明されている理由から、この方法はお勧めしません。

基本的には、マイクロサービスを使用して、スケーラブルで高可用性のシステムを構築します。 少し単純化すると、CAP 定理は、継続的に利用可能で、厳密に一貫性があり、どのパーティションにも耐性のある (分散) データベース (またはそのモデルを所有するマイクロサービス) を構築できない 言います。 これら 3 つのプロパティのうち 2 つを選択する必要があります。

マイクロサービス ベースのアーキテクチャでは、可用性と許容範囲を選択する必要があり、厳密な整合性を強調しないようにする必要があります。 そのため、最新のマイクロサービス ベースのアプリケーションでは、通常、MSMQ を使用して Windows 分散トランザクション コーディネーター (DTC) に基づいて分散トランザクションを実装する場合と同様に、メッセージングで分散トランザクションを使用する必要はありません。

最初の問題とその例に戻りましょう。 データベースが更新された後 (この場合は、 _context.SaveChangesAsync()を含むコード行の直後) にサービスがクラッシュした場合、統合イベントが発行される前に、システム全体が不整合になる可能性があります。 このアプローチは、処理する特定のビジネス操作に応じて、ビジネス上重要な場合があります。

アーキテクチャのセクションで前述したように、この問題に対処するためのいくつかの方法があります。

このシナリオでは、完全なイベント ソーシング (ES) パターンを使用するのが最適な方法の 1 つですが 、最適ではありません 。 ただし、多くのアプリケーション シナリオでは、完全な ES システムを実装できない場合があります。 ES は、現在の状態データを格納するのではなく、トランザクション データベースにドメイン イベントのみを格納することを意味します。 ドメイン イベントのみを格納すると、システムの履歴が利用可能であることや、過去の任意の時点でシステムの状態を判断できるなどの大きな利点があります。 ただし、完全な ES システムを実装するには、システムの大部分を再設計し、他の多くの複雑さと要件を導入する必要があります。 たとえば、 イベント ストアなどのイベント ソーシング用に特別に作成されたデータベースや、Azure Cosmos DB、MongoDB、Cassandra、CouchDB、RavenDB などのドキュメント指向データベースを使用します。 ES は、この問題に対する優れたアプローチですが、イベント ソーシングに既に慣れていない限り、最も簡単なソリューションではありません。

トランザクション ログ マイニングを使用するオプションは、最初は透過的に見えます。 ただし、この方法を使用するには、マイクロサービスを SQL Server トランザクション ログなどの RDBMS トランザクション ログに結合する必要があります。 この方法はおそらく望ましくありません。 もう 1 つの欠点は、トランザクション ログに記録された低レベルの更新が、高レベルの統合イベントと同じレベルにならない可能性があるということです。 その場合、これらのトランザクション ログ操作をリバース エンジニアリングするプロセスは困難な場合があります。

バランスの取れたアプローチは、トランザクション データベース テーブルと簡略化された ES パターンの組み合わせです。 "イベントの発行準備完了" などの状態を使用できます。これは、統合イベント テーブルにコミットするときに元のイベントで設定します。 そして、イベントをイベントバスに公開しようとします。 発行イベント アクションが成功した場合は、配信元サービスで別のトランザクションを開始し、状態を "イベントを発行する準備完了" から "既に発行されているイベント" に移動します。

イベント バスの発行イベント アクションが失敗した場合でも、データは配信元マイクロサービス内で不整合になりません。データは引き続き "イベントを発行する準備完了" としてマークされ、残りのサービスに関しては最終的に一貫性があります。 バックグラウンド ジョブでは、トランザクションまたは統合イベントの状態を常に確認できます。 ジョブで "イベント発行準備完了" 状態のイベントが検出された場合、ジョブによってイベント バスに対するそのイベントの再発行を試みることができます。

この方法では、各配信元マイクロサービスの統合イベントと、他のマイクロサービスまたは外部システムと通信するイベントのみを保持していることに注意してください。 これに対し、完全な ES システムでは、すべてのドメイン イベントも格納します。

したがって、このバランスの取れたアプローチは、簡略化された ES システムです。 統合イベントの一覧と現在の状態 ("発行する準備完了" と "発行済み" ) が必要です。 ただし、これらの状態は統合イベントにのみ実装する必要があります。 また、この方法では、すべてのドメイン データをイベントとしてトランザクション データベースに格納する必要はありません。これは、完全な ES システムの場合と同様です。

リレーショナル データベースを既に使用している場合は、トランザクション テーブルを使用して統合イベントを格納できます。 アプリケーションでアトミック性を実現するには、ローカル トランザクションに基づく 2 段階のプロセスを使用します。 基本的には、ドメイン エンティティがあるのと同じデータベースに IntegrationEvent テーブルがあります。 このテーブルはアトミック性を実現するための保険として機能するため、永続化された統合イベントをドメイン データをコミットするのと同じトランザクションに含めることができます。

ステップ バイ ステップでは、プロセスは次のようになります。

  1. アプリケーションがローカル データベース トランザクションを開始します。

  2. その後、ドメイン エンティティの状態が更新され、統合イベント テーブルにイベントが挿入されます。

  3. 最後に、トランザクションがコミットされます。これで目的の原子性が与えられます。次に、

  4. 何らかの方法でイベントを発行します (次)。

イベントを発行する手順を実装する場合は、次の選択肢があります。

  • トランザクションをコミットした直後に統合イベントを発行し、別のローカル トランザクションを使用して、テーブル内のイベントをパブリッシュ済みとしてマークします。 次に、テーブルを成果物として使用して、リモート マイクロサービスで問題が発生した場合に統合イベントを追跡し、格納されている統合イベントに基づいて補正アクションを実行します。

  • テーブルをキューの一種として使用します。 別のアプリケーション スレッドまたはプロセスが統合イベント テーブルに対してクエリを実行し、イベント をイベント バスに発行した後、ローカル トランザクションを使用してイベントを発行済みとしてマークします。

図 6-22 は、これらのアプローチの最初のアーキテクチャを示しています。

ワーカー マイクロサービスなしで発行するときの原子性の図。

図 6-22 イベント バスにイベントを発行するときの原子性

図 6-22 に示すアプローチには、公開された統合イベントの成功の確認と確認を担当する追加の worker マイクロサービスがありません。 エラーが発生した場合、その追加のチェッカー ワーカー マイクロサービスは、テーブルからイベントを読み取って再発行できます。つまり、手順 2 を繰り返します。

2 番目の方法について:EventLog テーブルをキューとして使用し、常にワーカー マイクロサービスを使用してメッセージを発行します。 その場合、図6-23に示すような処理が行われる。 これは、追加のマイクロサービスを示し、テーブルはイベントを発行するときの単一のソースです。

ワーカー マイクロサービスを使用して発行するときの原子性の図。

図 6-23 ワーカー マイクロサービスを使用してイベント バスにイベントを発行するときの原子性

わかりやすくするために、eShopOnContainers サンプルでは、(追加のプロセスやチェッカー マイクロサービスを使用しない) 最初のアプローチとイベント バスを使用します。 ただし、eShopOnContainers サンプルは、考えられるすべての障害ケースを処理しているわけではありません。 クラウドにデプロイされた実際のアプリケーションでは、最終的に問題が発生するという事実を受け入れ、そのチェックロジックと再送信ロジックを実装する必要があります。 テーブルをキューとして使用すると、イベント バスを介して (ワーカーと共に) イベントを発行するときに、そのテーブルをイベントの単一のソースとして持っている場合、最初のアプローチよりも効果的です。

イベント バスを通じて統合イベントを発行するときの原子性の実装

次のコードは、複数の DbContext オブジェクト (更新される元のデータに関連するコンテキストと IntegrationEventLog テーブルに関連する 2 つ目のコンテキスト) を含む 1 つのトランザクションを作成する方法を示しています。

次のコード例のトランザクションは、コードの実行時にデータベースへの接続に問題がある場合、回復性はありません。 これは、サーバー間でデータベースを移動する可能性がある Azure SQL DB などのクラウドベースのシステムで発生する可能性があります。 複数のコンテキストにわたって回復性のあるトランザクションを実装する場合は、このガイドの後半の 「回復性のある Entity Framework Core SQL 接続の実装 」セクションを参照してください。

わかりやすくするために、次の例は、プロセス全体を 1 つのコードで示しています。 ただし、eShopOnContainers の実装はリファクタリングされ、このロジックが複数のクラスに分割されるため、保守が容易になります。

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

ProductPriceChangedIntegrationEvent 統合イベントが作成されると、元のドメイン操作を格納するトランザクション (カタログ項目の更新) にも、イベントの永続化が EventLog テーブルに含まれます。 これにより、1 つのトランザクションになり、イベント メッセージが送信されたかどうかを常に確認できます。

イベント ログ テーブルは、同じデータベースに対するローカル トランザクションを使用して、元のデータベース操作でアトミックに更新されます。 いずれかの操作が失敗した場合、例外がスローされ、完了した操作がトランザクションによってロールバックされるため、ドメイン操作とテーブルに保存されたイベント メッセージの間の一貫性が維持されます。

サブスクリプションからのメッセージの受信: 受信側マイクロサービスのイベント ハンドラー

イベント サブスクリプション ロジックに加えて、統合イベント ハンドラー (コールバック メソッドなど) の内部コードを実装する必要があります。 イベント ハンドラーでは、特定の種類のイベント メッセージを受信して処理する場所を指定します。

イベント ハンドラーは、最初にイベント バスからイベント インスタンスを受け取ります。 次に、その統合イベントに関連して処理されるコンポーネントを検索し、受信側マイクロサービスの状態の変化としてイベントを伝達および永続化します。 たとえば、ProductPriceChanged イベントがカタログ マイクロサービスで発生した場合、そのイベントはバスケット マイクロサービスで処理され、次のコードに示すように、このレシーバー バスケット マイクロサービスの状態も変更されます。

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

イベント ハンドラーは、製品がいずれかのバスケット インスタンスに存在するかどうかを確認する必要があります。 また、関連する買い物かご品目の品目価格をすべて更新します。 最後に、図 6-24 に示すように、価格変更についてユーザーに表示されるアラートを作成します。

ユーザー カートの価格変更通知を示すブラウザーのスクリーンショット。

図 6-24 統合イベントによって伝達された品目価格の変更をバスケットに表示する

更新メッセージ イベントでのべき等

更新メッセージ イベントの重要な側面は、通信の任意の時点でエラーが発生すると、メッセージが再試行されることです。 それ以外の場合、バックグラウンド タスクは、既に発行されているイベントの発行を試み、競合状態を作成する可能性があります。 更新がべき等であるか、または、確実に重複を検出し、破棄し、1 つの応答のみを送信するために十分な情報が更新によって提供されることを確認してください。

前に説明したように、べき等性とは、結果を変更せずに操作を複数回実行できることを意味します。 メッセージング環境において、イベントを配信する際、受信側のマイクロサービスの結果を変更することなく複数回配信できるイベントは、冪等性を持っています。 これは、イベント自体の性質、またはシステムがイベントを処理する方法のために必要な場合があります。 メッセージのべき等性は、イベント バス パターンを実装するアプリケーションだけでなく、メッセージングを使用するすべてのアプリケーションで重要です。

べき等操作の例として、テーブルにデータがまだ含まれていない場合にのみデータをテーブルに挿入する SQL ステートメントがあります。 SQL ステートメントを挿入する回数は関係ありません。結果は同じになります。テーブルにはそのデータが含まれます。 このようなべき等性は、メッセージが送信され、複数回処理される可能性がある場合に、メッセージを処理するときにも必要になる場合があります。 たとえば、再試行ロジックによって送信者が同一のメッセージを複数回送信する場合は、そのメッセージが何度も送信されても結果が変わらないことを確認する必要があります。

べき等なメッセージを設計することは可能です。 たとえば、"製品価格に $5 を追加する" の代わりに "製品価格を $25 に設定する" というイベントを作成できます。最初のメッセージは何度でも安全に処理でき、結果は同じになります。 これは、2 番目のメッセージには当てはまりません。 しかし、最初のケースでも、システムが新しい価格変更イベントを送信し、新しい価格を上書きする可能性があるため、最初のイベントを処理したくない場合があります。

もう 1 つの例として、複数のサブスクライバーに伝達される注文完了イベントがあります。 同じ注文完了イベントに重複したメッセージ イベントがある場合でも、アプリは他のシステムで注文情報が 1 回だけ更新されるようにする必要があります。

各イベントがレシーバーごとに 1 回だけ処理されるようにロジックを作成できるように、イベントごとに何らかの ID を設定すると便利です。

一部のメッセージ処理は本質的に再現性があります。 たとえば、システムが画像サムネイルを生成する場合、生成されたサムネイルに関するメッセージが処理される回数は関係ありません。その結果、サムネイルが生成され、毎回同じになります。 一方、クレジットカードを請求するために支払いゲートウェイを呼び出すような操作は、冪等性がまったくない場合があります。 このような場合は、メッセージを複数回処理することが期待される効果があることを確認する必要があります。

その他のリソース

統合イベント メッセージの重複除去

メッセージ イベントが送信され、異なるレベルのサブスクライバーごとに 1 回だけ処理されるようにすることができます。 1 つの方法は、使用しているメッセージング インフラストラクチャによって提供される重複除去機能を使用することです。 もう 1 つは、ターゲット マイクロサービスにカスタム ロジックを実装することです。 トランスポート レベルとアプリケーション レベルの両方で検証を行うのが最善の策です。

EventHandler レベルでのメッセージ イベントの重複除去

イベントが受信側によって 1 回だけ処理されるようにする 1 つの方法は、イベント ハンドラーでメッセージ イベントを処理するときに特定のロジックを実装することです。 たとえば、これは eShopOnContainers アプリケーションで使用されるアプローチです。統合イベントを 受け取ったときに UserCheckoutAcceptedIntegrationEventで確認できます。 (この場合、CreateOrderCommandは、コマンド ハンドラーに送信する前に、IdentifiedCommandを識別子として使用して、eventMsg.RequestIdでラップされます)。

RabbitMQ を使用する場合のメッセージの重複除去

断続的なネットワーク障害が発生した場合、メッセージが複製される可能性があり、メッセージ受信側は、これらの重複したメッセージを処理する準備ができている必要があります。 受信側は、可能な場合、べき等な方法でメッセージを処理する必要があります。これは、重複除去でメッセージを明示的に処理する方法よりも優れています。

RabbitMQ のドキュメントによると、「メッセージがコンシューマーに配信され、(たとえば、コンシューマー接続が切断される前に確認されなかったために) 再キューされた場合、RabbitMQ は再配信時に再配信フラグを設定します (同じコンシューマーに対しても別のコンシューマーに対しても)。

"再配信" フラグが設定されている場合は、メッセージが既に処理されている可能性があるため、受信者はそのフラグを考慮する必要があります。 しかし、それは保証されていません。メッセージがメッセージ ブローカーを離れた後、ネットワークの問題が原因でメッセージが受信者に到達しなかった可能性があります。 一方、"再配信" フラグが設定されていない場合は、メッセージが複数回送信されていないことが保証されます。 したがって、受信側は、メッセージに "再配信" フラグが設定されている場合にのみ、メッセージを重複除去するか、べき等の方法でメッセージを処理する必要があります。

その他のリソース