Azure 服务总线 JMS 2.0 开发人员指南

本指南包含详细信息,可帮助你使用 Java 消息服务 (JMS) 2.0 API 成功与 Azure 服务总线通信。

作为 Java 开发人员,如果你不熟悉 Azure 服务总线,请考虑阅读以下文章。

入门指南 概念

Java 消息服务 (JMS) 编程模型

Java 消息服务 API 编程模型如以下部分所示:

注释

Azure 服务总线高级层 支持 JMS 1.1 和 JMS 2.0。

Azure 服务总线 - 标准 层支持有限的 JMS 1.1 功能。 有关更多详细信息,请参阅 本文档

JMS - 基础构件

以下构建基块可用于与 JMS 应用程序通信。

注释

以下指南已改编自 Oracle Java EE 6 Java 消息服务教程(JMS)

建议参考本教程,以便更好地了解 Java 消息服务(JMS)。

连接工厂

连接工厂对象由客户端用来与 JMS 提供程序连接。 连接工厂封装由管理员定义的一组连接配置参数。

每个连接工厂都是 ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory 接口的一个实例。

为了简化与 Azure 服务总线的连接,这些接口将分别通过 ServiceBusJmsConnectionFactoryServiceBusJmsQueueConnectionFactoryServiceBusJmsTopicConnectionFactory 来实现。

重要

利用 JMS 2.0 API 的 Java 应用程序可以通过连接字符串或使用TokenCredential借助 Microsoft Entra 支持的身份验证,连接到 Azure 服务总线。 使用 Microsoft Entra 支持的身份验证时,请确保根据需要 向标识分配角色和权限

在 Azure 上创建 系统分配的托管标识 ,并使用此标识创建一个 TokenCredential

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

然后,可以使用以下参数实例化连接工厂。

  • 令牌凭据 - 表示能够提供 OAuth 令牌的凭据。
  • 主机 - Azure 服务总线高级层命名空间的主机名。
  • ServiceBusJmsConnectionFactorySettings 属性包,其中包含
    • connectionIdleTimeoutMS - 空闲连接超时(以毫秒为单位)。
    • traceFrames - 布尔标志,用于收集 AMQP 跟踪帧以进行调试。
    • 其他配置参数

可以按此处所示创建工厂。 令牌凭据和主机是必需参数,但其他属性是可选的。

String host = "<YourNamespaceName>.servicebus.windows.net";
ConnectionFactory factory = new ServiceBusJmsConnectionFactory(tokenCredential, host, null); 

JMS 目标

目标是客户端用来指定它生成的消息的目标以及它使用的消息源的对象。

目标映射到 Azure 服务总线中的实体 - 队列(点到点方案中)和主题(发布-订阅方案中)。

连接

连接封装与 JMS 提供程序的虚拟连接。 在使用 Azure 服务总线时,它表示应用程序与 Azure 服务总线之间通过 AMQP 的有状态连接。

从连接工厂创建连接,如以下示例所示:

Connection connection = factory.createConnection();

会议

会话是用于生成和使用消息的单线程上下文。 它可用于创建消息、消息生成者和使用者,但它还提供事务上下文,以允许将发送和接收分组到原子工作单元中。

可以从连接对象创建会话,如以下示例所示:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

注释

JMS API 不支持从启用了消息会话的服务总线队列或主题接收消息。

会话模式

可以使用以下任一模式创建会话。

会话模式 行为
Session.AUTO_ACKNOWLEDGE 会话会在两种情况下自动确认客户端收到消息:一是当会话成功从接收调用中返回时,二是当会话调用的消息监听器成功处理完消息返回时。
Session.CLIENT_ACKNOWLEDGE 客户端通过调用消息的确认方法来确认已使用的消息。
Session.DUPS_OK_ACKNOWLEDGE 此确认模式指示会话延迟确认消息送达。
Session.SESSION_TRANSACTED 此值可以作为参数传递给 Connection 对象上的方法 createSession(int sessionMode),以指定会话应使用本地事务。

