你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

事件复制任务模式

联合概述复制器函数概述介绍了复制任务的基本原理和基本元素,建议在继续本文之前熟悉它们。

在本文中,我们详细介绍了概述部分中突出显示的几种模式的实现指南。

重复

复制模式将事件从一个事件中心复制到下一个事件中心,或从事件中心复制到其他一些目标,例如服务总线队列。 事件将被转发,而不对事件有效负载进行任何修改。

此模式的实现涵盖了 事件中心之间的事件复制事件中心与服务总线之间的事件复制 示例,以及 将 Apache Kafka MirrorMaker 与事件中心配合使用 教程,特别是在将 Apache Kafka 中转站中的数据复制到事件中心的情况下。

流和顺序保留

复制(通过 Azure Functions 或 Azure 流分析)并不旨在确保将源事件中心的确切 1:1 克隆创建到目标事件中心,但侧重于保留应用程序需要它的相对事件顺序。 应用程序通过将相关事件与同一分区键分组, 事件中心按顺序在同一分区中排列具有相同分区键的消息来传达这一点。

重要

对于每个事件中心,“偏移量”信息是唯一的,相同事件的偏移量在事件中心实例之间将有所不同。 若要在复制的事件流中查找某个位置,请使用基于时间的偏移,并参考传播的服务分配元数据

基于时间的偏移在特定时间点启动接收器:

  • EventPosition.FromStart() - 再次读取所有保留的数据。
  • EventPosition.FromEnd() - 从连接时读取所有新数据。
  • EventPosition.FromEnqueuedTime(dateTime) - 从给定日期和时间开始的所有数据。

在 EventProcessor 中,你可以通过 EventProcessorOptions 中的 InitialOffsetProvider 来设置位置。 使用其他接收方 API 时,位置将通过构造函数传递。

预生成的复制函数帮助程序作为示例 提供,在基于 Azure Functions 的指南中,这些帮助程序确保从源分区检索到的具有相同分区键的事件流,以批处理的形式提交到目标事件中心,同时保持原始流和相同的分区键。

如果源和目标事件中心的分区计数相同,则目标中的所有流将映射到与源中相同的分区。 如果分区计数不同,这在以下所述的一些进一步模式中很重要,则映射将有所不同,但流始终一起并按顺序排列。

属于不同流或没有目标分区中分区键的独立事件的事件的相对顺序可能始终不同于源分区。

由服务分配的元数据

从源事件中心获取的事件的服务分配元数据、原始排队时间、序列号和偏移量由目标事件中心的新服务分配值替换,但借助帮助程序函数、示例中提供的复制任务,原始值将保留在用户属性中: repl-enqueue-time (ISO8601字符串),repl-sequencerepl-offset

这些属性的类型为字符串,并包含相应原始属性的字符串化值。 如果事件被多次转发,则由服务分配的最近来源的元数据会被追加到已存在的属性中,值用分号分隔。

故障转移

如果将复制用于灾难恢复目的、防范事件中心服务中发生区域性的可用性事件或防范网络中断,在出现任何此类情景时,都需要执行从一个事件中心到下一个事件中心的故障转移,并告知生成者和/或使用者使用次要终结点。

对于所有故障转移方案,假定命名空间的必需元素在结构上是相同的,这意味着事件中心和使用者组的名称相同,并且共享访问签名规则和/或基于角色的访问控制规则以相同的方式设置。 可以通过遵循有关移动命名空间的指导(省略清理步骤)来创建(和更新)辅助命名空间。

若要强制生产者和消费者进行更改,需要在一个易于访问和更新的位置中提供关于使用哪个命名空间进行查找的信息。 如果生成者或使用者遇到频繁或持久性错误,则应咨询该位置并调整其配置。 有许多方法可以共享该配置,但我们在以下两种方法中指出:DNS 和文件共享。

基于 DNS 的故障转移配置

一种候选方法是将 DNS SRV 记录中的信息保存在你控制的 DNS 中,并指向相应的事件中心终结点。

重要

请注意,事件中心不允许其终结点直接使用 CNAME 记录进行别名,这意味着你将使用 DNS 作为终结点地址的弹性查找机制,而不是直接解析 IP 地址信息。

假设你拥有域 example.com,而你的应用程序拥有区域 test.example.com。 对于两个备用事件中心,你现在将创建另外两个嵌套区域,并在每个区域中创建一个 SRV 记录。

SRV 记录遵循常见约定,其 _azure_eventhubs._amqp 前缀对应两个终结点记录:一个用于端口 5671 上的 AMQP-over-TLS,另一个用于端口 443 上的 AMQP-over-WebSockets。这两个记录都指向与该区域对应的命名空间的事件中心终结点。

区域 SRV 记录
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

在应用程序的区域中,你将创建一个 CNAME 条目,该条目指向对应于主要事件中心的从属区域:

CNAME 记录 别名
eventhub.test.example.com eh1.test.example.com

使用允许显式查询 CNAME 和 SRV 记录的 DNS 客户端(Java 和 .NET 的内置客户端仅允许将名称简单解析为 IP 地址),然后可以解析所需的终结点。 例如,对于 DnsClient.NET,查找函数为:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

该函数返回为当前别名为 CNAME 的区域的端口 5671 注册的目标主机名,如上所示。

执行故障转移需要编辑 CNAME 记录并将其指向备用区域。

使用 DNS(特别是 Azure DNS)的优点是,Azure DNS 信息是全局复制的,因此可以抵御单区域服务中断。

此过程类似于事件中心异地灾难恢复的工作方式,但完全在你自己的控制下,并且也适用于主动/主动方案。

基于文件共享的故障转移配置

