JSON 消息格式 - 更改事件流式处理

SQL Server 2025 (17.x) 预览版

本文介绍使用 SQL Server 2025(17.x) 预览版中引入 的更改事件流式处理功能 时从 SQL Server 流式传输到 Azure 事件中心的 CloudEvents 消息的 JSON 格式。

注释

更改事件流目前以 SQL Server 2025 预览版提供

概述

更改事件流发出的事件遵循 CloudEvents 规范,使其易于与事件驱动系统集成。 所有 CES CloudEvents 都包含 11 个属性(字段)。 CES 可配置为将 CloudEvents 序列化为 JSON(本机),或将 Avro 二进制文件序列化。 本文的以下部分详细介绍了消息格式,包括 CES CloudEvent 属性和序列化。

如果适用,本节中的说明取自 CloudEvent 规范,其中包括更多详细信息。

特性

  • specversion:

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 事件使用的 CloudEvents 规范的版本。 这样就可以解释上下文。
  • type

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 包含一个值,该值描述与发起事件相关的事件类型。 此属性通常用于路由、可观测性或策略强制实施。 此格式由生成者定义,可能包含类型版本等信息。 有关详细信息,请查看 CloudEvents 的版本控制
  • source

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 标识发生事件的上下文。 源 + ID 对于每个事件必须是唯一的。
  • id

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 标识事件。 生成者必须确保源 + ID 对于每个非重复事件是唯一的。 如果重复事件重新发送(例如,由于网络错误),它可能具有相同的 ID。 使用者可能假定具有相同源和 ID 的事件是重复的。
  • logicalid

    • 数据类型:字符串
    • 扩展属性
    • 拆分消息(由于事件中心 msg 大小限制)由共享逻辑 ID 标识。
  • time

    • 数据类型:时间戳
    • 可选 CloudEvent 属性
    • 事件发生时间的时间戳。 如果无法确定发生时间,则 CloudEvents 生成者可能会将此属性设置为其他时间(例如当前时间)。 同一源的所有生成者必须在这方面保持一致 - 要么都使用发生的实际时间,要么都使用相同的算法来确定使用的值。
  • datacontenttype

    • 数据类型:字符串
    • 可选 CloudEvent 属性
    • 数据类型值。 此属性使数据能够携带任何类型的内容,其中格式和编码可能与所选事件格式的不同。 例如,使用 JSON 信封格式呈现的事件可能会在数据中携带 XML 有效负载,使用者会通过此属性设置为“application/xml”来通知使用者。 为不同的 datacontenttype 值呈现数据内容的方式规则是在事件格式规范中定义的;例如,JSON 事件格式定义第 3.1 节中的关系。
  • operation

    • 数据类型:字符串
    • 扩展
    • 表示发生的 SQL作的类型。
  • segmentindex

    • 数据类型:整数
    • 扩展属性
    • 段索引,表示消息在逻辑消息区块中的位置。 段索引提供有关消息在逻辑消息片段序列中的位置的信息。 在此更改事件流式处理实现中,此字段始终存在。
  • finalsegment

    • 数据类型:布尔值
    • 扩展属性
    • 指示此段是否为序列的最终段。 在此更改事件流式处理实现中,此字段始终存在。
  • data

    • 数据类型:字符串
    • 可选 CloudEvent 属性
    • 特定于域的事件数据。 对于 CES,数据是可以分析为 JSON 的字符串。 此 JSON 描述数据是如何更改的。 数据属性的格式采用 数据属性格式

例子

JSON 消息示例 - 插入

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
  "time": "2025-03-14T16:45:20.650Z",
  "datacontenttype": "application\/json",
  "operation": "INS",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON 消息示例 - 已更新

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
  "time": "2025-03-14T16:49:59.567Z",
  "datacontenttype": "application\/json",
  "operation": "UPD",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON 消息示例 - 删除

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
  "time": "2025-03-14T16:51:39.613Z",
  "datacontenttype": "application\/json",
  "operation": "DEL",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n    \"current\": \"{}\"\n  }\n}"
}

数据属性格式

数据是包装在字符串属性中的 JSON 对象,其中包含两个属性:

  • eventSource
  • eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"

以下各节将更详细地介绍这两个属性的详细信息:

eventsource

描述有关数据库和发生事件的表的元数据:

  • db

    • 数据类型:字符串
    • 说明:表所在的数据库的名称。
    • 示例:cessqldb001
  • schema

    • 数据类型:字符串
    • 说明:包含表的数据库架构。
    • 示例:dbo
  • tbl

    • 数据类型:字符串
    • 说明:发生事件的表。
    • 示例:Purchases
  • cols

    • 数据类型:数组
    • 说明:详细说明表中的列的数组。
      • name (字符串):列的名称。
      • type (string):列的数据类型(VARCHARINT)。
      • index (整数):表中列的索引或位置。
  • pkkey

    • 数据类型:数组
    • 说明:表示用于标识特定行的主键列及其值。
      • columnname (string):主键中使用的列的名称。
      • value (string/int/etc.):主键中使用的列的值有助于唯一标识行。

eventrow

描述行级更改,并比较记录中字段的旧值和当前值。

  • old (对象包装在字符串中):表示事件前行中的值。
    • 每个键值对包括:
      • <column_name>:(字符串):列的名称。
      • <column_value>:(string/int/etc.):该列的上一个值。
  • current (对象包装在字符串中):表示事件后行中的更新值。
    • 与旧对象类似,每个键值对的结构如下:
      • <column_name> (字符串):列的名称。
      • <column_value> (string/int/etc.):该列的新值或当前值。

CES CloudEvent JSON 架构

{
  "type": "record",
  "name": "ChangeEvent",
  "fields": [
    {
      "name": "specversion",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "logicalid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "datacontenttype",
      "type": "string"
    },
    {
      "name": "operation",
      "type": "string"
    },
    {
      "name": "segmentindex",
      "type": "int"
    },
    {
      "name": "finalsegment",
      "type": "boolean"
    },
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}

CES 数据属性 JSON 架构

{
  "name": "Data",
  "type": "record",
  "fields": [
    {
      "name": "eventsource",
      "type": {
        "name": "EventSource",
        "type": "record",
        "fields": [
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "tbl",
            "type": "string"
          },
          {
            "name": "cols",
            "type": {
              "type": "array",
              "items": {
                "name": "Column",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "index",
                    "type": "int"
                  }
                ]
              }
            }
          },
          {
            "name": "pkkey",
            "type": {
              "type": "array",
              "items": {
                "name": "PkKey",
                "type": "record",
                "fields": [
                  {
                    "name": "columnname",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "string"
                  }
                ]
              }
            }
          },
          {
            "name": "transaction",
            "type": {
              "name": "Transaction",
              "type": "record",
              "fields": [
                {
                  "name": "commitlsn",
                  "type": "string"
                },
                {
                  "name": "beginlsn",
                  "type": "string"
                },
                {
                  "name": "sequencenumber",
                  "type": "int"
                },
                {
                  "name": "committime",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "eventrow",
      "type": {
        "name": "EventRow",
        "type": "record",
        "fields": [
          {
            "name": "old",
            "type": "string"
          },
          {
            "name": "current",
            "type": "string"
          }
        ]
      }
    }
  ]
}