次の方法で共有


クイック スタート: Event Hubs .NET SDK (AMQP) を使用してイベントをストリーミングするときに Avro スキーマを使用して検証する

このクイックスタートでは、 Azure.Messaging.EventHubs .NET ライブラリを使用して、スキーマ検証を使用してイベント ハブとの間でイベントを送受信する方法について説明します。

Azure スキーマ レジストリ は、Event Hubs の機能です。 レジストリは、イベント ドリブンおよびメッセージング中心のアプリケーション用のスキーマの中央リポジトリを提供します。 プロデューサーとコンシューマー アプリケーションに対し、スキーマを管理して共有することなく、データを交換できる柔軟性を提供します。 また、再利用可能なスキーマのための単純なガバナンス フレームワークが用意されており、グループ化構成体 (スキーマ グループ) を使用してスキーマ間のリレーションシップを定義します。 詳細については、Event Hubs の Azure スキーマ レジストリ に関するページを参照してください。

[前提条件]

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に 、Event Hubs の概要 を参照してください。

このクイックスタートを完了するには、次の前提条件が必要です。

  • Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。

  • Microsoft Visual Studio 2022。

    Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された機能が使用されます。 以前のバージョンの C# 言語でライブラリを使うこともできますが、新しい構文は使用できません。 完全な構文を使用するには、 .NET Core SDK 3.0 以降を使用してコンパイルし、 言語バージョンlatest に設定することをお勧めします。

    Visual Studio を使用している場合、Visual Studio 2019 より前のバージョンは、C# 8.0 プロジェクトのビルドに必要なツールと互換性がありません。 無料の Community エディションを含む Visual Studio 2019 または Visual Studio 2022 をダウンロードするには、 Visual Studio を参照してください。

イベント ハブの作成

Event Hubs 名前空間とイベント ハブを作成するには、「 Event Hubs 名前空間とイベント ハブを作成する」の手順に従います。

Event Hubs 名前空間への接続文字列を取得するには、「接続文字列を取得する」 の手順に従います。

現在のクイック スタートで使用する次の設定に注意してください。

  • Event Hubs 名前空間の接続文字列
  • イベント ハブの名前

スキーマの作成

スキーマ グループとスキーマを作成するには、「 スキーマ レジストリを使用してスキーマを作成する」の手順に従います。

  1. スキーマ レジストリ ポータルを使用して 、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換モードには None を使用します。

  2. そのスキーマ グループで、スキーマ名が " Microsoft.Azure.Data.SchemaRegistry.example.Order" の新しい Avro スキーマを作成します。 次のスキーマ コンテンツを使用します。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

スキーマ レジストリ閲覧者ロールにユーザーを追加する

名前空間レベルで スキーマ レジストリ閲覧者 ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイック スタートでは必要ありません。

  1. [Event Hubs 名前空間] ページの左側のメニューで、[アクセス制御 (IAM)] を選択します。
  2. [ アクセス制御 (IAM)] ページで、[ + 追加>ロールの割り当ての追加] を選択します。
  3. [ ロール ] ページで、[ スキーマ レジストリ閲覧者] を選択し、[ 次へ] を選択します。
  4. [+ メンバーの選択] リンクを使用してユーザー アカウントをロールに追加し、[次へ] を選択します。
  5. [ 確認と割り当て ] ページで、[ 確認と割り当て] を選択します。

スキーマ検証を使用してイベント ハブにイベントを生成する

