本教程介绍如何配置基于 Java 的 Spring Cloud Stream Binder,以使用适用于 Kafka 的 Azure 事件中心通过 Azure 事件中心发送和接收消息。 有关详细信息,请参阅 从 Apache Kafka 应用程序使用 Azure 事件中心
在本教程中,我们将包括两种身份验证方法: Microsoft Entra 身份验证 和 共享访问签名(SAS)身份验证。 “无密码”选项卡显示“Microsoft Entra 身份验证”,“连接字符串”选项卡显示 SAS 身份验证。
Microsoft Entra 身份验证是一种使用 Microsoft Entra ID 中定义的标识连接到适用于 Kafka 的 Azure 事件中心的机制。 通过 Microsoft Entra 身份验证,可以在一个中心位置集中管理数据库用户标识和其他 Microsoft 服务,从而简化权限管理。
SAS 身份验证使用 Azure 事件中心命名空间的连接字符串来委派对适用于 Kafka 的事件中心的访问权限。 如果选择使用共享访问签名作为凭据,则需要自行管理连接字符串。
先决条件
Azure 订阅 - 免费创建一个订阅。
Java 开发工具包 (JDK) 版本 8 或更高版本。
Apache Maven 版本 3.2 或更高版本。
cURL 或类似的 HTTP 实用工具来测试功能。
Azure Cloud Shell 或 Azure CLI 2.37.0 或更高版本。
Azure 事件中心。 如果没有 事件中心,请使用 Azure 门户创建事件中心。
Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建 Maven 项目。 请务必选择 Maven 项目 ,然后在 “依赖项”下添加 Spring Web、 Spring for Apache Kafka 和 Cloud Stream 依赖项,然后选择 Java 版本 8 或更高版本。
重要
要完成本教程中的步骤,需要 Spring Boot 版本 2.5 或更高版本。
准备凭据
Azure 事件中心支持使用 Microsoft Entra ID 对事件中心资源请求进行授权。 使用 Microsoft Entra ID,可以使用 Azure 基于角色的访问控制(Azure RBAC) 向 安全主体(可能是用户或应用程序服务主体)授予权限。
如果要使用 Microsoft Entra 身份验证在本地运行此示例,请确保用户帐户已通过 Azure Toolkit for IntelliJ、Visual Studio Code Azure 帐户插件或 Azure CLI 进行身份验证。 此外,请确保该帐户已被授予足够的权限。
注意
使用无密码连接时,需要授予帐户对资源的访问权限。 在 Azure 事件中心中,将 Azure Event Hubs Data Receiver
和 Azure Event Hubs Data Sender
角色分配给当前正在使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅 使用 Azure 门户分配 Azure 角色 , 并使用 Microsoft Entra ID 授权访问事件中心资源。
从 Azure 事件中心发送和接收消息
使用 Azure 事件中心,可以使用 Spring Cloud Azure 发送和接收消息。
若要安装 Spring Cloud Azure Starter 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.22.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.20.0
。 应在 pom.xml 文件的<dependencyManagement>
部分中配置此物料清单 (BOM)。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Starter 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency>
编写应用程序代码
使用以下步骤配置应用程序,以使用 Azure 事件中心生成和使用消息。
通过将以下属性添加到 application.properties 文件来配置事件中心凭据。
spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME} spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
提示
如果使用版本
spring-cloud-azure-dependencies:4.3.0
,则应添加属性spring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources
和值com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration
。由于
4.4.0
,此属性将自动添加,因此不需要手动添加。下表描述了配置中的字段:
字段 说明 spring.cloud.stream.kafka.binder.brokers
指定 Azure 事件中心终结点。 spring.cloud.stream.bindings.consume-in-0.destination
指定输入目标事件中心,在本教程中,该中心是之前创建的中心。 spring.cloud.stream.bindings.consume-in-0.group
从 Azure 事件中心指定一个使用者组,你可以将其设置为 $Default
,以便使用创建 Azure 事件中心实例时创建的基本使用者组。spring.cloud.stream.bindings.supply-out-0.destination
指定输出目标事件中心,在本教程中,该目标与输入目标相同。 注意
如果启用自动主题创建,请确保添加配置项目
spring.cloud.stream.kafka.binder.replicationFactor
,并将值设置为至少1
。 有关详细信息,请参阅 Spring Cloud Stream Kafka Binder 参考指南。编辑启动类文件以显示以下内容。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication public class EventHubKafkaBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(EventHubKafkaBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->LOGGER.info("New message received: '{}'", message.getPayload()); } @Override public void run(String... args) { many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST); } }
提示
在本教程中,配置或代码中没有身份验证操作。 但连接到 Azure 服务需要进行身份验证。 要完成身份验证,需要使用 Azure 标识。 Spring Cloud Azure 使用 Azure 标识库提供的
DefaultAzureCredential
来帮助获取凭据,而无需更改任何代码。DefaultAzureCredential
支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地与生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential。若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅 什么是 Azure 资源的托管标识?
启动应用程序。 类似以下示例的消息将被发布在你的应用程序日志中:
Kafka version: 3.0.1 Kafka commitId: 62abe01bee039651 Kafka startTimeMs: 1622616433956 New message received: 'Hello World'
部署到 Azure Spring Apps
现在,你已在本地运行 Spring Boot 应用程序,是时候将其转移到生产环境了。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅 将第一个应用程序部署到 Azure Spring Apps。