如果未指定会话模式,则默认选取 Session.AUTO_ACKNOWLEDGE

JMSContext

注释

JMSContext 定义为 JMS 2.0 规范的一部分。

JMSContext 结合了连接和会话对象提供的功能。 可以根据连接工厂对象创建它。

JMSContext context = connectionFactory.createContext();

JMSContext 模式

Session 对象一样,可以使用 与会话模式中提到的相同确认模式创建 JMSContext。

JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);

如果未指定模式,则默认选取 JMSContext.AUTO_ACKNOWLEDGE

JMS 消息生成方

消息生成者是使用 JMSContext 或会话创建的对象,用于将消息发送到目标。

它可以创建为独立对象,如以下示例所示:

JMSProducer producer = context.createProducer();

或者在需要发送消息时在运行时创建。

context.createProducer().send(destination, message);

JMS 消息使用者

消息使用者是由 JMSContext 或会话创建的对象,用于接收发送到目标的消息。 可以按以下示例所示创建它:

JMSConsumer consumer = context.createConsumer(dest);

通过 receive() 方法同步接收

消息消费者通过receive() 方法提供一种从目标接收消息的同步方式。

如果未指定任何参数/超时,或者超时值指定为“0”,那么消费者会无限期阻塞,除非消息到达或连接中断(以较早者为准)。

Message m = consumer.receive();
Message m = consumer.receive(0);

如果提供非零正参数,则计时器过期之前会一直阻止使用者。

Message m = consumer.receive(1000); // time out after one second.

使用 JMS 消息侦听器进行异步接收

消息侦听器是一个对象,用于对目标上的消息进行异步处理。 它实现 MessageListener 接口,该接口包含必须存在特定业务逻辑的 onMessage 方法。

必须使用 setMessageListener 方法,实例化并向特定的消息消费者注册消息侦听器对象。

Listener myListener = new Listener();
consumer.setMessageListener(myListener);

使用主题

JMS 消息使用者 是针对目标创建的,该目标可以是队列或主题。

队列上的使用者只是客户端对象,它们位于客户端应用程序与 Azure 服务总线之间的会话(和连接)上下文中。

但主题使用者则包含 2 个部分 -

  • 会话(或 JMSContext)上下文中存在的客户端对象
  • Azure 服务总线中的订阅实体。

此处介绍了订阅,它可以是以下内容之一:

  • 共享持久订阅
  • 共享非持久订阅
  • 非共享持久订阅
  • 非共享、非持久的订阅

JMS 队列浏览器

JMS API 提供一个 QueueBrowser 对象,允许应用程序浏览队列中的消息并显示每个消息的标头值。

可以使用 JMSContext 创建队列浏览器,如以下示例所示:

QueueBrowser browser = context.createBrowser(queue);

注释

JMS API 不提供用于浏览主题的 API。

这是因为主题本身不会存储消息。 将消息发送到主题后,它会立即转发到相应的订阅。

JMS 消息选择器

消息选择器可以通过接收应用程序来筛选收到的消息。 使用消息选择器,接收应用程序会将筛选消息的工作卸载到 JMS 提供程序(在本例中为 Azure 服务总线),而不是自行承担该责任。

创建以下任一使用者时,可以使用选择器 -

  • 共享持久订阅
  • 非共享持久订阅
  • 共享的非持久订阅
  • 未共享的非持久订阅
  • 队列浏览器

注释

服务总线选择器不支持“LIKE”和“BETWEEN”SQL 关键字。

AMQP 处置和服务总线操作映射

以下是将 AMQP 处置转换为服务总线操作的方法:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

摘要

本开发人员指南展示了如何使用 Java 消息服务 (JMS) 的 Java 客户端应用程序与 Azure 服务总线连接。

后续步骤

有关 Azure 服务总线的详细信息以及有关 Java 消息服务 (JMS) 实体的详细信息,请查看以下文章: