Kafka消息格式
更新時間 2024-02-22 11:03:21
最近更新時間: 2024-02-22 11:03:21
分享文章
本章節介紹同步到Kafka集群中的數據以Avro、JSON和JSON-C格式存儲。
同步到Kafka集群中的數據以Avro、JSON和JSON-C格式存儲。
avro格式
Avro格式的schema定義詳情請參見。在實時同步到Kafka集群后,您需要根據schema定義進行數據解析,數據解析樣例請參見。
JSON格式
MySQL到Kafka的JSON格式定義詳情參考表,Oracle到Kafka的JSON格式定義詳情參考表。
表 MySQL到Kafka的參數說明
| 參數名稱 | 說明 |
|---|---|
| mysqlType | 源端表字段名稱和類型。 |
| id | DRS內部定義的事件操作的序列號,單調遞增。 |
| es | 源庫產生這一條記錄的時間,13位Unix時間戳,單位為毫秒。 |
| ts | 寫入到目標kafka的時間,13位Unix時間戳,單位為毫秒。 |
| database | 數據庫名稱。 |
| table | 表名。 |
| type | 操作類型,比如DELETE,UPDATE,INSERT,DDL。 |
| isDdl | 是否是DDL操作。 |
| sql | DDL的SQL語句,在DML操作中,取值為""。 |
| sqlType | 源端表字段的jdbc類型。 |
| data | 最新的數據,為JSON數組,如果type參數是插入則表示最新插入的數據,如果是更新,則表示更新后的最新數據。 |
| old | 舊數據,如果type參數是更新,則表示更新前的數據;如果是刪除,則表示被刪除的數據;如果是插入,取值為null。 |
| pkNames | 主鍵名稱。 |
{
"mysqlType":{
"c11":"binary",
"c10":"varchar",
"c13":"text",
"c12":"varbinary",
"c14":"blob",
"c1":"varchar",
"c2":"varbinary",
"c3":"int",
"c4":"datetime",
"c5":"timestamp",
"c6":"char",
"c7":"float",
"c8":"double",
"c9":"decimal",
"id":"int"
},
"id":27677,
"es":1624614713000,
"ts":1625058726990,
"database":"test01",
"table":"test ",
"type":"UPDATE",
"isDdl":false,
"sql":"",
"sqlType":{
"c11":-2,
"c10":12,
"c13":-1,
"c12":-3,
"c14":2004,
"c1":12,
"c2":-3,
"c3":4,
"c4":94,
"c5":93,
"c6":1,
"c7":6,
"c8":8,
"c9":3,
"id":4
},
"data":[
{
"c11":"[]",
"c10":"cloud",
"c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103",
"c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2":"[]",
"c3":"103",
"c4":"2021-06-25 17:51:53",
"c5":"1624614713.201",
"c6":"!@#$%90weurtg103",
"c7":"10357.0",
"c8":"1.2510357E7",
"c9":"9874510357",
"id":"104"
}
],
"old":[
{
"c11":"[]",
"c10":"cloud",
"c13":"asfiajhfiaf939-0239",
"c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]",
"c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104",
"c2":"[]",
"c3":"103",
"c4":"2021-06-25 17:51:53",
"c5":"1624614713.201",
"c6":"!@#$%90weurtg103",
"c7":"10357.0",
"c8":"1.2510357E7",
"c9":"9874510357",
"id":"103"
}
],
"pkNames":[
"id"
]
}
表 其他數據庫到Kafka的參數說明
| 參數名稱 | 說明 |
|---|---|
| columnType | 源端表字段名稱和數據類型。 說明 * 數據類型不帶長度、精度等。* dbType為Oracle時暫為空。 |
| dbType | 源庫類型。 不同引擎對應類型如下: Oracle:Oracle |
| schema | scheme名稱。 |
| opType | 操作類型,比如DELETE,UPDATE,INSERT,DDL。 |
| id | DRS內部定義的事件操作的序列號,單調遞增。 |
| es | 源庫不同引擎對應類型如下: Oracle:這一條記錄的commit時間,13位Unix時間戳,單位為毫秒。 |
| ts | 寫入到目標kafka的時間,13位Unix時間戳,單位為毫秒。 |
| database | 數據庫名稱,dbType為Oracle時暫時為空。 |
| table | 表名。 |
| type | 操作類型,比如DELETE,UPDATE,INSERT,DDL。 |
| isDdl | 是否是DDL操作。 |
| sql | DDL的SQL語句,在DML操作中,取值為""。 |
| sqlType | 源端表字段的jdbc類型。 |
| data | 最新的數據,為JSON數組,如果type參數是插入則表示最新插入的數據,如果是更新,則表示更新后的最新數據。 |
| old | 舊數據,如果type參數是更新,則表示更新前的數據;如果是刪除,則表示被刪除的數據;如果是插入,取值為null。 |
| pkNames | 主鍵名稱。 |
{
"columnType": {
"timestamp_column": "timestamp without time zone",
"tstzrange_column": "tstzrange",
"int4range_column": "int4range",
"char_column": "character",
"jsonb_column": "json",
"boolean_column": "boolean",
"bit_column": "bit",
"smallint_column": "smallint",
"bytea_column": "bytea"
},
"dbType": "GaussDB(for openGauss) Primary/Standby",
"schema": "schema01",
"opType": "UPDATE",
"id": 332,
"es": 1639626187000,
"ts": 1639629261915,
"database": "database01",
"table": "table01",
"type": "UPDATE",
"isDdl": false,
"sql": "",
"sqlType": {
"timestamp_column": 16,
"tstzrange_column": 46,
"int4range_column": 42,
"char_column": 9,
"jsonb_column": 22,
"boolean_column": 8,
"bit_column": 20,
"smallint_column": 2,
"bytea_column": 15
},
"data": [
{
"timestamp_column": "2021-12-16 12:31:49.344365",
"tstzrange_column": "("2010-01-01 14:30:00+08","2010-01-01 15:30:00+08")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{"key1": "value1", "key2": "value2"}",
"boolean_column": "false",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"old": [
{
"timestamp_column": "2014-07-02 06:14:00.742",
"tstzrange_column": "("2010-01-01 14:30:00+08","2010-01-01 15:30:00+08")",
"int4range_column": "[11,20)",
"char_column": "g",
"jsonb_column": "{"key1": "value1", "key2": "value2"}",
"boolean_column": "true",
"bit_column": "1",
"smallint_column": "12",
"bytea_column": "62797465615f64617461"
}
],
"pkNames": null
}
JSON-C格式
JSON-C格式與JSON格式類似,區別是對于刪除操作,JSON數據放在old上,JSON-C放在data上。對于timestamp類型數據轉換成yyyy-mm-dd hh:mm:ss的字符串。
JSON-C定義詳情參考表:
表 JSON-C參數說明
| 參數名稱 | 說明 |
|---|---|
| mysqlType | 源端表字段名稱和類型。 |
| id | DRS內部定義的事件操作的序列號,單調遞增。 |
| es | 源庫產生這一條記錄的時間,13位Unix時間戳,單位為毫秒。 |
| ts | 寫入到目標kafka的時間,13位Unix時間戳,單位為毫秒。 |
| database | 數據庫名稱(Oracle數據庫填寫schema)。 |
| table | 表名。 |
| type | 操作類型,比如DELETE,UPDATE,INSERT,DDL。 |
| isDdl | 是否是DDL操作。 |
| sql | DDL的SQL語句,在DML操作中,取值為""。 |
| sqlType | 源端表字段的jdbc類型。 |
| data | 最新的數據,為JSON數組,如果type參數是插入則表示最新插入的數據,如果是更新,則表示更新后的最新數據;如果是刪除,則表示被刪除的數據。 |
| old | 舊數據,如果type參數是更新,則表示更新前的數據;如果是插入,取值為null。 |
| pkNames | 主鍵名稱。 |
JSON格式數據中常見的轉義字符
表 轉義字符
| 字符 | 轉義字符 |
|---|---|
| < | \u003d |
| > | \u003e |
| & | \u0026 |
| = | \u003d |
| ' | \u0027 |