Real-Time Intelligence 教程中的数字孪生生成器(预览版)第 2 部分:获取和处理流数据

在本教程的这一部分,你将设置另一种类型的示例数据:示例总线数据的实时数据流,其中包含有关总线位置的时序信息。 将示例数据流式传输到 EventHouse,对数据进行一些转换,然后创建一个快捷方式,以便将 EventHouse 中的数据转入在上一部分创建的示例数据湖屋中。 数字孪生生成器需要数据位于湖仓(Lakehouse)中。

创建活动屋

  1. 浏览到要在其中创建教程资源的工作区。 必须在同一工作区中创建所有资源。
  2. 选择 + 新建项目
  3. “按项类型筛选 ”搜索框中,输入 Eventhouse
  4. 选择 Eventhouse 项目。
  5. 输入 Tutorial 作为 eventhouse 名称。 同时创建了一个具有相同名称的 KQL 数据库。
  6. 选择 创建。 预配完成后,将显示 eventhouse System 概述 页。

创建事件流

在本部分中,将创建一个事件流,用于将示例总线流数据发送到 eventhouse。

添加源

按照以下步骤创建 eventstream,并将 Buses 示例数据添加为源。

  1. 在 eventhouse 中的 KQL 数据库 窗格中,选择新的 教程 数据库。

  2. 在菜单功能区中,选择“ 获取数据 ”,然后选择“ 事件流 > 新建事件流”。

    获取 Tutorial 数据库的新事件流的屏幕截图。

  3. 请将您的事件流命名为 BusEventstream。 创建完事件流后,会打开它。

  4. 选择使用示例数据

    为事件流选择示例数据的屏幕截图。

  5. “添加源 ”页中,输入源名称的 BusDataSource 。 在 “示例数据”下,选择 “公交车”。 选择 并添加

    选择总线示例数据的屏幕截图。

    新事件流(eventstream)准备就绪后,它会在编辑画布中打开。

转换数据

在本部分中,你将向传入的示例数据添加一个转换。 此步骤将字符串字段 ScheduleTimeTimestamp 强制转换为 DateTime 类型,并将 Timestamp 重命名为 ArrivalTime 以使更加清晰。 时间戳字段需要转化为 DateTime 格式,以便数字孪生构建器(预览版)可以将它们用作时序数据。

请按照以下步骤来添加数据转换。

  1. 选择 “转换事件或添加目标” 磁贴上的向下箭头,然后选择 “管理字段” 预定义操作。 磁贴重命名为 ManageFields

    选择“管理字段”操作的屏幕截图。

  2. MangeFields 磁贴上选择编辑图标(形状为铅笔),这将打开 “管理字段 ”窗格。

  3. 选择 “添加所有字段”。 该操作可确保源数据中的所有字段在转换过程中存在。

  4. 选择 “时间戳 ”字段。 将 “更改类型” 切换为 “是”。 对于 转换类型,请从下拉列表中选择 DateTime 。 对于 Name,请输入 ActualTime 的新名称。

    更改“时间戳”字段的屏幕截图。

  5. 如果不保存,请选择 “ScheduleTime ”字段。 将 “更改类型” 切换为 “是”。 对于 转换类型,请从下拉列表中选择 DateTime 。 将名称保留为 ScheduleTime

    现在,选择“ 保存”。

  6. 管理字段 ”窗格将关闭。 ManageFields 磁贴在连接到目标之前将继续显示错误。

添加目标

  1. 在菜单功能区中,依次选择“ 添加目标”、“ Eventhouse”。

    选择 Eventhouse 目的地的屏幕截图。

  2. 在“Eventhouse”窗格中输入以下信息

    领域 价值
    数据引入模式 引入前的事件处理
    目标名称 TutorialDestination
    工作空间 选择在其中创建了资源的工作区。
    Eventhouse 教程
    KQL 数据库 教程
    KQL 目标表 新建 - 输入 bus_data_raw 作为表名称
    输入数据格式 Json
  3. 确保已选中 添加数据源后启用导入 框。

  4. 选择“保存”

  5. 在创作画布中,选择 ManageFields 磁贴,然后将箭头拖动到 TutorialDestination 磁贴以连接它们。 此操作解决了流程中的所有错误信息。

  6. 在菜单功能区中,选择“发布”。 事件流现在开始将示例流数据发送到事件库。

  7. 几分钟后,事件流视图中的 TutorialDestination 卡片在 “数据预览 ”选项卡中显示示例数据。等待数据到达时,可能需要多次刷新预览版。

    预览数据的屏幕截图。

  8. 验证数据表是否在您的 eventhouse 中处于活动状态。 转到 "教程" KQL 数据库并刷新视图。 它现在包含一个名为 bus_data_raw 的表,其中包含数据。

    包含数据的bus_data_raw表的屏幕截图。