使用 DNS 共享终结点信息的最简单替代方法是将主终结点的名称放入纯文本文件中,并从基础结构中为文件提供服务,该基础结构具有可靠的中断性,但仍允许更新。

如果已运行具有全局可用性和内容复制的高可用性网站基础结构,请在其中添加此类文件,并在需要切换时重新发布该文件。

谨慎

应仅以这种方式发布终结点名称,而不是包含机密的完整连接字符串。

有关故障转移使用者的其他注意事项

对于事件中心使用者,故障转移策略的其他注意事项取决于事件处理程序的需求。

如果发生灾难后需要基于备份数据重新构建系统(包括数据库),而数据库中的数据是直接馈送的,或者是通过对事件中心内保存的事件进行中间处理来馈送的,那么,你可以还原备份,然后开始将事件重播到系统,这些事件是创建数据库备份的那一刻,而不是从销毁原始系统的那一刻生成的。

如果故障只影响到系统的某个切片或者确实只影响了一个不可访问的事件中心,则你可以从处理中断的大致位置继续处理事件。

若要实现任一方案并使用相应 Azure SDK 的事件处理程序, 将创建新的检查点存储 ,并根据要从中恢复处理的 时间戳 提供初始分区位置。

如果你仍可以访问从中切换出去的事件中心的检查点存储,则上述传播的元数据将帮助你跳过已处理的事件,并从上次离开的精确位置恢复处理。

合并

合并模式有一个或多个指向一个目标的复制任务,可能与常规生成者同时向同一目标发送事件。

这些模式的变体包括:

  • 两个或多个复制函数同时从单独的源获取事件并将其发送到同一目标。
  • 另外一个复制函数从源获取事件,而目标也由生成者直接使用。
  • 之前提到的模式,但在两个或多个 Event Hubs 之间进行镜像,使这些 Event Hubs 包含相同的流,无论事件在哪生成。

前两种模式变体是微不足道的,与普通复制任务不同。

最后一个方案要求排除已复制的事件,防止将其再次复制。 此技术在 EventHubToEventHubMerge 示例中进行了演示和说明。

编辑

编辑器模式基于 复制 模式生成,但在转发消息之前会对其进行修改。

此类修改的示例包括:

  • 转码 - 如果事件内容(也称为“正文”或“有效负载”)从使用 Apache Avro 格式或某种专有序列化格式进行编码的源到达,但拥有目标的系统预期是让内容进行 JSON 编码,转码复制任务首先将 Apache Avro 的有效负载反序列化为内存中对象图,然后将该图序列化为 JSON 正在转发的事件的格式。 转码还包括 内容压缩 和解压缩任务。
  • 转换 - 包含结构化数据的事件可能需要重塑该数据,以便下游使用者更容易使用。 这可能涉及平展嵌套结构、修剪多余的数据元素或重新调整有效负载以完全适合给定架构等工作。
  • 批处理 - 事件可以批量接收(单个传输中的多个事件),但必须单向目标转发,反之亦然。 因此,任务可以基于单个输入事件传输来转发多个事件,或者聚合一组事件一起传输。
  • 验证 - 通常需要检查来自外部源的事件数据是否符合一组规则,然后才能转发这些规则。 规则可以使用架构或代码表示。 如果发现不合规的事件可能会被删除,并且日志中记录了问题,或者可能会转发到特殊目标目标以进一步处理它们。
  • 扩充 - 来自某些源的事件数据可能需要使用进一步上下文进行扩充,以便在目标系统中可用。 这可能涉及查找引用数据和嵌入该数据与事件,或添加有关复制任务已知的源的信息,但不包含在事件中。
  • 筛选 - 根据某些规则,来自源头的某些事件可能需要被阻止,不传递给目标。 筛选器针对规则测试事件,如果事件与规则不匹配,则删除该事件。 通过观察某些条件并删除具有相同值的后续事件来筛选重复事件是一种筛选形式。
  • 加密 - 复制任务可能必须解密从源和/或加密转发到目标的内容,/或它可能需要验证内容和元数据相对于事件中携带的签名的完整性,或附加此类签名。
  • 证明 - 复制任务可能会将元数据(可能受数字签名保护)附加到证明事件已通过特定通道或特定时间接收的事件。
  • 链接 - 复制任务可以将签名应用于事件流,从而保护流的完整性并检测到缺失的事件。

转换、批处理和扩充模式通常最好使用 Azure 流分析 作业实现。

所有这些模式都可以使用 Azure Functions 实现,使用 事件中心触发器 获取事件以及用于传送事件的 事件中心输出绑定

路线规划

路由模式基于 复制 模式构建,但复制任务没有创建一个源和一个目标,而是在 C# 中演示了多个目标:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

路由函数将考虑消息元数据和/或消息有效负载,然后选择要发送到的可用目标之一。

在 Azure 流分析中,可以通过定义多个输出,然后为每个输出执行查询来实现相同的目的。

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

日志投影

日志投影模式将事件流平展到索引数据库,事件将成为数据库中的记录。 通常,事件会添加到同一集合或表中,事件中心分区键将成为主键的一部分,用于使记录保持唯一。

日志投影可以生成事件数据的时间序列记录,或者提供一种简化视图,仅保留每个分区键的最新事件。 目标数据库的形状最终由你和应用程序的需求决定。 此模式也称为“事件溯源”。

小窍门

可以轻松地在 Azure 流分析中创建 Azure SQL 数据库Azure Cosmos DB 中的日志投影,并且应该首选该选项。

以下 Azure Function 将压缩后的 Azure 事件中心内容投影到 Azure Cosmos DB 集合中。

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

后续步骤