SQL Server 2025 (17.x) 预览版
本文介绍使用 SQL Server 2025(17.x) 预览版中引入 的更改事件流式处理功能 时从 SQL Server 流式传输到 Azure 事件中心的 CloudEvents 消息的 JSON 格式。
注释
更改事件流目前以 SQL Server 2025 预览版提供 。
概述
更改事件流发出的事件遵循 CloudEvents 规范,使其易于与事件驱动系统集成。 所有 CES CloudEvents 都包含 11 个属性(字段)。 CES 可配置为将 CloudEvents 序列化为 JSON(本机),或将 Avro 二进制文件序列化。 本文的以下部分详细介绍了消息格式,包括 CES CloudEvent 属性和序列化。
相关规范和资源
如果适用,本节中的说明取自 CloudEvent 规范,其中包括更多详细信息。
特性
specversion
:- 数据类型:字符串
- 必需的 CloudEvent 属性
- 事件使用的 CloudEvents 规范的版本。 这样就可以解释上下文。
type
- 数据类型:字符串
- 必需的 CloudEvent 属性
- 包含一个值,该值描述与发起事件相关的事件类型。 此属性通常用于路由、可观测性或策略强制实施。 此格式由生成者定义,可能包含类型版本等信息。 有关详细信息,请查看 CloudEvents 的版本控制。
source
- 数据类型:字符串
- 必需的 CloudEvent 属性
- 标识发生事件的上下文。 源 + ID 对于每个事件必须是唯一的。
id
- 数据类型:字符串
- 必需的 CloudEvent 属性
- 标识事件。 生成者必须确保源 + ID 对于每个非重复事件是唯一的。 如果重复事件重新发送(例如,由于网络错误),它可能具有相同的 ID。 使用者可能假定具有相同源和 ID 的事件是重复的。
logicalid
- 数据类型:字符串
- 扩展属性
- 拆分消息(由于事件中心 msg 大小限制)由共享逻辑 ID 标识。
time
- 数据类型:时间戳
- 可选 CloudEvent 属性
- 事件发生时间的时间戳。 如果无法确定发生时间,则 CloudEvents 生成者可能会将此属性设置为其他时间(例如当前时间)。 同一源的所有生成者必须在这方面保持一致 - 要么都使用发生的实际时间,要么都使用相同的算法来确定使用的值。
datacontenttype
- 数据类型:字符串
- 可选 CloudEvent 属性
- 数据类型值。 此属性使数据能够携带任何类型的内容,其中格式和编码可能与所选事件格式的不同。 例如,使用 JSON 信封格式呈现的事件可能会在数据中携带 XML 有效负载,使用者会通过此属性设置为“application/xml”来通知使用者。 为不同的
datacontenttype
值呈现数据内容的方式规则是在事件格式规范中定义的;例如,JSON 事件格式定义第 3.1 节中的关系。
operation
- 数据类型:字符串
- 扩展
- 表示发生的 SQL作的类型。
segmentindex
- 数据类型:整数
- 扩展属性
- 段索引,表示消息在逻辑消息区块中的位置。 段索引提供有关消息在逻辑消息片段序列中的位置的信息。 在此更改事件流式处理实现中,此字段始终存在。
finalsegment
- 数据类型:布尔值
- 扩展属性
- 指示此段是否为序列的最终段。 在此更改事件流式处理实现中,此字段始终存在。
data
- 数据类型:字符串
- 可选 CloudEvent 属性
- 特定于域的事件数据。 对于 CES,数据是可以分析为 JSON 的字符串。 此 JSON 描述数据是如何更改的。 数据属性的格式采用 数据属性格式。
例子
JSON 消息示例 - 插入
{
"specversion": "1.0",
"type": "com.microsoft.SQL.CES.DML.V1",
"source": "\/",
"id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
"logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
"time": "2025-03-14T16:45:20.650Z",
"datacontenttype": "application\/json",
"operation": "INS",
"splitindex": 0,
"splittotalcnt": 0,
"data": "{\n \"eventsource\": {\n \"db\": \"db1\",\n \"schema\": \"dbo\",\n \"tbl\": \"Purchases\",\n \"cols\": [\n {\n \"name\": \"purchase_id\",\n \"type\": \"int\",\n \"index\": 0\n },\n {\n \"name\": \"customer_name\",\n \"type\": \"varchar(100)\",\n \"index\": 1\n },\n {\n \"name\": \"product_id\",\n \"type\": \"int\",\n \"index\": 2\n },\n {\n \"name\": \"product_name\",\n \"type\": \"varchar(100)\",\n \"index\": 3\n },\n {\n \"name\": \"price_per_item\",\n \"type\": \"int\",\n \"index\": 4\n },\n {\n \"name\": \"quantity\",\n \"type\": \"int\",\n \"index\": 5\n },\n {\n \"name\": \"purchase_date\",\n \"type\": \"datetime\",\n \"index\": 6\n },\n {\n \"name\": \"payment_method\",\n \"type\": \"varchar(50)\",\n \"index\": 7\n }\n ],\n \"pkkey\": [\n {\n \"columnname\": \"purchase_id\",\n \"value\": \"105\"\n }\n ]\n },\n \"eventrow\": {\n \"old\": \"{}\",\n \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n }\n}"
}
JSON 消息示例 - 已更新
{
"specversion": "1.0",
"type": "com.microsoft.SQL.CES.DML.V1",
"source": "\/",
"id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
"logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
"time": "2025-03-14T16:49:59.567Z",
"datacontenttype": "application\/json",
"operation": "UPD",
"splitindex": 0,
"splittotalcnt": 0,
"data": "{\n \"eventsource\": {\n \"db\": \"db1\",\n \"schema\": \"dbo\",\n \"tbl\": \"Purchases\",\n \"cols\": [\n {\n \"name\": \"purchase_id\",\n \"type\": \"int\",\n \"index\": 0\n },\n {\n \"name\": \"customer_name\",\n \"type\": \"varchar(100)\",\n \"index\": 1\n },\n {\n \"name\": \"product_id\",\n \"type\": \"int\",\n \"index\": 2\n },\n {\n \"name\": \"product_name\",\n \"type\": \"varchar(100)\",\n \"index\": 3\n },\n {\n \"name\": \"price_per_item\",\n \"type\": \"int\",\n \"index\": 4\n },\n {\n \"name\": \"quantity\",\n \"type\": \"int\",\n \"index\": 5\n },\n {\n \"name\": \"purchase_date\",\n \"type\": \"datetime\",\n \"index\": 6\n },\n {\n \"name\": \"payment_method\",\n \"type\": \"varchar(50)\",\n \"index\": 7\n }\n ],\n \"pkkey\": [\n {\n \"columnname\": \"purchase_id\",\n \"value\": \"105\"\n }\n ]\n },\n \"eventrow\": {\n \"old\": \"{}\",\n \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n }\n}"
}
JSON 消息示例 - 删除
{
"specversion": "1.0",
"type": "com.microsoft.SQL.CES.DML.V1",
"source": "\/",
"id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
"logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
"time": "2025-03-14T16:51:39.613Z",
"datacontenttype": "application\/json",
"operation": "DEL",
"splitindex": 0,
"splittotalcnt": 0,
"data": "{\n \"eventsource\": {\n \"db\": \"db1\",\n \"schema\": \"dbo\",\n \"tbl\": \"Purchases\",\n \"cols\": [\n {\n \"name\": \"purchase_id\",\n \"type\": \"int\",\n \"index\": 0\n },\n {\n \"name\": \"customer_name\",\n \"type\": \"varchar(100)\",\n \"index\": 1\n },\n {\n \"name\": \"product_id\",\n \"type\": \"int\",\n \"index\": 2\n },\n {\n \"name\": \"product_name\",\n \"type\": \"varchar(100)\",\n \"index\": 3\n },\n {\n \"name\": \"price_per_item\",\n \"type\": \"int\",\n \"index\": 4\n },\n {\n \"name\": \"quantity\",\n \"type\": \"int\",\n \"index\": 5\n },\n {\n \"name\": \"purchase_date\",\n \"type\": \"datetime\",\n \"index\": 6\n },\n {\n \"name\": \"payment_method\",\n \"type\": \"varchar(50)\",\n \"index\": 7\n }\n ],\n \"pkkey\": [\n {\n \"columnname\": \"purchase_id\",\n \"value\": \"105\"\n }\n ]\n },\n \"eventrow\": {\n \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n \"current\": \"{}\"\n }\n}"
}
数据属性格式
数据是包装在字符串属性中的 JSON 对象,其中包含两个属性:
eventSource
eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"
以下各节将更详细地介绍这两个属性的详细信息:
eventsource
描述有关数据库和发生事件的表的元数据:
db
- 数据类型:字符串
- 说明:表所在的数据库的名称。
- 示例:
cessqldb001
schema
- 数据类型:字符串
- 说明:包含表的数据库架构。
- 示例:
dbo
tbl
- 数据类型:字符串
- 说明:发生事件的表。
- 示例:
Purchases
cols
- 数据类型:数组
- 说明:详细说明表中的列的数组。
- name (字符串):列的名称。
- type (string):列的数据类型(VARCHAR 或 INT)。
- index (整数):表中列的索引或位置。
pkkey
- 数据类型:数组
- 说明:表示用于标识特定行的主键列及其值。
- columnname (string):主键中使用的列的名称。
- value (string/int/etc.):主键中使用的列的值有助于唯一标识行。
eventrow
描述行级更改,并比较记录中字段的旧值和当前值。
-
old (对象包装在字符串中):表示事件前行中的值。
- 每个键值对包括:
-
<column_name>
:(字符串):列的名称。 -
<column_value>
:(string/int/etc.):该列的上一个值。
-
- 每个键值对包括:
-
current (对象包装在字符串中):表示事件后行中的更新值。
- 与旧对象类似,每个键值对的结构如下:
-
<column_name>
(字符串):列的名称。 -
<column_value>
(string/int/etc.):该列的新值或当前值。
-
- 与旧对象类似,每个键值对的结构如下:
CES CloudEvent JSON 架构
{
"type": "record",
"name": "ChangeEvent",
"fields": [
{
"name": "specversion",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "source",
"type": "string"
},
{
"name": "id",
"type": "string"
},
{
"name": "logicalid",
"type": "string"
},
{
"name": "time",
"type": "string"
},
{
"name": "datacontenttype",
"type": "string"
},
{
"name": "operation",
"type": "string"
},
{
"name": "segmentindex",
"type": "int"
},
{
"name": "finalsegment",
"type": "boolean"
},
{
"name": "data",
"type": "bytes"
}
]
}
CES 数据属性 JSON 架构
{
"name": "Data",
"type": "record",
"fields": [
{
"name": "eventsource",
"type": {
"name": "EventSource",
"type": "record",
"fields": [
{
"name": "db",
"type": "string"
},
{
"name": "schema",
"type": "string"
},
{
"name": "tbl",
"type": "string"
},
{
"name": "cols",
"type": {
"type": "array",
"items": {
"name": "Column",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "index",
"type": "int"
}
]
}
}
},
{
"name": "pkkey",
"type": {
"type": "array",
"items": {
"name": "PkKey",
"type": "record",
"fields": [
{
"name": "columnname",
"type": "string"
},
{
"name": "value",
"type": "string"
}
]
}
}
},
{
"name": "transaction",
"type": {
"name": "Transaction",
"type": "record",
"fields": [
{
"name": "commitlsn",
"type": "string"
},
{
"name": "beginlsn",
"type": "string"
},
{
"name": "sequencenumber",
"type": "int"
},
{
"name": "committime",
"type": "string"
}
]
}
}
]
}
},
{
"name": "eventrow",
"type": {
"name": "EventRow",
"type": "record",
"fields": [
{
"name": "old",
"type": "string"
},
{
"name": "current",
"type": "string"
}
]
}
}
]
}