使用 Azure Cosmos DB 的事务发件箱模式

Azure Cosmos DB
Azure 服务总线
Azure Functions

在分布式系统中实现可靠的消息传送可能很有挑战性。 本文介绍如何使用事务发件箱模式来可靠消息传递和保证事件传递,这是支持 幂等消息处理的重要组成部分。 为此,你将使用 Azure Cosmos DB 事务批处理和更改源与 Azure 服务总线结合使用。

概述

微服务体系结构越来越受欢迎,并承诺解决可伸缩性、可维护性和敏捷性等问题,尤其是在大型应用程序中。 但是,在数据处理方面,这种体系结构模式也带来了挑战。 在分布式应用程序中,每个服务独立维护在专用服务拥有的数据存储中作所需的数据。 为了支持此类方案,通常使用 RabbitMQ、Kafka 或 Azure 服务总线等消息传送解决方案,该解决方案通过消息传递总线将数据(事件)从一个服务分发到应用程序的其他服务。 然后,内部或外部使用者可以订阅这些消息,并在作数据后立即收到更改通知。

该区域中的一个已知示例是排序系统:当用户想要创建订单时, Ordering 服务将通过 REST 终结点从客户端应用程序接收数据。 它将有效负载映射到对象的内部表示形式 Order ,以验证数据。 成功提交数据库后,它会将 OrderCreated 事件发布到消息总线。 对新订单(例如InventoryInvoicing或服务)感兴趣的任何其他服务都将订阅OrderCreated消息、处理消息并将其存储在自己的数据库中。

以下伪代码显示此过程通常从 Ordering 服务的角度来看如何显示:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

此方法在保存订单对象和发布相应事件之间发生错误之前效果良好。 由于许多原因,此时发送事件可能会失败:

  • 网络错误
  • 消息服务中断
  • 主机故障

无论错误是什么,结果是事件 OrderCreated 无法发布到消息总线。 不会通知其他服务已创建订单。 该服务 Ordering 现在必须处理与实际业务流程无关的各种事项。 它需要跟踪在消息总线重新联机后仍需要放在消息总线上的事件。 即使是最坏的情况也可能发生:由于事件丢失,应用程序中的数据不一致。

显示没有事务发件箱模式的事件处理的关系图。

解决方案

有一种称为 事务发件箱 的已知模式,可以帮助你避免这些情况。 在事件最终推送到消息中转站之前,它可确保事件保存在数据存储中(通常保存在数据库中的发件箱表中)。 如果业务对象和相应的事件保存在同一数据库事务中,则保证不会丢失任何数据。 将提交所有内容,或者如果出现错误,所有内容都将回滚。 为了最终发布事件,不同的服务或工作进程会查询发件箱表以获取未经处理的条目,发布事件,并将其标记为已处理。 此模式可确保在创建或修改业务对象后不会丢失事件。

此图显示了使用事务发件箱模式的事件处理以及用于将事件发布到消息代理的中继服务。

下载此体系结构的 Visio 文件

在关系数据库中,模式的实现非常简单。 例如,如果服务使用 Entity Framework Core,它将使用 Entity Framework 上下文创建数据库事务、保存业务对象和事件以及提交事务或执行回滚。 此外,处理事件的辅助角色服务很容易实现:它定期查询发件箱表以获取新条目,将新插入的事件发布到消息总线,最后将这些条目标记为已处理。

在实践中,事情并不像他们一样容易看。 最重要的是,需要确保保留事件的顺序,以便 OrderUpdated 事件在事件之前 OrderCreated 不会发布。

Azure Cosmos DB 中的实现

本部分介绍如何在 Azure Cosmos DB 中实现事务发箱模式,以便在 Azure Cosmos DB 更改源和服务总线的帮助下实现不同服务之间的可靠有序消息传送。 它演示了一个管理Contact对象(FirstName、、LastNameEmailCompany信息等)的示例服务。 它使用命令和查询责任分离(CQRS)模式,并遵循基本的域驱动设计(DDD)概念。 可以在 GitHub 上找到实现的示例代码。

Contact示例服务中的对象具有以下结构:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

