你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 JavaScript 向事件中心发送事件或从事件中心接收事件

在本快速入门中,你将了解如何使用 @azure/event-hubs npm 包向事件中心发送事件以及从事件中心接收事件。

如果不熟悉 Azure 事件中心,请参阅 事件中心概述 ,然后再开始。

先决条件

  • Microsoft Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果没有 Azure 帐户,请注册 免费试用版
  • Node.js LTS。 下载最新的 长期支持(LTS)版本
  • Visual Studio Code(推荐)或任何其他集成开发环境 (IDE)。
  • 创建事件中心命名空间和事件中心。 使用 Azure 门户 创建事件中心类型的命名空间,获取应用程序与事件中心通信所需的管理凭据。 有关详细信息,请参阅 使用 Azure 门户创建事件中心

安装 npm 包以发送事件

若要 为事件中心安装 Node Package Manager (npm) 包,请打开其路径中的命令提示符窗口 npm 。 将目录更改为要在其中保存示例的文件夹。

运行以下命令:

npm install @azure/event-hubs
npm install @azure/identity

向 Azure 验证应用

本快速入门介绍如何通过两种方式连接到 Azure 事件中心:

  • 无密码。 使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到事件中心命名空间。 无需担心在代码、配置文件或安全存储(如 Azure Key Vault)中具有硬编码连接字符串。
  • 连接字符串。 使用连接字符串连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。

建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 Azure 服务的服务总线身份验证和授权无密码连接

将角色分配给 Microsoft Entra 用户

在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 需要 Azure 事件中心数据所有者 角色来发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅 “了解 Azure RBAC”页的范围

以下示例将 Azure Event Hubs Data Owner 角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。

Azure 事件中心的内置 Azure 角色

对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下内置角色,用于授权访问事件中心命名空间:

如果要创建自定义角色,请参阅执行事件中心操作所需的权限

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。

  2. 在概述页面上,从左侧菜单中选择“访问控制(IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Event Hubs Data Owner 并选择匹配的结果。 然后选择“下一步”。

  6. 在“ 分配访问权限”下,选择“ 用户”、“组”或服务主体。 然后选择 “+ 选择成员”。

  7. 在对话框中,搜索Microsoft Entra 用户名(通常是 user@___domain 电子邮件地址)。 选择对话框底部的 “选择 ”。

  8. 选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。

发送事件

在本部分中,将创建一个 JavaScript 应用程序,用于将事件发送到事件中心。

  1. 打开文本编辑器,如 Visual Studio Code

  2. 创建名为 send.js的文件。 将以下代码粘贴到其中:

    在代码中,使用实际值替换以下占位符:

    • EVENT HUBS NAMESPACE NAME
    • EVENT HUB NAME
    const { EventHubProducerClient } = require("@azure/event-hubs");
    const { DefaultAzureCredential } = require("@azure/identity");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS NAMESPACE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a producer client to send messages to the event hub.
      const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
    
      // Prepare a batch of three events.
      const batch = await producer.createBatch();
      batch.tryAdd({ body: "passwordless First event" });
      batch.tryAdd({ body: "passwordless Second event" });
      batch.tryAdd({ body: "passwordless Third event" });    
    
      // Send the batch to the event hub.
      await producer.sendBatch(batch);
    
      // Close the producer client.
      await producer.close();
    
      console.log("A batch of three events have been sent to the event hub");
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. 若要运行应用程序,请使用以下命令:

    node send.js
    

    该命令将一批三个事件发送到事件中心。

    如果您使用无密码(Microsoft Entra ID 基于角色的访问控制(RBAC))身份验证,则可能需要使用分配到 Azure Event Hubs 数据所有者角色的帐户登录到 Azure。 使用 az login 命令。

  4. 在 Azure 门户中,验证事件中心是否收到了消息。 若要更新图表,请刷新页面。 可能需要在几秒钟后才会显示收到了消息。

    屏幕截图显示了“概述”页,可在其中验证事件中心是否收到消息。

    注释

    有关详细信息和完整的源代码,请参阅 GitHub sendEvents.js 页

接收事件

在本部分中,你将在 JavaScript 应用程序中使用 Azure Blob 存储检查点存储从事件中心接收事件。 该应用程序将在 Azure 存储 Blob 中定期针对收到的消息执行元数据检查点。 使用此方式可以很容易地在以后的某个时间从退出的位置继续接收消息。

使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:

  • 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
  • 不要将存储帐户用于任何其他用途。
  • 不要将容器用于任何其他用途。
  • 在部署的应用程序所在的同一区域中创建存储帐户。 如果应用程序位于本地,请尝试选择最近的区域。

在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。

  • 分层命名空间
  • Blob 软删除
  • 版本控制

创建 Azure 存储帐户和 Blob 容器

若要使用 Blob 容器创建 Azure 存储帐户,请执行以下作:

  1. 创建 Azure 存储帐户
  2. 在存储帐户中创建 Blob 容器
  3. 对 blob 容器进行身份验证

在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需要具备存储 Blob 数据参与者角色才能读写 Blob 数据。 若要为自己分配此角色,需要被分配 “用户访问管理员” 角色,或另一个包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅了解 Azure RBAC 的范围

在此方案中,将权限分配给用户帐户(作用域为存储帐户)以遵循 最低特权原则。 这种做法仅为用户提供所需的最低权限,并创建更安全的生产环境。

以下示例将 存储 Blob 数据参与者 角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到存储帐户。

  2. 在存储帐户页上,从左侧菜单中选择 “访问控制”(IAM )。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。

    显示如何分配存储帐户角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,搜索“存储 Blob 数据参与者”。 选择匹配的结果,然后选择 “下一步”。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索你的 Microsoft Entra 用户名(通常是你的 user@___domain 电子邮件地址),然后在对话框的底部选择“选择”。

  8. 选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。

安装 npm 包以接收事件

对于接收端,需要额外安装两个包。 在本快速入门中,你将使用 Azure Blob 存储来保留检查点,以便程序不会读取已读取的事件。 它在 Blob 中按固定的时间间隔对收到的消息执行元数据检查点。 使用此方式可以很容易地在以后的某个时间从退出的位置继续接收消息。

运行以下命令:

npm install @azure/storage-blob
npm install @azure/eventhubs-checkpointstore-blob
npm install @azure/identity

编写代码以接收事件

  1. 打开文本编辑器,如 Visual Studio Code

  2. 创建名为 receive.js的文件。 将以下代码粘贴到其中:

    在代码中,使用实际值替换以下占位符:

    • EVENT HUBS NAMESPACE NAME
    • EVENT HUB NAME
    • STORAGE ACCOUNT NAME
    • STORAGE CONTAINER NAME
    const { DefaultAzureCredential } = require("@azure/identity");
    const { EventHubConsumerClient, earliestEventPosition  } = require("@azure/event-hubs");
    const { ContainerClient } = require("@azure/storage-blob");    
    const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS NAMESPACE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    const consumerGroup = "$Default"; // name of the default consumer group
    
    // Azure Storage 
    const storageAccountName = "STORAGE ACCOUNT NAME";
    const storageContainerName = "STORAGE CONTAINER NAME";
    const baseUrl = `https://${storageAccountName}.blob.core.windows.net`;
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a blob container client and a blob checkpoint store using the client.
      const containerClient = new ContainerClient(
        `${baseUrl}/${storageContainerName}`,
        credential
      );  
      const checkpointStore = new BlobCheckpointStore(containerClient);
    
      // Create a consumer client for the event hub by specifying the checkpoint store.
      const consumerClient = new EventHubConsumerClient(consumerGroup, fullyQualifiedNamespace, eventHubName, credential, checkpointStore);
    
      // Subscribe to the events, and specify handlers for processing the events and errors.
      const subscription = consumerClient.subscribe({
          processEvents: async (events, context) => {
            if (events.length === 0) {
              console.log(`No events received within wait time. Waiting for next interval`);
              return;
            }
    
            for (const event of events) {
              console.log(`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`);
            }
            // Update the checkpoint.
            await context.updateCheckpoint(events[events.length - 1]);
          },
    
          processError: async (err, context) => {
            console.log(`Error : ${err}`);
          }
        },
        { startPosition: earliestEventPosition }
      );
    
      // After 30 seconds, stop processing.
      await new Promise((resolve) => {
        setTimeout(async () => {
          await subscription.close();
          await consumerClient.close();
          resolve();
        }, 30000);
      });
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. 若要运行此代码,请使用命令 node receive.js。 窗口显示有关收到的事件的消息。

    C:\Self Study\Event Hubs\JavaScript>node receive.js
    Received event: 'First event' from partition: '0' and consumer group: '$Default'
    Received event: 'Second event' from partition: '0' and consumer group: '$Default'
    Received event: 'Third event' from partition: '0' and consumer group: '$Default'
    

    注释

    有关完整的源代码(包括信息性注释)请参阅 receiveEventsUsingCheckpointStore.js

    接收方程序从事件中心的默认使用者组的所有分区接收事件。

清理资源

删除具有事件中心命名空间的资源组,或仅删除命名空间(如果要保留资源组)。