你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本快速入门介绍如何使用 azure-messaging-eventhubs Java 包向 Azure 事件中心发送事件并从中接收事件。
小窍门
如果是在 Spring 应用程序中使用 Azure 事件中心资源,建议考虑 Spring Cloud Azure 作为替代方法。 Spring Cloud Azure 是一个开源项目,提供 Spring 与 Azure 服务的无缝集成。 要详细了解 Spring Cloud Azure,并查看使用事件中心的示例,请参阅使用 Azure 事件中心的 Spring Cloud 流。
先决条件
如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述。
若要完成本快速入门,需要满足以下先决条件:
- Microsoft Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果没有现有的 Azure 帐户,可以注册免费试用帐户,或者在创建帐户时使用 MSDN 订阅者权益。
- Java 开发环境。 本快速入门使用 Eclipse。 需要 Java 开发工具包 (JDK) 版本 8 或更高版本。
- 创建事件中心命名空间和事件中心。 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 要创建命名空间和事件中心,请按照此文中的步骤操作。 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串:获取连接字符串。 稍后将在本快速入门中使用连接字符串。
发送事件
本部分介绍如何创建一个向事件中心发送事件的 Java 应用程序。
将引用添加到 Azure 事件中心库
首先,请在你最喜欢的 Java 开发环境中为控制台/shell 应用程序创建一个新的 Maven 项目。 按如下所示更新 pom.xml
文件。
Maven 中心存储库中提供了事件中心的 Java 客户端库。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.20.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.16.1</version>
<scope>compile</scope>
</dependency>
注释
将版本更新为发布到 Maven 存储库的最新版本。
向 Azure 验证应用
本快速入门介绍如何通过两种方式连接到 Azure 事件中心:
- 无密码。 使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到事件中心命名空间。 无需担心在代码、配置文件或安全存储(如 Azure Key Vault)中具有硬编码连接字符串。
- 连接字符串。 使用连接字符串连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。
建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 Azure 服务的服务总线身份验证和授权 和 无密码连接。
将角色分配给 Microsoft Entra 用户
在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 需要 Azure 事件中心数据所有者 角色来发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write
操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅 “了解 Azure RBAC”页的范围 。
以下示例将 Azure Event Hubs Data Owner
角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
Azure 事件中心的内置 Azure 角色
对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下内置角色,用于授权访问事件中心命名空间:
- Azure 事件中心数据所有者:允许数据访问事件中心命名空间及其实体(队列、主题、订阅和筛选器)。
- Azure 事件中心数据发送者:使用此角色授予对事件中心命名空间及其实体的发送者访问权限。
- Azure 事件中心数据接收者:使用此角色授予接收者对事件中心命名空间及其实体的访问权限。
如果要创建自定义角色,请参阅执行事件中心操作所需的权限。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
- Azure 门户
- Azure CLI
- PowerShell
在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框将结果筛选为所需角色。 对于此示例,请搜索
Azure Event Hubs Data Owner
并选择匹配的结果。 然后选择“下一步”。在“ 分配访问权限”下,选择“ 用户”、“组”或服务主体。 然后选择 “+ 选择成员”。
在对话框中,搜索Microsoft Entra 用户名(通常是 user@___domain 电子邮件地址)。 选择对话框底部的 “选择 ”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
编写代码以将消息发送到事件中心
添加一个名为 Sender
的类,并将以下代码添加至该类中:
重要
- 将
<NAMESPACE NAME>
更新为事件中心命名空间的名称。 - 将
<EVENT HUB NAME>
更新为事件中心的名称。
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
生成程序,并确保没有引发任何错误。 将在运行接收器程序后运行此程序。
接收事件
本教程中的代码基于 GitHub 上的 EventProcessorClient 示例,您可以查看该示例以了解完整的应用程序运作。
使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:
- 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
- 不要将存储帐户用于任何其他用途。
- 不要将容器用于任何其他用途。
- 在部署的应用程序所在的同一区域中创建存储帐户。 如果应用程序位于本地,请尝试选择最近的区域。
在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。
- 分层命名空间
- Blob 软删除
- 版本控制
创建 Azure 存储和 Blob 容器
本快速入门将使用 Azure 存储(特别是 Blob 存储)作为检查点存储。 标记检查点是一个进程,被事件处理器用来标记或提交分区中最后一个成功处理的事件的位置。 标记检查点通常在处理事件的函数中进行。 了解有关检查点的更多信息,请参阅事件处理器。
按照以下步骤创建 Azure 存储帐户。
- 创建 Azure 存储帐户
- 创建一个 blob 容器
- 对 blob 容器进行身份验证
在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需要具备存储 Blob 数据参与者角色才能读写 Blob 数据。 若要为自己分配此角色,需要被分配 “用户访问管理员” 角色,或另一个包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅了解 Azure RBAC 的范围。
在此方案中,将权限分配给用户帐户(作用域为存储帐户)以遵循 最低特权原则。 这种做法仅为用户提供所需的最低权限,并创建更安全的生产环境。
以下示例将 存储 Blob 数据参与者 角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
- Azure 门户
- Azure CLI
- PowerShell
在 Azure 门户中,使用主搜索栏或左侧导航找到存储帐户。
在存储帐户页上,从左侧菜单中选择 “访问控制”(IAM )。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框将结果筛选为所需角色。 对于此示例,搜索“存储 Blob 数据参与者”。 选择匹配的结果,然后选择 “下一步”。
在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索你的 Microsoft Entra 用户名(通常是你的 user@___domain 电子邮件地址),然后在对话框的底部选择“选择”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
将事件中心库添加到 Java 项目
在 pom.xml 文件中添加以下依赖项。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.20.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.6</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.16.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
将以下
import
语句添加到 Java 文件顶部。import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
创建一个名为
Receiver
的类,并向该类中添加以下字符串变量。 将占位符替换为正确的值。重要
将占位符替换为正确的值。
-
<NAMESPACE NAME>
更新为事件中心命名空间的名称。 -
<EVENT HUB NAME>
更新为命名空间中的事件中心名称。
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
-
将下面的
main
方法添加到该类中。重要
将占位符替换为正确的值。
-
<STORAGE ACCOUNT NAME>
更新为 Azure 存储帐户的名称。 -
<CONTAINER NAME>
更新为存储帐户中 blob 容器的名称
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
-
将两个处理事件和错误的帮助程序方法( 和 )添加到 类中。
public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
生成程序,并确保没有引发任何错误。
运行应用程序
先运行接收器应用程序。
然后运行发送器应用程序。
在“接收器”应用程序窗口中,确认已看到发送器应用程序发布的事件。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
在接收端应用程序窗口中按ENTER以停止该应用程序。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
相关内容
请参阅 GitHub 上的以下示例: