云服务器内容精选

  • SASL_PLAINTEXT认证 使用SASL机制连接Kafka,需要设置SASL相关配置。 图2 SASL_PLAINTEXT 表1 参数信息 参数 描述 SASL机制 用于客户端连接的SASL机制,支持以下四项,Kafka server默认是 GSSAPI 机制,更多说明可参考SASL机制。 GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512 令牌委托 是否为委托令牌鉴权,SASL机制选择“SCRAM-SHA-256”或者“SCRAM-SHA-512”时可见。 用户名 登录使用的用户名。 密码 登录使用的密码
  • SSL认证 使用SSL加密方式连接Kafka,需要设置SSL相关配置。 图3 SSL 表2 参数信息 参数 描述 Truststore证书 后缀名为jks的SSL证书,证书文件生成可参考SSL证书。 Truststore证书密码 证书对应的秘钥。 主机名端点识别算法 指定通过服务端证书验证服务端主机名的端点识别算法,选填,不填表示禁用主机名验证。 SSL双向认证 是否开启SSL双向认证。 Keystore证书 SSL双向认证开启可见,需要上传后缀名为jks的双向认证证书。 Keystore证书密码 SSL双向认证开启可见,SSL双向认证证书对应的秘钥。 Keystore私钥密码 选填,Keystore证书中私钥的密码。
  • JSON-C格式 JSON-C格式与JSON格式类似,区别是对于删除操作,JSON数据放在old上,JSON-C放在data上。对于timestamp类型数据转换成yyyy-mm-dd hh:mm:ss的字符串。 JSON-C定义详情参考表5: 表5 JSON-C参数说明 参数名称 说明 mysqlType 源端表字段名称和类型。 id DRS内部定义的事件操作的序列号,单调递增。 es 源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 ts 写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 database 数据库名称(Oracle数据库填写schema)。 table 表名。 type 操作类型,比如DELETE,UPDATE,INSERT,DDL。 isDdl 是否是DDL操作。 sql DDL的SQL语句,在DML操作中,取值为""。 sqlType 源端表字段的jdbc类型。 data 最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据;如果是删除,则表示被删除的数据。 old 旧数据,如果type参数是更新,则表示更新前的数据;如果是插入,取值为null。 pkNames 主键名称。
  • JSON格式 MySQL、GaussDB(MySQL)到Kafka的JSON格式定义详情参考表2,DDS到Kafka的JSON格式定义详情参考表3,Oracle、PostgreSQL、GaussDB、Microsoft SQL Server到Kafka的JSON格式定义详情参考表4。 表2 MySQL到Kafka的参数说明 参数名称 说明 mysqlType 源端表字段名称和类型。 id DRS内部定义的事件操作的序列号,单调递增。 es 源库产生这一条记录的时间,13位Unix时间戳,单位为毫秒。 ts 写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 database 数据库名称。 table 表名。 type 操作类型,比如DELETE,UPDATE,INSERT,DDL,全量同步为INIT和INIT_DDL。 isDdl 是否是DDL操作。 sql DDL的SQL语句,在DML操作中,取值为""。 sqlType 源端表字段的jdbc类型。 data 最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 old 旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 pkNames 主键名称。 { "mysqlType":{ "c11":"binary", "c10":"varchar", "c13":"text", "c12":"varbinary", "c14":"blob", "c1":"varchar", "c2":"varbinary", "c3":"int", "c4":"datetime", "c5":"timestamp", "c6":"char", "c7":"float", "c8":"double", "c9":"decimal", "id":"int" }, "id":27677, "es":1624614713000, "ts":1625058726990, "database":"test01", "table":"test ", "type":"UPDATE", "isDdl":false, "sql":"", "sqlType":{ "c11":-2, "c10":12, "c13":-1, "c12":-3, "c14":2004, "c1":12, "c2":-3, "c3":4, "c4":94, "c5":93, "c6":1, "c7":6, "c8":8, "c9":3, "id":4 }, "data":[ { "c11":"[]", "c10":"华为云huaweicloud", "c13":"asfiajhfiaf939-0239uoituqorjoqirfoidjfqrniowejoiwqjroqwjrowqjojoiqgoiegnkjgoi23roiugouofdug9u90weurtg103", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"104" } ], "old":[ { "c11":"[]", "c10":"华为云huaweicloud", "c13":"asfiajhfiaf939-0239", "c12":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c14":"[106, 103, 111, 106, 103, 111, 105, 100, 115, 106, 103, 111, 106, 111, 115, 111, 103, 57, 51, 52, 48, 57, 52, 51, 48, 57, 116, 106, 104, 114, 103, 106, 101, 119, 57, 116, 117, 48, 57, 51, 52, 48, 116, 101, 114, 111, 101, 106, 103, 57, 56, 51, 48, 52, 105, 55, 57, 56, 52, 54, 53, 52, 54, 54, 54, 49, 52, 54, 53, 33, 64, 35, 36, 37, 94, 42, 40, 41, 95, 41, 43, 95, 43, 124, 125, 34, 63, 62, 58, 58, 101, 117, 114, 103, 57, 101, 119, 117, 114, 103, 48, 119, 101, 117, 116, 57, 114, 48, 52, 117, 48, 57, 53, 116, 117, 51, 48, 57, 50, 117, 116, 48, 57, 51, 117, 116, 48, 119, 57, 101]", "c1":"cf3f70a7-7565-44b0-ae3c-83bec549ea8e:104", "c2":"[]", "c3":"103", "c4":"2021-06-25 17:51:53", "c5":"1624614713.201", "c6":"!@#$%90weurtg103", "c7":"10357.0", "c8":"1.2510357E7", "c9":"9874510357", "id":"103" } ], "pkNames":[ "id" ] } 表3 DDS到Kafka的参数说明 参数名称 说明 id DRS内部定义的事件操作的序列号,单调递增。 op 操作类型,比如DELETE,UPDATE,INSERT,DDL。 dbType 源库类型:Mongo。 db 数据库名称。 coll 集合名称。 value 这一条记录的变更值。 where 这一条记录的变更条件。 recordType 具体的记录类型,比如insert、update、replace、doc。其中,update和replace表示op中的UPDATE具体操作。doc表示op中的DELETE删除的是文档数据而非视图数据。 extra 拓展字段,一般和recordType保持一致,作为扩展oplog记录使用。 es 这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 ts 写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 clusterTime 与事件关联的oplog条目的时间戳,格式为timestamp:incre(timestamp是10位unix时间戳,单位为秒;incre代表当前命令在同一秒内执行的次序)。 // insert操作 { "id": 256, "op": "INSERT", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"baz\", \"tags\": [\"mongodb\", \"database\", \"NoSQL\"]}", "where": null, "recordType": "insert", "extra": "insert", "es": 1684315111439, "ts": 1684315111576, "clusterTime": "1684344064:1" } // replace操作 { "id": 340, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\"), \"c1\": \"sss\"}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "replace", "extra": "replace", "es": 1684315951831, "ts": 1684315951961, "clusterTime": "1684344904:9" } // update 更新值操作 { "id": 386, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"$set\": {\"c1\": \"aaa\"}}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "update", "extra": "update", "es": 1684316412008, "ts": 1684316412146, "clusterTime": "1684345365:1" } // update 更新键操作 { "id": 414, "op": "UPDATE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"$unset\": {\"c1\": true}, \"$set\": {\"column1\": \"aaa\"}}", "where": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "recordType": "update", "extra": "update", "es": 1684316692054, "ts": 1684316692184, "clusterTime": "1684345648:1" } // remove 操作 { "id": 471, "op": "DELETE", "dbType": "MongoDB", "db": "ljx", "coll": "ljx", "value": "{\"_id\": ObjectId(\"64650cf67dc36a464e76e583\")}", "where": null, "recordType": "doc", "extra": "doc", "es": 1684317252747, "ts": 1684317252869, "clusterTime": "1684346209:1" } 表4 其他数据库到Kafka的参数说明 参数名称 说明 columnType 源端表字段名称和数据类型。 说明: 数据类型不带长度、精度等。 dbType为Oracle、Microsoft SQL Server时暂为空。 dbType 源库类型。 schema schema名称。 opType 操作类型,比如DELETE,UPDATE,INSERT,DDL。 id DRS内部定义的事件操作的序列号,单调递增。 es 源库不同引擎对应类型如下: GaussDB主备版:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。 GaussDB分布式:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。 PostgreSQL:这一条记录上一个事务的commit时间,13位Unix时间戳,单位为毫秒。 Oracle:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 Microsoft SQL Server:这一条记录的commit时间,13位Unix时间戳,单位为毫秒。 ts 写入到目标kafka的时间,13位Unix时间戳,单位为毫秒。 database 数据库名称,dbType为Oracle时暂时为空。 table 表名。 type 操作类型,比如DELETE,UPDATE,INSERT,DDL。 isDdl 是否是DDL操作。 sql DDL的SQL语句,在DML操作中,取值为""。 sqlType 源端表字段的jdbc类型。 data 最新的数据,为JSON数组,如果type参数是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据。 old 旧数据,如果type参数是更新,则表示更新前的数据;如果是删除,则表示被删除的数据;如果是插入,取值为null。 pkNames 主键名称。 { "columnType": { "timestamp_column": "timestamp without time zone", "tstzrange_column": "tstzrange", "int4range_column": "int4range", "char_column": "character", "jsonb_column": "json", "boolean_column": "boolean", "bit_column": "bit", "smallint_column": "smallint", "bytea_column": "bytea" }, "dbType": "GaussDB Primary/Standby", "schema": "schema01", "opType": "UPDATE", "id": 332, "es": 1639626187000, "ts": 1639629261915, "database": "database01", "table": "table01", "type": "UPDATE", "isDdl": false, "sql": "", "sqlType": { "timestamp_column": 16, "tstzrange_column": 46, "int4range_column": 42, "char_column": 9, "jsonb_column": 22, "boolean_column": 8, "bit_column": 20, "smallint_column": 2, "bytea_column": 15 }, "data": [ { "timestamp_column": "2021-12-16 12:31:49.344365", "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", "int4range_column": "[11,20)", "char_column": "g", "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", "boolean_column": "false", "bit_column": "1", "smallint_column": "12", "bytea_column": "62797465615f64617461" } ], "old": [ { "timestamp_column": "2014-07-02 06:14:00.742", "tstzrange_column": "(\"2010-01-01 14:30:00+08\",\"2010-01-01 15:30:00+08\")", "int4range_column": "[11,20)", "char_column": "g", "jsonb_column": "{\"key1\": \"value1\", \"key2\": \"value2\"}", "boolean_column": "true", "bit_column": "1", "smallint_column": "12", "bytea_column": "62797465615f64617461" } ], "pkNames": null }
  • 前提条件 当前支持的DDL操作包含如下: 表级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)、COMMENT ON COLUMN、COMMENT ON TABLE。 库级同步支持:TRUNCATE(仅PostgreSQL 11及以上版本支持)、CREATE SCHEMA/TABLE、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)、CREATE SEQUENCE、DROP SEQUENCE、ALTER SEQUENCE、CREATE INDEX、ALTER INDEX、DROP INDEX、CREATE VIEW、ALTER VIEW、COMMENT ON COLUMN、COMMENT ON TABLE、COMMENT ON SCHEMA、COMMENT ON SEQUENCE、COMMENT ON INDEX、COMMENT ON VIEW。 表级同步:RENAME表名之后,向更改名称后的表插入新的数据时,DRS不会同步新的数据到目标库。 库级同步:源库使用非CREATE TABLE方式创建的表不会同步到目标库。常见地如:使用CREATE TABLE AS创建表、调用函数创建表。 暂不支持以注释开头的DDL语句的同步,以注释开头的DDL语句将被忽略。 不支持函数和存储过程中DDL语句的同步,函数和存储过程中执行的DDL语句将被忽略。 源库和目标库版本不同时,请使用源库和目标库都兼容的SQL语句执行DDL操作。例如:源库为pg11,目标库为pg12,要将源库表的列类型从char修改为int时,请使用如下语句: alter table tablename alter column columnname type int USING columnname::int; 执行如下操作步骤前,请检查待同步的源数据库public模式下,是否存在名为hwdrs_ddl_info的表、名为hwdrs_ddl_function()的函数、名为hwdrs_ddl_event的触发器。如存在,请将其删除。 库级同步时,如创建无主键表,请执行如下命令,将无主键表复制属性设置为full。 alter table tablename replica identity full;