使用更新策略转换数据

现在,总线流数据位于 KQL 数据库中,可以使用函数和 Kusto 更新策略 进一步转换数据。 在本部分中执行的转换是为数字孪生生成器(预览版)准备数据,包括以下操作:

  • 将 JSON 字段 Properties 拆分为每个包含的数据项的单独列, BusStatus 以及 TimeToNextStation。 数字孪生生成器没有 JSON 分析功能,因此需要在数据进入数字孪生生成器之前分隔这些值。
  • 添加列 StopCode,这是表示每个公交车站的唯一键。 此步骤的目的是完成示例数据集以支持本教程方案。 来自不同数据源的可联接实体必须包含数字孪生生成器可用于将它们链接在一起的公共列。因此,此步骤会添加一组模拟整型值,与静态公交车站数据集中的字段Stop_Code相匹配。 在现实世界中,相关的数据集已包含某种共同点。
  • 创建一个名为 bus_data_processed 的新表,其中包含转换后的总线数据。
  • 为新表启用 OneLake 可用性,以便可以使用快捷方式访问 Tutorial lakehouse 中的数据。

若要运行转换查询,请执行以下步骤。

  1. 选择 Eventhouse 中的 Tutorial KQL 数据库。 在菜单功能区中,选择 包含代码的查询,这将打开 KQL 查询编辑器。

    数据库上带有代码按钮的查询的屏幕截图。

  2. 将以下代码复制并粘贴到查询编辑器中。 按顺序运行每个代码块。

    // Set columns
    .create-or-alter function extractBusData ()
    {
        bus_data_raw
        | extend BusState = tostring(todynamic(Properties).BusState)
            , TimeToNextStation = tostring(todynamic(Properties).TimeToNextStation)
            , StopCode = toint(10000 + abs(((toint(BusLine) * 100) + toint(StationNumber)) % 750))
        | project-away Properties
    }
    
    // Create table
    .create table bus_data_processed (ActualTime:datetime, TripId:string, BusLine:string, StationNumber:string, ScheduleTime:datetime, BusState:string, TimeToNextStation:string, StopCode:int)
    
    //Load data into table
    .alter table bus_data_processed policy update
    ```
    [{
        "IsEnabled": true,
        "Source": "bus_data_raw",
        "Query": "extractBusData",
        "IsTransactional": false,
        "PropagateIngestionProperties": true
    }]
    ```
    
    // Enable OneLake availability
    .alter-merge table bus_data_processed policy mirroring dataformat=parquet with (IsEnabled=true, TargetLatencyInMinutes=5)
    

    小窍门

    还可以通过 UI 为新表启用 OneLake 可用性,而不是使用代码。 选择表并切换 OneLake 可用性

    在 UI 中启用 OneLake 可用性的屏幕截图。

    使用 UI 选项时,默认延迟为 15 分钟到几小时,具体取决于数据量。 若要将延迟降低到 5 分钟,请使用 .alter-merge 表 命令,如前面的代码块所示。

  3. (可选)将查询选项卡保存为 总线数据处理 ,以便稍后可以识别它。

  4. 在名为 bus_data_processed 的数据库中创建了一个新表。 等待一段时间后,它开始填充已处理的总线数据。

    bus_data_processed表的屏幕截图,表中包含数据。

创建湖仓快捷方式

最后,创建一个快捷方式,暴露 Tutorial lakehouse 中处理过的总线数据,该 lakehouse 包含供数字孪生生成器(预览版)使用的示例数据。 此步骤是必要的,因为数字孪生生成器要求其数据源为 Lakehouse。

  1. 转到你的 Tutorial lakehouse(你之前在第一部分中创建的,上传上下文数据)。 在菜单功能区中,选择“ 获取数据>新建”快捷方式

    “新建快捷方式”按钮的屏幕截图。

  2. 在“内部源”下,选择“Microsoft OneLake”。 然后,选择 Tutorial KQL 数据库。

  3. 展开 列表并选中 bus_data_processed旁边的框。 选择“下一步”。

  4. 查看快捷方式详细信息,然后选择“ 创建”。

    创建快捷方式的屏幕截图。

  5. bus_data_processed表现已在 lakehouse 中提供。 验证它是否包含数据(这可能需要几分钟时间)。

    lakehouse 中 bus_data_processed 的屏幕截图。

接下来,将此 Lakehouse 数据用作源,在数字孪生生成器中构建本体。

后续步骤