Azure 数据资源管理器支持使用更改源从 Azure Cosmos DB for NoSql引入数据。 Cosmos DB 更改源数据连接是一个引入管道,用于侦听 Cosmos DB 更改源并将数据引入到数据资源管理器表中。 更改源侦听新文档和更新的文档,但不记录删除。 有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述。
每个数据连接都侦听特定的 Cosmos DB 容器,并将数据引入到指定的表中(多个连接可以在一个表中引入数据)。 该引入方法支持流式引入(如果已启用)和排队引入。
使用 Cosmos DB 更改源数据连接的两个主要方案:
本文介绍如何设置 Cosmos DB 更改源数据连接,以便使用系统托管标识将数据引入到 Azure 数据资源管理器中。 在开始之前,请查看注意事项。
使用以下步骤设置连接器:
步骤 1:选择 Azure 数据资源管理器表并配置其表映射
步骤 2:创建 Cosmos DB 数据连接
步骤 3:测试数据连接
先决条件
在创建数据连接之前,请创建一个表,你将在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。
下面显示了 Cosmos DB 容器中某个项的示例架构:
{
"id": "17313a67-362b-494f-b948-e2a8e95e237e",
"name": "Cousteau",
"_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
"_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
"_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
"_attachments": "attachments/",
"_ts": 1651131409
}
使用以下步骤创建表并应用表映射:
在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择要在其中创建表的数据库。
运行以下命令来创建一个名为 TestTable 的表。
.create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
运行以下命令来创建表映射。
该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:
Cosmos DB 属性 |
表列 |
转换 |
id |
ID |
无 |
name |
名称 |
无 |
_ts |
_ts |
无 |
_ts |
_timestamp |
使用 DateTimeFromUnixSeconds 可将 _ts(UNIX 秒)转换为 _timestamp (datetime ) |
注意
建议使用以下时间戳列:
- _ts:使用此列将数据与 Cosmos DB 协调。
- _timestamp:使用此列在 Kusto 查询中运行高效的时间筛选器。 有关详细信息,请参阅查询最佳做法。
.create table TestTable ingestion json mapping "DocumentMapping"
```
[
{"column":"Id","path":"$.id"},
{"column":"Name","path":"$.name"},
{"column":"_ts","path":"$._ts"},
{"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
]
```
如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。
更新策略是在数据引入到表时转换数据的一种方式。 它们以 Kusto 查询语言编写,在引入管道上运行。 它们可用于转换从 Cosmos DB 更改源中引入的数据,例如在以下场景中:
- 你的文档包含数组,如果使用
mv-expand
运算符将它们转换为多行,则这些数组将更易于查询。
- 你想要筛选出文档。 例如,可以使用
where
运算符按类型筛选出文档。
- 你的复杂逻辑无法在表映射中表示。
有关如何创建和管理更新策略的信息,请参阅更新策略概述。
步骤 2:创建 Cosmos DB 数据连接
可使用以下方法创建数据连接器:
在Azure 门户中,转到群集概述页,然后选择“入门”选项卡。
在“数据引入”磁贴上,选择“创建数据连接”“Cosmos DB”。
在 Cosmos DB 的“创建数据连接”窗格中,使用下表中的信息填写表单:
字段 |
说明 |
数据库名称 |
选择要将数据引入到的 Azure 数据资源管理器数据库。 |
数据连接名称 |
指定数据连接的名称。 |
订阅 |
选择包含 Cosmos DB NoSQL 帐户的订阅。 |
Cosmos DB 帐户 |
选择要从中引入数据的 Cosmos DB 帐户。 |
SQL 数据库 |
选择要从中引入数据的 Cosmos DB 数据库。 |
SQL 容器 |
选择要从中引入数据的 Cosmos DB 容器。 |
表名称 |
指定要将数据引入到的 Azure 数据资源管理器表名称。 |
映射名称 |
(可选)指定要用于数据连接的映射名称。 |
(可选)在“高级设置”部分下执行以下操作:
指定“事件检索开始日期”。 这是连接器开始引入数据的时间。 如果未指定时间,连接器将从你创建数据连接的时间开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ
。
选择“用户分配”,然后选择标识。 默认情况下,连接使用系统分配的托管标识。 如有必要,可以使用用户分配的标识。
选择“创建”以创建数据连接。
使用以下示例 ARM 模板作为创建自己的数据连接模板的基础,然后在 Azure 门户中部署它。
若要配置 Cosmos DB 连接,请执行以下操作:
配置系统托管标识以用于 Cosmos DB 连接身份验证。
- 在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择用于数据连接的群集或数据库。
授予数据连接访问 Cosmos DB 帐户的权限。 向数据连接提供对 Cosmos DB 的访问权限将使其能够访问并检索数据库中的数据。 你将需要群集的主体 ID,该 ID 可在 Azure 门户中找到。 有关详细信息,请参阅为群集配置托管标识。
使用以下选项之一授予对 Cosmos DB 帐户的访问权限:
使用 Azure CLI 授予访问权限:运行以下 CLI 命令(请使用下表中的信息将占位符替换为适当的值):
az cosmosdb sql role assignment create --account-name <CosmosDbAccountName> --resource-group <CosmosDbResourceGroup> --role-definition-id 00000000-0000-0000-0000-000000000001 --principal-id <ClusterPrincipalId> --scope "/"
az role assignment create --role fbdf93bf-df7d-467e-a4d2-9458aa1360c8 --assignee <ClusterPrincipalId> --scope <CosmosDBAccountResourceId>
占位符 |
说明 |
<CosmosDBAccountName> |
Cosmos DB 帐户的名称。 |
<CosmosDBResourceGroup> |
包含 Cosmos DB 帐户的资源组的名称。 |
<CosmosDBAccountResourceId> |
Cosmos DB 帐户的 Azure 资源 ID(以 subscriptions/ 开头)。 |
<ClusterPrincipalId> |
分配给群集的托管标识的主体 ID。 可在 Azure 门户中找到群集的主体 ID。 有关详细信息,请参阅为群集配置托管标识。 |
使用 ARM 模板授予访问权限:在 Cosmos DB 帐户资源组中部署以下模板:
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"clusterPrincipalId": {
"type": "string",
"metadata": { "description": "The principle ID of your cluster." }
},
"cosmosDbAccount": {
"type": "string",
"metadata": { "description": "The name of your Cosmos DB account." }
},
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "The resource ID of your Cosmos DB account." }
}
},
"variables": {
"cosmosDataReader": "00000000-0000-0000-0000-000000000001",
"dataRoleDefinitionId": "[format('/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DocumentDB/databaseAccounts/{2}/sqlRoleDefinitions/{3}', subscription().subscriptionId, resourceGroup().name, parameters('cosmosDbAccount'), variables('cosmosDataReader'))]",
"roleAssignmentId": "[guid(parameters('cosmosDbAccountResourceId'), parameters('clusterPrincipalId'))]",
"rbacRoleDefinitionId": "[format('/subscriptions/{0}/providers/Microsoft.Authorization/roleDefinitions/{1}', subscription().subscriptionId, 'fbdf93bf-df7d-467e-a4d2-9458aa1360c8')]"
},
"resources": [
{
"type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleAssignments",
"apiVersion": "2022-08-15",
"name": "[concat(parameters('cosmosDbAccount'), '/', guid(parameters('clusterPrincipalId'), parameters('cosmosDbAccount')))]",
"properties": {
"principalId": "[parameters('clusterPrincipalId')]",
"roleDefinitionId": "[variables('dataRoleDefinitionId')]",
"scope": "[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('cosmosDbAccount'))]"
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2022-04-01",
"name": "[variables('roleAssignmentId')]",
"scope": "[format('Microsoft.DocumentDb/databaseAccounts/{0}', parameters('cosmosDbAccount'))]",
"properties": {
"description": "Giving RBAC reader on Cosmos DB",
"principalId": "[parameters('clusterPrincipalId')]",
"principalType": "ServicePrincipal",
"roleDefinitionId": "[variables('rbacRoleDefinitionId')]"
}
}
]
}
部署以下 ARM 模板以创建 Cosmos DB 数据连接。 将占位符替换为适当的值。
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"kustoClusterName": {
"type": "string",
"metadata": { "description": "Kusto Cluster name" }
},
"kustoDbName": {
"type": "string",
"metadata": { "description": "Kusto Database name" }
},
"kustoConnectionName": {
"type": "string",
"metadata": { "description": "Kusto Database connection name" }
},
"kustoLocation": {
"type": "string",
"metadata": { "description": "Location (Azure Region) of the Kusto cluster" }
},
"kustoTable": {
"type": "string",
"metadata": { "description": "Kusto Table name where to ingest data" }
},
"kustoMappingRuleName": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Mapping name of the Kusto Table (if omitted, default mapping is applied)" }
},
"managedIdentityResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of the managed identity (cluster resource ID for system or user identity)" }
},
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of Cosoms DB account" }
},
"cosmosDbDatabase": {
"type": "string",
"metadata": { "description": "Cosmos DB Database name" }
},
"cosmosDbContainer": {
"type": "string",
"metadata": { "description": "Cosmos DB container name" }
},
"retrievalStartDate": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Date-time at which to start the data retrieval; default: 'now' if not provided. Recommended format: yyyy-MM-ddTHH:mm:ss.fffffffZ" }
}
},
"variables": { },
"resources": [{
"type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
"apiVersion": "2022-11-11",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDbName'), '/', parameters('kustoConnectionName'))]",
"___location": "[parameters('kustoLocation')]",
"kind": "CosmosDb",
"properties": {
"tableName": "[parameters('kustoTable')]",
"mappingRuleName": "[parameters('kustoMappingRuleName')]",
"managedIdentityResourceId": "[parameters('managedIdentityResourceId')]",
"cosmosDbAccountResourceId": "[parameters('cosmosDbAccountResourceId')]",
"cosmosDbDatabase": "[parameters('cosmosDbDatabase')]",
"cosmosDbContainer": "[parameters('cosmosDbContainer')]",
"retrievalStartDate": "[parameters('retrievalStartDate')]"
}
}]
}
步骤 3:测试数据连接
在 Cosmos DB 容器中,插入以下文档:
{
"name":"Cousteau"
}
在 Azure 数据资源管理器 Web UI 中,运行以下查询:
TestTable
结果集应如下图所示:
注意
Azure 数据资源管理器具有用于排队数据引入的聚合(批处理)策略,旨在优化引入过程。 默认批处理策略配置为在批满足以下条件之一时封装该批:最大延迟时间为 5 分钟、总大小为 1 GB 或 1000 个 blob。 因此,你可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流式处理策略。
注意事项
以下注意事项适用于 Cosmos DB 更改源:
更改源不会公开“删除”事件。
Cosmos DB 更改源仅包括新文档和更新的文档。 如果需要了解有关已删除的文档的信息,可以配置源以使用软标记将 Cosmos DB 文档标记为已删除。 这会添加一个属性来更新用于指示文档是否已被删除的事件。 然后,可以在查询中使用 where
运算符来筛选出它们。
例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:
TestTable
| where not(IsDeleted)
更改源仅公开文档的最新更新。
若要了解第二个注意事项的影响,请查看以下方案:
Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:
文档 ID |
属性 foo |
事件 |
文档时间戳 (_ts) |
A |
红色 |
创建 |
10 |
B |
蓝色 |
创建 |
20 |
A |
橙色 |
更新 |
30 |
A |
粉色 |
更新 |
40 |
B |
紫罗兰色 |
更新 |
50 |
A |
胭脂红色 |
更新 |
50 |
B |
霓虹蓝色 |
更新 |
70 |
数据连接器定期轮询(通常每隔几秒钟轮询一次)更改源 API。 每个轮询都包含两次调用之间容器中发生的更改(但每个文档只有最新版的更改)。
为了说明此问题,请考虑带有时间戳 15、35、55 和 75 的一系列 API 调用,如下表所示:
API 调用时间戳 |
文档 ID |
属性 foo |
文档时间戳 (_ts) |
15 |
A |
红色 |
10 |
35 |
B |
蓝色 |
20 |
35 |
A |
橙色 |
30 |
55 |
B |
紫罗兰色 |
50 |
55 |
A |
胭脂红色 |
60 |
75 |
B |
霓虹蓝色 |
70 |
将 API 结果与 Cosmos DB 文档中所做的一系列更改进行比较时,你会发现它们不匹配。 文档 A 的更新事件(已在更改表中的时间戳 40 所在一行突出显示)未显示在 API 调用结果中。
为了了解未显示该事件的原因,我们将检查 API 调用在时间戳 35 和 55 之间对文档 A 所做的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:
文档 ID |
属性 foo |
事件 |
文档时间戳 (_ts) |
A |
粉色 |
更新 |
40 |
A |
胭脂红色 |
更新 |
50 |
在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在本例中,最新版本的文档 A 是在时间戳 50 时的更新,即属性 foo 从“粉红色”到“胭脂红色”的更新。
由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改频率高于 API 轮询频率,则可能会错过某些事件。 但是,每个文档的最新状态均会被捕获。
不支持删除并重新创建 Cosmos DB 容器
Azure 数据资源管理器跟踪更改源的方式是检查它在源中的“位置”。 这是通过在容器的每个物理分区上使用延续令牌来完成的。 删除/重新创建容器时,延续令牌无效且不会重置。 在这种情况下,必须删除并重新创建数据连接。
估算成本
使用 Cosmos DB 数据连接对 Cosmos DB 容器的请求单位 (RU) 使用量影响有多大?
连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关:
成本 |
说明 |
固定成本 |
固定成本大约为每秒每个物理分区 2 RU。 |
可变成本 |
可变成本大约为用于写入文档的 RU 的 2%,不过,具体比例因场景而异。 例如,如果将 100 个文档写入 Cosmos DB 容器,则写入这些文档的成本为 1,000 RU。 使用连接器读取这些文档的相应成本大约为写入文档的成本的 2%,即大约为 20 RU。 |
相关内容