创建或更新后 Contact ,它会发出包含有关当前更改的信息的事件。 等等,域事件可以是:

  • ContactCreated。 添加联系人时引发。
  • ContactNameUpdated。 在更改LastName或更改时FirstName引发。
  • ContactEmailUpdated。 更新电子邮件地址时引发。
  • ContactCompanyUpdated。 更改任何公司属性时引发。

事务批处理

若要实现此模式,需要确保 Contact 业务对象和相应的事件将保存在同一数据库事务中。 在 Azure Cosmos DB 中,事务的工作方式不同于关系数据库系统中的事务。 Azure Cosmos DB 事务(称为 事务批处理)在单个 逻辑分区上运行,因此它们保证原子性、一致性、隔离和持续性(ACID)属性。 不能在不同的容器或逻辑分区的事务批处理作中保存两个文档。 对于示例服务,这意味着业务对象和事件或事件都将放在同一容器和逻辑分区中。

上下文、存储库和 UnitOfWork

示例实现的核心是一个 容器上下文 ,用于跟踪保存在同一事务批处理中的对象。 它维护已创建和修改的对象列表,并在单个 Azure Cosmos DB 容器上运行。 它的接口如下所示:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

容器上下文组件中的列表跟踪 ContactDomainEvent 对象。 两者都将放在同一容器中。 这意味着,多种类型的对象存储在同一 Type Azure Cosmos DB 容器中,并使用属性来区分业务对象和事件。

对于每种类型,都有一个专用存储库来定义和实现数据访问。 存储库 Contact 接口提供以下方法:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

存储库 Event 看起来类似,但只有一种方法,该方法在存储中创建新事件:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

这两个存储库接口的实现通过依赖项注入获取对单个 IContainerContext 实例的引用,以确保两者在同一 Azure Cosmos DB 上下文上运行。

最后一个组件是 UnitOfWork将实例中保留的 IContainerContext 更改提交到 Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

事件处理:创建和发布

每次创建、修改或(软)删除对象时 Contact ,服务都会引发相应的事件。 提供的解决方案的核心是域驱动设计(DDD)和 吉米·博加德提出的调解人模式的组合。 他建议在将实际对象保存到数据库之前,维护由于域对象的修改而发生的事件列表,并发布这些事件。

更改列表保留在域对象本身中,以便其他组件无法修改事件链。 在域对象中维护事件(IEvent 实例)的行为通过接口 IEventEmitter<IEvent> 定义,并在抽象 DomainEntity 类中实现:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact 对象引发域事件。 实体 Contact 遵循基本的 DDD 概念,将域属性的 setter 配置为私有。 类中不存在公共资源库。 而是提供作内部状态的方法。 在这些方法中,可以引发特定修改的适当事件(例如 ContactNameUpdatedContactEmailUpdated)。

