次の方法で共有


JSON メッセージ形式 - イベント ストリーミングの変更

SQL Server 2025 (17.x) プレビュー

この記事では、SQL Server 2025 (17.x) プレビューで導入された 変更イベント ストリーミング (CES) 機能を使用するときに、SQL Server から Azure Event Hubs にストリーミングされる CloudEvents メッセージの JSON 形式について説明します。

変更イベント ストリーミングは現在、SQL Server 2025 の プレビュー段階 です。

概要

変更イベント ストリーミングによって生成されるイベントは CloudEvents 仕様に従い、イベント ドリブン システムと簡単に統合できます。 すべての CES CloudEvent には、11 個の属性 (フィールド) が含まれています。 CES は、CloudEvents を JSON (ネイティブ) または Avro バイナリとしてシリアル化するように構成できます。 この記事の以降のセクションでは、CES CloudEvent 属性やシリアル化など、メッセージ形式について詳しく説明します。

該当する場合、このセクションの説明は、追加の詳細を含む CloudEvent 仕様から取得されます。

属性

  • specversion:

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントが使用する CloudEvents 仕様のバージョン。 これにより、コンテキストの解釈が可能になります。
  • type

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • 発生元の発生に関連するイベントの種類を表す値を格納します。 多くの場合、この属性はルーティング、可観測性、またはポリシーの適用に使用されます。 この形式はプロデューサーによって定義され、型のバージョンなどの情報が含まれる場合があります。 詳細については、「 CloudEvents のバージョン管理」を参照してください。
  • source

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントが発生したコンテキストを識別します。 ソース + ID は、イベントごとに一意である必要があります。
  • id

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントを識別します。 プロデューサーは、個別のイベントごとにソース + ID が一意であることを確認する必要があります。 重複するイベントが再送信された場合 (たとえば、ネットワーク エラーが原因)、同じ ID を持つことができます。 コンシューマーは、ソースと ID が同一のイベントが重複していると見なす場合があります。
  • logicalid

    • データ型: 文字列
    • 拡張属性
    • 分割メッセージ (Event Hubs メッセージ のサイズ制限のため) は、共有論理 ID によって識別されます。
  • time

    • データ型: タイムスタンプ
    • オプションの CloudEvent 属性
    • 出来事が発生した日時のタイムスタンプ。 発生時刻を特定できない場合、この属性は CloudEvents プロデューサーによって他の時刻 (現在の時刻など) に設定される可能性があります。 同じソースのすべてのプロデューサーは、この点で一貫している必要があります。すべて実際の発生時刻を使用するか、すべて同じアルゴリズムを使用して使用される値を決定します。
  • datacontenttype

    • データ型: 文字列
    • オプションの CloudEvent 属性
    • データ値のコンテンツ タイプ。 この属性を使用すると、データは任意の種類のコンテンツを保持できます。形式とエンコードは、選択したイベント形式とは異なる場合があります。 たとえば、JSON エンベロープ形式を使用してレンダリングされたイベントでは、データに XML ペイロードが含まれる可能性があり、コンシューマーには、この属性が "application/xml" に設定されたことが通知されます。 さまざまな datacontenttype 値に対するデータ コンテンツのレンダリング方法に関する規則は、イベント形式の仕様で定義されます。たとえば、JSON イベント形式では、セクション 3.1 のリレーションシップが定義されます。
  • operation

    • データ型: 文字列
    • 拡張
    • 発生した SQL 操作の種類を表します。
  • segmentindex

    • データ型: 整数
    • 拡張属性
    • セグメント インデックス。論理メッセージ チャンク内のメッセージの位置を示します。 セグメント インデックスは、メッセージが論理メッセージ フラグメントのシーケンス内のどこに存在するかについての情報を提供します。 この変更イベント ストリーミングの実装では、このフィールドは常に存在します。
  • finalsegment

    • データ型: Boolean
    • 拡張属性
    • このセグメントがシーケンスの最後のセグメントであるかどうかを示します。 この変更イベント ストリーミングの実装では、このフィールドは常に存在します。
  • data

    • データ型: 文字列
    • オプションの CloudEvent 属性
    • ドメイン固有のイベント データ。 CES の場合、データは JSON として解析できる文字列です。 この JSON では、データがどのように変更されたかについて説明します。 データ属性の形式はデータ 属性の形式です