イベント プロデューサー用のコンソール アプリケーションを作成する

  1. Visual Studio を起動します。

  2. [ 新しいプロジェクトの作成] を選択します

  3. [ 新しいプロジェクトの作成 ] ダイアログで、次の手順を実行します。 このダイアログボックスが表示されない場合は、メニューの [ ファイル ] を選択し、[ 新規] を選択して、[ プロジェクト] を選択します。

    1. プログラミング言語として **[C#]** を選択します。

    2. アプリケーションの種類として [コンソール] を選択します。

    3. 結果リストから **[コンソール アプリケーション]** を選択します。

    4. 次に、 [次へ] を選択します。

      Visual Studio の [新しいプロジェクト] ダイアログを示すスクリーンショット。

  4. プロジェクト名 として「OrderProducer 」、ソリューション名に 「SRQuickStart 」と入力し、[ OK] を 選択してプロジェクトを作成します。

Event Hubs NuGet パッケージを追加する

  1. [ ツール>NuGet パッケージ マネージャー>Package Manager コンソール] を選択します

  2. 次のコマンドを実行して 、Azure.Messaging.EventHubs やその他の NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Visual Studio を使用して、プロデューサー アプリケーションを認証して Azure に接続します。 詳細については、.NET 用 Azure Identity クライアント ライブラリに関するページを参照してください。

  4. 名前空間レベルで Schema Registry Reader ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、 Azure ロールベースのアクセス制御に関するページを参照してください。

Avro スキーマを使用したコード生成

  1. スキーマの作成に使用したのと同じコンテンツを使用して、 Order.avscという名前のファイルを作成します。 プロジェクトまたはソリューション フォルダーにファイルを保存します。
  2. このスキーマ ファイルを使用して、.NET のコードを生成します。 コード生成には avrogen などの任意の外部コード生成ツールを使用できます。 たとえば、 avrogen -s .\Order.avsc . を実行してコードを生成します。
  3. コードを生成すると、Order.cs フォルダーに \Microsoft\Azure\Data\SchemaRegistry\example という名前のファイルが表示されます。 Avroスキーマに基づいて、Microsoft.Azure.Data.SchemaRegistry.example 名前空間にC#型を生成します。
  4. Order.cs ファイルをOrderProducer プロジェクトに追加します。

イベント ハブにイベントをシリアル化して送信するコードを記述する

  1. 次のコードを Program.cs ファイルに追加します。 詳細については、コード コメントを参照してください。 コードの大まかな手順は次のとおりです。

    1. イベント ハブにイベントを送信するために使用できるプロデューサー クライアントを作成します。
    2. Order オブジェクト内のデータをシリアル化および検証するために使用できるスキーマ レジストリ クライアントを作成します。
    3. 生成されたOrder型を使用して、新しいOrder オブジェクトを作成します。
    4. スキーマ レジストリ クライアントを使用して、 Order オブジェクトをシリアル化して EventDataします。
    5. イベントのバッチを作成します。
    6. イベント データをイベント バッチに追加します。
    7. プロデューサー クライアントを使用して、イベントのバッチをイベント ハブに送信します。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. 次のプレースホルダー値を実際の値に置き換えます。

    • EVENTHUBSNAMESPACECONNECTIONSTRING - Event Hubs 名前空間の接続文字列
    • EVENTHUBNAME - イベント ハブの名前
    • EVENTHUBSNAMESPACENAME - Event Hubs 名前空間の名前
    • SCHEMAGROUPNAME - スキーマ グループの名前
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. プロジェクトをビルドし、エラーがないことを確認します。

  4. プログラムを実行し、確認メッセージが表示されるまで待ちます。

    A batch of 1 order has been published.
    
  5. Azure portal で、イベント ハブがイベントを受信したことを確認できます。 [メトリック] セクションの [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージを受信したことを示すには、数秒かかる場合があります。

    イベント ハブがイベントを受信したことを確認するための Azure portal ページの画像。

スキーマ検証を使用してイベント ハブからイベントを使用する

このセクションでは、イベント ハブからイベントを受信する .NET Core コンソール アプリケーションを記述し、スキーマ レジストリを使用してイベント データを逆シリアル化する方法について説明します。

追加の前提条件

  • イベント プロセッサを使用するストレージ アカウントを作成します。

コンシューマー アプリケーションを作成する

  1. ソリューション エクスプローラー ウィンドウで、 SRQuickStart ソリューションを右クリックし、[ 追加] を選択して、[ 新しいプロジェクト] を選択します。
  2. [コンソール アプリケーション] を選択し、 [次へ] を選択します。
  3. プロジェクト名として「OrderConsumer」と入力し、[作成] を選択します。
  4. ソリューション エクスプローラー ウィンドウでOrderConsumer を右クリックし、[スタートアップ プロジェクトとして設定] を選択します。

Event Hubs NuGet パッケージを追加する

  1. [ ツール>NuGet パッケージ マネージャー>Package Manager コンソール] を選択します

  2. [パッケージ マネージャー コンソール] ウィンドウで、既定のプロジェクトOrderConsumer が選択されていることを確認します。 そうでない場合は、ドロップダウン リストを使用して OrderConsumer を選択します。

  3. 次のコマンドを実行して、必要な NuGet パッケージをインストールします。 Enter キーを押して最後のコマンドを実行します。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. .NET 用 Azure Identity クライアント ライブラリに示すように、Visual Studio を使用して Azure に接続するプロデューサー アプリケーションを認証します。

  5. 名前空間レベルで Schema Registry Reader ロールのメンバーであるユーザー アカウントを使用して Azure にサインインします。 スキーマ レジストリ ロールの詳細については、 Azure ロールベースのアクセス制御に関するページを参照してください。

  6. プロデューサー アプリの作成の一環として生成した Order.cs ファイルを OrderConsumer プロジェクトに追加します。

  7. OrderConsumer プロジェクトを右クリックし、[スタートアップ プロジェクトとして設定] を選択します。

スキーマ レジストリを使用してイベントを受信して逆シリアル化するコードを記述する

  1. 次のコードを Program.cs ファイルに追加します。 詳細については、コード コメントを参照してください。 コードの大まかな手順は次のとおりです。

    1. イベント ハブにイベントを送信するために使用できるコンシューマー クライアントを作成します。
    2. Azure BLOB ストレージ内の BLOB コンテナー用の BLOB コンテナー クライアントを作成します。
    3. イベント プロセッサ クライアントを作成し、イベント ハンドラーとエラー ハンドラーを登録します。
    4. イベント ハンドラーで、イベント データを Order オブジェクトに逆シリアル化するために使用できるスキーマ レジストリ クライアントを作成します。
    5. シリアライザーを使用して、イベント データを Order オブジェクトに逆シリアル化します。
    6. 受け取った注文に関する情報を印刷します。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be used as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. 次のプレースホルダー値を実際の値に置き換えます。

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - Event Hubs 名前空間の接続文字列
    • EVENTHUBNAME - イベント ハブの名前
    • EVENTHUBSNAMESPACENAME - Event Hubs 名前空間の名前
    • SCHEMAGROUPNAME - スキーマ グループの名前
    • AZURESTORAGECONNECTIONSTRING - Azure ストレージ アカウントの接続文字列
    • BLOBCONTAINERNAME - BLOB コンテナーの名前
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. プロジェクトをビルドし、エラーがないことを確認します。

  4. 受信側アプリを実行します。

  5. イベント ハブがイベントを受信したことを示すメッセージが表示されます。

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    これらのイベントは、前に送信側プログラムを実行してイベント ハブに送信した 3 つのイベントです。

サンプル

.NET 用 Azure Schema Registry Apache Avro クライアント ライブラリを参照してください。

リソースをクリーンアップする

Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。

次のステップ