使用 RabbitMQ 发送消息
编写用于创建队列、发送消息和从 RabbitMQ 接收消息的代码非常简单。 在 .NET Aspire 解决方案中,还有助于创建 RabbitMQ 容器并从微服务建立连接。
在你任职的户外设备零售商中,你决定将 RabbitMQ 作为面向客户的产品目录网站的集中式消息代理进行实施。 你想要使用 .NET Aspire RabbitMQ 集成来管理此代理及其队列。
在本单元中,你将了解如何创建 RabbitMQ 容器并使用它来发送和接收消息。
使用 .NET Aspire RabbitMQ 集成
从 .NET 使用 RabbitMQ 时,通常需要使用连接字符串创建 ConnectionFactory
对象,然后使用它连接到服务。 在 .NET Aspire 项目中,更易于管理 RabbitMQ 连接,因为:
- 你在 AppHost 项目中注册连接和连接字符串。
- 将对服务的引用传递给使用的项目时,它们可以使用依赖项注入来与 RabbitMQ 建立连接。 它们不需要创建和配置自己的连接。
在应用托管进程中配置 RabbitMQ
在 .NET Aspire 中,必须在应用托管进程中安装 Rabbit MQ 托管集成:
dotnet add package Aspire.Hosting.RabbitMQ
现在,可以注册 RabbitMQ 服务并将其传递给使用它的项目:
// Service registration
var rabbit = builder.AddRabbitMQ("messaging");
// Service consumption
builder.AddProject<Projects.CatalogAPI>()
.WithReference(rabbit);
AppHost
管理解决方案中所有项目的连接。
配置 Rab
接下来,将 .NET Aspire RabbitMQ 集成添加到使用它的每个项目:
dotnet add package Aspire.RabbitMQ.Client
若要获取对 RabbitMQ 消息代理的引用,请调用 AddRabbitMQClient()
方法:
builder.AddRabbitMQClient("messaging");
现在,可以使用依赖项注入来获取与 RabbitMQ 的连接:
public class CatalogAPI(IConnection rabbitConnection)
{
// Send and receive messages here
}
通过连接,下一步是创建消息通道,如下所示:
var channel = connection.CreateModel();
发送消息
拥有消息通道后,可以使用它来设置消息拓扑的队列、交换和其他集成。 例如,若要创建队列,请使用以下代码:
channel.QueueDeclare(queue: "catalogEvents",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
使用 BasicPublish
方法将消息发送到此队列,但该消息希望正文为字节数组:
var body = Encoding.UTF8.GetBytes("Getting all items in the catalog.");
channel.BasicPublish(exchange: string.Empty,
routingKey: "catalogEvents",
basicProperties: null,
body: body);
接收消息
在接收集成中,以与发送方相同的方式创建消息通道和队列。 请确保队列名称与在发送集成中创建的队列名称匹配。 否则,你将创建两个单独的队列,消息不会到达正确的目标。
必须创建新的 EventingBasicConsumer()
方法并注册一个方法来处理 Received
事件:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ProcessMessageAsync;
消息处理程序使用 BasicDeliverEventArgs
对象来获取消息的属性,包括消息正文。 务必反序列化消息正文:
private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs args)
{
string messagetext = Encoding.UTF8.GetString(args.Body.ToArray());
logger.LogInformation("The message is: {text}", messagetext);
}
最后,要检查队列中是否有新消息,请调用 BasicConsume()
方法;
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);