이 빠른 시작에서는 Azure.Messaging.EventHubs .NET 라이브러리를 사용하여 스키마 유효성 검사를 사용하여 이벤트 허브에서 이벤트를 보내고 받는 방법을 알아봅니다.
Azure 스키마 레지스트리 는 Event Hubs의 기능입니다. 레지스트리는 이벤트 기반 및 메시징 중심 애플리케이션에 대한 스키마에 대한 중앙 리포지토리를 제공합니다. 생산자 및 소비자 애플리케이션이 스키마를 관리 및 공유하지 않고도 데이터를 교환할 수 있는 유연성을 제공합니다. 또한 재사용 가능한 스키마에 대한 간단한 거버넌스 프레임워크를 제공하고, 그룹화 구성(스키마 그룹)을 통해 스키마 간의 관계를 정의합니다. 자세한 내용은 Event Hubs의 Azure 스키마 레지스트리를 참조하세요.
필수 조건
Azure Event Hubs를 처음 사용하는 경우 이 빠른 시작을 수행하기 전에 Event Hubs 개요를 참조하세요.
이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.
Azure 구독이 아직 없는 경우, 시작하기 전에 무료 계정을 만드십시오.
Microsoft Visual Studio 2022.
Azure Event Hubs 클라이언트 라이브러리는 C# 8.0에서 도입된 기능을 사용합니다. 이전 C# 언어 버전으로 라이브러리를 계속 사용할 수 있지만 새 구문은 사용할 수 없습니다. 완전한 구문을 사용하려면 .NET Core SDK 3.0 이상 및 언어 버전으로 컴파일할 것을
latest
권장합니다.Visual Studio를 사용하는 경우 Visual Studio 2019 이전 버전은 C# 8.0 프로젝트를 빌드하는 데 필요한 도구와 호환되지 않습니다. 무료 Community 버전을 포함하여 Visual Studio 2019 또는 Visual Studio 2022를 다운로드하려면 Visual Studio를 참조하세요.
이벤트 허브 만들기
Event Hubs 네임스페이스 및 이벤트 허브를 만들려면 Event Hubs 네임스페이스 및 이벤트 허브 만들기의 지침을 따릅니다.
Event Hubs 네임스페이스에 대한 연결 문자열을 얻으려면 연결 문자열 가져오기의 지침을 따릅니다.
현재 빠른 시작에서 사용할 다음 설정을 적어 둡니다.
- Event Hubs 네임스페이스에 대한 연결 문자열
- 이벤트 허브의 이름
스키마 만들기
스키마 그룹 및 스키마를 만들려면 스키마 레지스트리를 사용하여 스키마 만들기의 지침을 따릅니다.
스키마 레지스트리 포털을 사용하여 contoso-sg 라는 스키마 그룹을 만듭니다. 호환 모드에서는 Avro 를 serialization 형식으로 사용하고 없음 을 사용합니다.
해당 스키마 그룹에서 스키마 이름으로
Microsoft.Azure.Data.SchemaRegistry.example.Order
새 Avro 스키마를 만듭니다. 다음 스키마 콘텐츠를 사용합니다.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
스키마 레지스트리 읽기 권한자 역할에 사용자 추가
네임스페이스 수준에서 스키마 레지스트리 읽기 권한자 역할에 사용자 계정을 추가합니다. 스키마 레지스트리 기여자 역할을 사용할 수도 있지만 이 빠른 시작에는 필요하지 않습니다.
- Event Hubs 네임스페이스 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.
- 액세스 제어(IAM) 페이지에서 +역할 할당 추가> 선택합니다.
- 역할 페이지에서 스키마 레지스트리 읽기 프로그램을 선택한 다음, 다음을 선택합니다.
- + 멤버 선택 링크를 사용하여 역할에 사용자 계정을 추가한 다음, 다음을 선택합니다.
- 검토 + 할당 페이지에서 검토 + 할당을 선택합니다.
스키마 유효성 검사를 사용하여 이벤트 허브에 이벤트 생성
이벤트 생산자용 콘솔 애플리케이션 만들기
Visual Studio를 시작합니다.
새 프로젝트 만들기를 선택합니다.
새 프로젝트 만들기 대화 상자에서 다음 단계를 수행합니다. 이 대화 상자가 표시되지 않으면 메뉴에서 파일을 선택하고 새로 만들기를 선택한 다음 프로젝트를 선택합니다.
프로그래밍 언어로 C#을 선택합니다.
애플리케이션 유형으로 콘솔을 선택합니다.
결과 목록에서 콘솔 애플리케이션을 선택합니다.
그런 후에 다음을 선택합니다.
프로젝트 이름에 OrderProducer 를 입력하고 솔루션 이름에 대한 SRQuickStart 를 입력한 다음 확인을 선택하여 프로젝트를 만듭니다.
Event Hubs NuGet 패키지 추가
도구>NuGet 패키지 관리자>패키지 관리자 콘솔을 선택합니다.
다음 명령을 실행하여 Azure.Messaging.EventHubs 및 기타 NuGet 패키지를 설치합니다. ENTER를 눌러 마지막 명령을 실행합니다.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Visual Studio를 사용하여 Azure에 연결하도록 생산자 애플리케이션을 인증합니다. 자세한 내용은 .NET용 Azure ID 클라이언트 라이브러리를 참조하세요.
네임스페이스 수준에서 역할의
Schema Registry Reader
멤버인 사용자 계정을 사용하여 Azure에 로그인합니다. 스키마 레지스트리 역할에 대한 자세한 내용은 Azure 역할 기반 액세스 제어를 참조하세요.
Avro 스키마를 사용하여 코드 생성
- 스키마를 만드는 데 사용한 것과 동일한 콘텐츠를 사용하여 이름이 지정된 파일을 만듭니다
Order.avsc
. 프로젝트 또는 솔루션 폴더에 파일을 저장합니다. - 이 스키마 파일을 사용하여 .NET에 대한 코드를 생성합니다. 코드 생성을 위해 avrogen 과 같은 외부 코드 생성 도구를 사용할 수 있습니다. 예를 들어 코드를 생성하기 위해 실행
avrogen -s .\Order.avsc .
합니다. - 코드를 생성한 후 폴더에
Order.cs
이름이 지정된\Microsoft\Azure\Data\SchemaRegistry\example
파일이 표시됩니다. 여기서 Avro 스키마의 경우 네임스페이스에서 C# 형식을Microsoft.Azure.Data.SchemaRegistry.example
생성합니다. -
Order.cs
프로젝트에OrderProducer
파일을 추가합니다.
이벤트를 직렬화하고 이벤트 허브로 보내는 코드 작성
Program.cs
파일에 다음 코드를 추가합니다. 자세한 내용은 코드 주석을 참조하세요. 코드의 개략적인 단계는 다음과 같습니다.- 이벤트 허브에 이벤트를 보내는 데 사용할 수 있는 생산자 클라이언트를 만듭니다.
- 개체의 데이터를 직렬화하고 유효성을 검사하는 데 사용할 수 있는 스키마 레지스트리 클라이언트를
Order
만듭니다. - 생성된
Order
형식을 사용하여 새Order
개체를 만듭니다. - 스키마 레지스트리 클라이언트를 사용하여
Order
개체를EventData
로 직렬화합니다. - 이벤트 일괄 처리를 만듭니다.
- 이벤트 일괄 처리에 이벤트 데이터를 추가합니다.
- 생산자 클라이언트를 사용하여 이벤트 허브에 이벤트 일괄 처리를 보냅니다.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
다음 자리 표시자의 값을 실제 값으로 바꾸십시오.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
- Event Hubs 네임스페이스에 대한 연결 문자열 -
EVENTHUBNAME
- 이벤트 허브의 이름 -
EVENTHUBSNAMESPACENAME
- Event Hubs 네임스페이스의 이름 -
SCHEMAGROUPNAME
- 스키마 그룹의 이름
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
프로그램을 빌드하고 오류가 없는지 확인합니다.
프로그램을 실행하고 확인 메시지가 나타날 때까지 기다립니다.
A batch of 1 order has been published.
Azure Portal에서 이벤트 허브가 이벤트를 수신했는지 확인할 수 있습니다. 메트릭 섹션의 메시지 보기로 전환합니다. 페이지를 새로 고쳐 차트를 업데이트합니다. 메시지를 받았다는 것을 표시하는 데 몇 초 정도 걸릴 수 있습니다.
스키마 유효성 검사를 사용하여 이벤트 허브에서 이벤트 수신
이 섹션에서는 이벤트 허브에서 이벤트를 수신하고 스키마 레지스트리를 사용하여 이벤트 데이터를 역직렬화하는 .NET Core 콘솔 애플리케이션을 작성하는 방법을 보여 줍니다.
추가 필수 구성 요소
- 이벤트 프로세서를 사용할 스토리지 계정을 만듭니다.
소비자 애플리케이션 만들기
- 솔루션 탐색기 창에서 SRQuickStart 솔루션을 마우스 오른쪽 단추로 클릭하고 추가를 선택한 다음 새 프로젝트를 선택합니다.
- 콘솔 애플리케이션을 선택하고 다음을 선택합니다.
- 프로젝트 이름에 OrderConsumer를 입력하고 만들기를 선택합니다.
- 솔루션 탐색기 창에서 OrderConsumer를 마우스 오른쪽 단추로 클릭하고 시작 프로젝트로 설정을 선택합니다.
Event Hubs NuGet 패키지 추가
도구>NuGet 패키지 관리자>패키지 관리자 콘솔을 선택합니다.
패키지 관리자 콘솔 창에서 OrderConsumer가 기본 프로젝트에 대해 선택되어 있는지 확인합니다. 그렇지 않은 경우 드롭다운 목록을 사용하여 OrderConsumer를 선택합니다.
다음 명령을 실행하여 필요한 NuGet 패키지를 설치합니다. ENTER를 눌러 마지막 명령을 실행합니다.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
.NET용 Azure ID 클라이언트 라이브러리에 표시된 것처럼 Visual Studio를 사용하여 Azure에 연결하도록 생산자 애플리케이션을 인증합니다.
네임스페이스 수준에서 역할의
Schema Registry Reader
멤버인 사용자 계정을 사용하여 Azure에 로그인합니다. 스키마 레지스트리 역할에 대한 자세한 내용은 Azure 역할 기반 액세스 제어를 참조하세요.Order.cs
생산자 앱을 만드는 과정의 일부로 생성한 파일을 OrderConsumer 프로젝트에 추가합니다.OrderConsumer 프로젝트를 마우스 오른쪽 단추로 클릭하고 시작 프로젝트로 설정을 선택합니다.
스키마 레지스트리를 사용하여 이벤트를 수신하고 역직렬화하는 코드를 작성합니다.
Program.cs
파일에 다음 코드를 추가합니다. 자세한 내용은 코드 주석을 참조하세요. 코드의 개략적인 단계는 다음과 같습니다.- 이벤트 허브에 이벤트를 보내는 데 사용할 수 있는 소비자 클라이언트를 만듭니다.
- Azure Blob Storage에서 Blob 컨테이너에 대한 Blob 컨테이너 클라이언트를 만듭니다.
- 이벤트 프로세서 클라이언트를 만들고 이벤트 및 오류 처리기를 등록합니다.
- 이벤트 처리기에서 이벤트 데이터를 개체로 역직렬화하는 데 사용할 수 있는 스키마 레지스트리 클라이언트를
Order
만듭니다. - serializer를 사용하여 이벤트 데이터를
Order
개체로 역직렬화합니다. - 받은 주문에 대한 정보를 인쇄합니다.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be used as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
다음 자리 표시자의 값을 실제 값으로 바꾸십시오.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- Event Hubs 네임스페이스에 대한 연결 문자열 -
EVENTHUBNAME
- 이벤트 허브의 이름 -
EVENTHUBSNAMESPACENAME
- Event Hubs 네임스페이스의 이름 -
SCHEMAGROUPNAME
- 스키마 그룹의 이름 -
AZURESTORAGECONNECTIONSTRING
- Azure Storage 계정에 대한 연결 문자열 -
BLOBCONTAINERNAME
- Blob 컨테이너의 이름
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
프로그램을 빌드하고 오류가 없는지 확인합니다.
수신기 애플리케이션을 실행합니다.
이벤트 허브에서 이벤트를 받았다는 메시지가 표시됩니다.
Received order with ID: 1234, amount: 45.29, description: First sample order.
이러한 이벤트는 앞에서 송신기 프로그램을 실행하여 이벤트 허브로 보낸 세 개 이벤트입니다.
샘플
.NET용 Azure 스키마 레지스트리 Apache Avro 클라이언트 라이브러리를 참조하세요.
자원을 정리하세요
Event Hubs 네임스페이스를 삭제하거나 네임스페이스가 포함된 리소스 그룹을 삭제합니다.