下面是联系人姓名更新的示例。 (在方法末尾引发该事件。

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

跟踪更改的对应 ContactNameUpdatedEvent项如下所示:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

到目前为止,事件刚刚记录在域对象中,任何事件都不会保存到数据库,甚至发布到消息中转站。 按照建议,在将业务对象保存到数据存储之前,将立即处理事件列表。 在这种情况下,它发生在SaveChangesAsync在专用RaiseDomainEvents方法中实现的IContainerContext实例方法中。 (dObjs 是容器上下文的跟踪实体的列表。

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

在最后一行中, MediatR 包(C# 中中介模式的实现)用于在应用程序中发布事件。 这样做是可能的,因为所有事件(如 ContactNameUpdatedEvent 实现 INotification MediatR 包的接口)。

这些事件需要由相应的处理程序处理。 在这里, IEventsRepository 实现将发挥作用。 下面是事件处理程序的示例 NameUpdated

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

实例 IEventRepository 通过构造函数注入处理程序类。 在服务中发布方法 ContactNameUpdatedEvent 后, Handle 将调用该方法,并使用事件存储库实例创建通知对象。 该通知对象又插入到对象中的 IContainerContext 跟踪对象列表中,并将保存在同一事务批处理中的对象联接到 Azure Cosmos DB。

到目前为止,容器上下文知道要处理的对象。 为了最终将跟踪的对象保存到 Azure Cosmos DB, IContainerContext 实现将创建事务批处理,添加所有相关对象,并针对数据库运行作。 所描述的过程在方法中 SaveInTransactionalBatchAsync 进行处理,该方法调用 SaveChangesAsync 该方法。

下面是需要创建和运行事务批处理的实现的重要部分:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

下面概述了该过程到目前为止的工作原理(用于更新联系人对象的名称):

  1. 客户端想要更新联系人的名称。 该方法 SetName 在联系人对象上调用,并更新属性。
  2. ContactNameUpdated 事件将添加到域对象中的事件列表中。
  3. 调用联系人存储库 Update 的方法,该方法将域对象添加到容器上下文中。 对象现在被跟踪。
  4. CommitAsync 在实例上 UnitOfWork 调用,而实例又调用 SaveChangesAsync 容器上下文。
  5. 在其中SaveChangesAsync,域对象列表中的所有事件都由实例发布,并通过事件存储库添加到同一MediatR容器上下文。
  6. 在其中 SaveChangesAsync创建了 a TransactionalBatch 。 它将同时保存联系人对象和事件。
  7. 运行 TransactionalBatch 和数据已提交到 Azure Cosmos DB。
  8. SaveChangesAsyncCommitAsync 成功返回。

持久性

如前面的代码片段所示,保存到 Azure Cosmos DB 的所有对象都包装在实例 DataObject 中。 此对象提供常见属性:

  • ID
  • PartitionKey
  • Type
  • State。 同样 CreatedUpdated 不会保留在 Azure Cosmos DB 中。
  • Etag。 对于 乐观锁定
  • TTL。 用于自动清理旧文档的生存时间属性。
  • Data。 泛型数据对象。

这些属性在一 IDataObject 个泛型接口中定义,该接口由存储库和容器上下文使用:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

然后,包装在实例中 DataObject 并保存到数据库的对象将如下所示(ContactContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

可以看到 ContactContactNameUpdatedEvent 类型 domainEvent)文档具有相同的分区键,并且这两个文档将保存在同一逻辑分区中。

更改源处理

若要读取事件流并将其发送到消息代理,该服务将使用 Azure Cosmos DB 更改源

更改源是容器中更改的持久日志。 它在后台运行并跟踪修改。 在一个逻辑分区中,保证更改的顺序。 读取更改源的最方便方法是将 Azure 函数与 Azure Cosmos DB 触发器配合使用。 另一种选择是使用 更改源处理器库。 它允许将更改源处理作为后台服务(通过接口)集成到 Web API 中 IHostedService 。 此处的示例使用一个简单的控制台应用程序来实现抽象类 BackgroundService ,以在 .NET Core 应用程序中托管长时间运行的后台任务。

若要从 Azure Cosmos DB 更改源接收更改,需要实例化对象 ChangeFeedProcessor 、注册消息处理的处理程序方法,并开始侦听更改:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

然后,处理程序方法(HandleChangesAsync 此处)处理消息。 在此示例中,事件发布到服务总线主题,该主题已分区以实现可伸缩性,并 启用了重复功能。 任何对对象更改 Contact 感兴趣的服务都可以订阅该服务总线主题,并接收和处理其自己的上下文的更改。

生成的服务总线消息具有一个 SessionId 属性。 在服务总线中使用会话时,可以保证消息的顺序会保留(首先先出(FIFO)。 保留顺序对于此用例是必需的。

下面是处理来自更改源的消息的代码片段:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

错误处理

如果在处理更改时出错,更改源库将在成功处理最后一批的位置重启读取消息。 例如,如果应用程序成功处理了 10,000 条消息,现在正在处理批处理 10,001 到 10,025,并且发生错误,则它可以重启并选取其工作位置为 10,001。 该库通过 Azure Cosmos DB 容器中 Leases 保存的信息自动跟踪已处理的内容。

服务可能已将一些已重新处理的消息发送到服务总线。 通常,这种情况会导致消息处理重复。 如前所述,服务总线具有重复消息检测的功能,需要为此方案启用该功能。 服务会根据消息的应用程序控制 MessageId 属性检查是否已将消息添加到服务总线主题(或队列)。 该属性设置为 ID 事件文档。 如果同一消息再次发送到服务总线,该服务将忽略并删除它。

保养工作

在典型的事务发件箱实现中,服务将更新已处理的事件,并将属性Processedtrue设置为,指示消息已成功发布。 可以在处理程序方法中手动实现此行为。 在当前方案中,无需执行此类过程。 Azure Cosmos DB 跟踪使用更改源(与 Leases 容器结合使用)处理的事件。

最后一步,有时需要从容器中删除事件,以便仅保留最新的记录/文档。 为了定期进行清理,实现将应用 Azure Cosmos DB 的另一项功能:生存时间(TTL)文档。 Azure Cosmos DB 可以根据可添加到文档的属性自动删除文档 TTL :时间跨度(以秒为单位)。 该服务会不断检查容器中是否有属性 TTL 的文档。 文档过期后,Azure Cosmos DB 会将其从数据库中删除。

当所有组件按预期工作时,将快速处理和发布事件:几秒钟内。 如果 Azure Cosmos DB 中出现错误,则不会将事件发送到消息总线,因为业务对象和相应的事件都无法保存到数据库。 唯一需要考虑的一点是,当后台工作器(更改源处理器)或服务总线不可用时,对文档设置适当的 TTLDomainEvent 。 在生产环境中,最好选择多天的时间跨度。 例如,10 天。 然后,所涉及的所有组件将有足够的时间来处理/发布应用程序中的更改。

概要

事务发件箱模式解决了在分布式系统中可靠地发布域事件的问题。 通过将业务对象的状态及其事件提交在同一事务批处理中,并使用后台处理器作为消息中继,可确保其他服务(内部或外部)最终会收到它们依赖的信息。 此示例不是事务发件箱模式的传统实现。 它使用 Azure Cosmos DB 更改源和生存时间等功能,使事情简单干净。

下面是本方案中使用的 Azure 组件的摘要:

此图显示了使用 Azure Cosmos DB 和 Azure 服务总线实现事务发件箱的 Azure 组件。

下载此体系结构的 Visio 文件

此解决方案的优点包括:

  • 可靠的消息传送和事件的保证传送。
  • 通过服务总线保留事件和消息重复顺序。
  • 无需维护指示成功处理事件文档的额外 Processed 属性。
  • 通过生存时间从 Azure Cosmos DB 中删除事件(TTL)。 此过程不使用处理用户/应用程序请求所需的请求单位。 而是在后台任务中使用“剩余”请求单位。
  • 通过 ChangeFeedProcessor (或 Azure 函数)对消息进行错误验证处理。
  • 可选:多个更改源处理器,每个处理器都在更改源中维护自己的指针。

注意事项

本文中讨论的示例应用程序演示如何使用 Azure Cosmos DB 和服务总线在 Azure 上实现事务发件箱模式。 还有其他使用 NoSQL 数据库的方法。 若要保证业务对象和事件将可靠地保存在数据库中,可以在业务对象文档中嵌入事件列表。 此方法的缺点是清理过程需要更新包含事件的每个文档。 与使用 TTL 相比,这并不理想,尤其是在请求单位成本方面。

请记住,不应考虑此处生产就绪代码提供的示例代码。 它对多线程处理有一些限制,尤其是类中 DomainEntity 处理事件的方式以及对象在实现中的 CosmosContainerContext 跟踪方式。 将其用作你自己的实现的起点。 或者,请考虑使用已内置于其中的现有库,例如 NServiceBusMassTransit

部署此方案

可以在 GitHub 上找到源代码、部署文件和测试此方案的说明: https://github.com/mspnp/transactional-outbox-pattern

供稿人

本文由Microsoft维护。 它最初是由以下贡献者撰写的。

主要作者:

要查看非公开的 LinkedIn 个人资料,请登录到 LinkedIn。

后续步骤

请查看以下文章了解详细信息: