教程:在事件流中使用 Apache Kafka 终结点在实时智能中流式传输和使用事件

本教程介绍如何使用 Microsoft Fabric 事件流的增强功能中的自定义终结点源提供的 Apache Kafka 终结点,将事件流式传输到实时智能。 (在 Fabric 事件流的标准功能中,自定义终结点称为“自定义应用”。)你还将了解如何通过事件流的自定义终结点目标中的 Apache Kafka 终结点使用这些流式处理事件。

在本教程中,你将了解:

  • 创建事件流。
  • 从自定义终结点源获取 Kafka 终结点。
  • 使用 Kafka 应用程序发送事件。
  • 从自定义终结点目标获取 Kafka 终结点。
  • 通过 Kafka 应用程序使用事件。

先决条件

在 Microsoft Fabric 中创建 eventstream

  1. 将 Fabric 体验更改为“实时智能”

    屏幕截图显示了切换到实时智能工作负载的切换器。

  2. 请按照以下步骤之一开始创建事件流:

    • 在“实时智能”主页上的“建议创建的项”部分,选择“事件流”磁贴

      显示主页上事件流磁贴的屏幕截图。

    • 选择左侧导航栏上的“我的工作区”。 在“我的工作区”页上,依次选择“新建项”、“事件流”

      屏幕截图显示在工作区页面的“新建”菜单中找到事件流选项的位置。

  3. 输入新事件流的名称,然后选择“创建”

    屏幕截图显示“新建事件流”对话框。

  4. 在工作区中创建新 eventstream 可能需要几秒钟时间。 创建事件流 后,将跳转到主编辑器,可以在其中开始向事件流添加源。

    屏幕截图显示编辑器。

从添加的自定义终结点源检索 Kafka 终结点

若要获取 Kafka 主题终结点,需将自定义终结点源添加到事件流。 然后,Kafka 连接终结点将随时可用,并在自定义终结点源中公开。

若要将自定义终结点源添加到事件流:

  1. 如果自定义终结点为空事件流,在事件流主页上选择“使用自定义终结点”

    显示选择“使用自定义终结点”的选项的屏幕截图。

    或者,在功能区上,选择“添加源”>“自定义终结点”

    选择自定义终结点作为事件流的源的屏幕截图。

  2. 输入自定义终结点的源名称值,然后选择“添加”。

    输入自定义终结点名称的屏幕截图。

  3. 检查自定义终结点源是否以编辑模式出现在事件流的画布上,然后选择“发布”

    显示编辑模式下添加的自定义终结点的屏幕截图。

  4. 成功发布事件流后,可以检索其详细信息,包括有关 Kafka 终结点的信息。 在画布上选择自定义终结点源磁贴。 然后在自定义终结点源节点的底部窗格中,选择“Kafka”选项卡

    在“SAS 密钥身份验证”页上,可以获取以下重要的 Kafka 终结点信息

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

    {YOUR.BOOTSTRAP.SERVER} 是“SAS 密钥身份验证”页上的 Bootstrap 服务器的值{YOUR.CONNECTION.STRING} 可以是连接字符串(主密钥)的值或连接字符串(辅助密钥)的值。 选择要使用的一个。

    显示 Kafka 密钥和示例代码的屏幕截图。

    有关“SAS 密钥身份验证”和“示例代码”页的详细信息,请参阅 Kafka 终结点详细信息

使用 Kafka 应用程序发送事件

使用从上一步获取的重要 Kafka 信息,可以替换现有 Kafka 应用程序中的连接配置。 然后,可以将事件发送到事件流。

下面是一个遵循 Kafka 协议、基于 Java 编写的 Azure 事件中心 SDK 的应用程序。 若要使用此应用程序将事件流式传输到事件流,请使用以下步骤替换 Kafka 终结点信息并正确加以执行:

  1. 克隆用于 Kafka 的 Azure 事件中心存储库

  2. 转到 azure-event-hubs-for-kafka/quickstart/java/producer

  3. 在 src/main/resources/producer.config 中更新生成者的配置详细信息,如下所示:

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

    {YOUR.BOOTSTRAP.SERVER} 替换为 Bootstrap 服务器的值。 将 {YOUR.CONNECTION.STRING} 替换为连接字符串(主密钥)的值或连接字符串(辅助密钥)的值。 选择要使用的一个。

  4. 使用 src/main/java/TestProducer.java 中的新主题名称更新主题名称,如下所示:private final static String TOPIC = "{YOUR.TOPIC.NAME}";

    可以在“Kafka”选项卡下的“SAS 密钥身份验证”页上找到 {YOUR.TOPIC.NAME}

  5. 运行生产者代码并将事件流式传输到事件流中:

    • mvn clean package
    • mvn exec:java -Dexec.mainClass="TestProducer"

    显示生成者代码的屏幕截图。

  6. 预览使用此 Kafka 应用程序发送的数据。 选择事件流节点,该节点是显示事件流名称的中间节点。

    选择 CSV 的数据格式,其中逗号为分隔符,不带标头。 此选项与应用程序流式传输事件数据的格式匹配。

    显示 Kafka 数据预览的屏幕截图。

从添加的自定义终结点目标获取 Kafka 终结点

可以添加自定义终结点目标,以获取 Kafka 连接终结点详细信息,从而使用事件流中的事件。 添加目标后,可以在实时视图中从目标的“详细信息”窗格中获取信息。

在“基本”页中,可以获取“使用者组”的值。 稍后需要此值来配置 Kafka 使用者应用程序。

在“SAS 密钥身份验证”页面中,可以获取以下重要的 Kafka 终结点信息

  • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
  • security.protocol=SASL_SSL
  • sasl.mechanism=PLAIN
  • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

{YOUR.BOOTSTRAP.SERVER}Bootstrap 服务器的值。 {YOUR.CONNECTION.STRING} 可以是连接字符串(主密钥)的值或连接字符串(辅助密钥)的值。 选择要使用的一个。

通过 Kafka 应用程序使用事件

现在,可以使用适用于 Kafka 存储库的 Azure 事件中心中的另一个应用程序来使用事件流中的事件。 若要使用此应用程序来使用事件流中的事件,请按照以下步骤替换 Kafka 终结点详细信息,并相应地加以运行:

  1. 克隆用于 Kafka 的 Azure 事件中心存储库

  2. 转到 azure-event-hubs-for-kafka/quickstart/java/consumer

  3. 在 src/main/resources/consumer.config 中更新使用者的配置详细信息,如下所示:

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • group.id={YOUR.EVENTHUBS.CONSUMER.GROUP}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    • password="{YOUR.CONNECTION.STRING}";

    {YOUR.BOOTSTRAP.SERVER} 替换为 Bootstrap 服务器的值。 可以从自定义终结点目标的“详细信息”窗格中的“基本”页获取 {YOUR.EVENTHUBS.CONSUMER.GROUP} 的值。 将 {YOUR.CONNECTION.STRING} 替换为连接字符串(主密钥)的值或连接字符串(辅助密钥)的值。 选择要使用的一个。

  4. 在 src/main/java/TestConsumer.java 的“SAS 密钥身份验证”页上,使用新主题名称更新现有主题名称,如下所示:private final static String TOPIC = "{YOUR.TOPIC.NAME}";

    可以在“Kafka”选项卡下的“SAS 密钥身份验证”页上找到 {YOUR.TOPIC.NAME}

  5. 运行使用代码并将事件流式传输到事件流中:

    • mvn clean package
    • mvn exec:java -Dexec.mainClass="TestConsumer"

如果事件流具有传入事件(例如,以前的生成者应用程序仍在运行),请验证使用者现在是否正在从事件流主题接收事件。

显示 Kafka 传入事件的屏幕截图。

默认情况下,Kafka 使用者从流末尾读取,而不是从开头读取。 Kafka 使用者在开始运行使用者之前不会读取排队的任何事件。 如果启动使用者但未收到任何事件,请尝试在使用者轮询时再次运行生成者。

结束语

恭喜。 你已了解如何使用从事件流公开的 Kafka 终结点流式传输和使用事件流中的事件。 如果已有从 Kafka 主题发送或使用的应用程序,则可以使用同一应用程序在事件流中发送或使用事件流中的事件,而无需进行任何代码更改。 只需更改连接的配置信息。