例示

JSON メッセージの例 - 挿入

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
  "time": "2025-03-14T16:45:20.650Z",
  "datacontenttype": "application\/json",
  "operation": "INS",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON メッセージの例 – 更新

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
  "time": "2025-03-14T16:49:59.567Z",
  "datacontenttype": "application\/json",
  "operation": "UPD",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON メッセージの例 - 削除

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
  "time": "2025-03-14T16:51:39.613Z",
  "datacontenttype": "application\/json",
  "operation": "DEL",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n    \"current\": \"{}\"\n  }\n}"
}

データ属性の形式

データは、2 つの属性を含む文字列属性でラップされた JSON オブジェクトです。

  • eventSource
  • eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"

これら 2 つの属性の詳細については、次のセクションで詳しく説明します。

eventsource

イベントが発生したデータベースとテーブルに関するメタデータについて説明します。

  • db

    • データ型: 文字列
    • 説明: テーブルが配置されているデータベースの名前。
    • 例: cessqldb001
  • schema

    • データ型: 文字列
    • 説明: テーブルを含むデータベース スキーマ。
    • 例: dbo
  • tbl

    • データ型: 文字列
    • 説明: イベントが発生したテーブル。
    • 例: Purchases
  • cols

    • データ型: 配列
    • 説明: テーブル内の列の詳細を示す配列。
      • name (string): 列の名前。
      • type (string): 列のデータ型 (VARCHAR または INT)。
      • index (整数): テーブル内の列のインデックスまたは位置。
  • pkkey

    • データ型: 配列
    • 説明: 特定の行を識別するための主キー列とその値を表します。
      • columnname (string): 主キーで使用される列の名前。
      • value (string/int/etc): 主キーで使用される列の値は、行を一意に識別するのに役立ちます。

eventrow

行レベルの変更について説明し、レコード内のフィールドの古い値と現在の値を比較します。

  • old (文字列でラップされたオブジェクト): イベントの前の行の値を表します。
    • 各キーと値のペアは、次の要素で構成されます。
      • <column_name>: (文字列): 列の名前。
      • <column_value>: (string/int/etc):その列の前の値。
  • current (文字列でラップされたオブジェクト): イベントの後の行の更新された値を表します。
    • 古いオブジェクトと同様に、各キーと値のペアは次のように構造化されています。
      • <column_name> (文字列): 列の名前。
      • <column_value> (string/int/etc): その列の新しい値または現在の値。

CES CloudEvent JSON スキーマ

{
  "type": "record",
  "name": "ChangeEvent",
  "fields": [
    {
      "name": "specversion",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "logicalid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "datacontenttype",
      "type": "string"
    },
    {
      "name": "operation",
      "type": "string"
    },
    {
      "name": "segmentindex",
      "type": "int"
    },
    {
      "name": "finalsegment",
      "type": "boolean"
    },
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}

CES データ属性の JSON スキーマ

{
  "name": "Data",
  "type": "record",
  "fields": [
    {
      "name": "eventsource",
      "type": {
        "name": "EventSource",
        "type": "record",
        "fields": [
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "tbl",
            "type": "string"
          },
          {
            "name": "cols",
            "type": {
              "type": "array",
              "items": {
                "name": "Column",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "index",
                    "type": "int"
                  }
                ]
              }
            }
          },
          {
            "name": "pkkey",
            "type": {
              "type": "array",
              "items": {
                "name": "PkKey",
                "type": "record",
                "fields": [
                  {
                    "name": "columnname",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "string"
                  }
                ]
              }
            }
          },
          {
            "name": "transaction",
            "type": {
              "name": "Transaction",
              "type": "record",
              "fields": [
                {
                  "name": "commitlsn",
                  "type": "string"
                },
                {
                  "name": "beginlsn",
                  "type": "string"
                },
                {
                  "name": "sequencenumber",
                  "type": "int"
                },
                {
                  "name": "committime",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "eventrow",
      "type": {
        "name": "EventRow",
        "type": "record",
        "fields": [
          {
            "name": "old",
            "type": "string"
          },
          {
            "name": "current",
            "type": "string"
          }
        ]
      }
    }
  ]
}