-
前提条件 已创建包含Doris服务的集群,集群内各服务运行正常。 待连接Doris数据库的节点与
MRS 集群网络互通。 创建具有Doris管理权限的用户。 集群已启用Kerberos认证(安全模式) 在
FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。 使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。 已准备待导入至Doris的数据文件。
-
前提条件 已创建包含Doris服务的集群,集群内各服务运行正常。 待连接Doris数据库的节点与MRS集群网络互通。 创建具有Doris管理权限的用户。 集群已启用Kerberos认证(安全模式) 在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。 使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。 Doris中已安装并启动DBroker实例。 已安装Hive客户端。 如果Doris通过Broker Load跨集群导入数据,需要配置跨集群互信,相关操作可参考配置跨Manager集群互信。
-
步骤一:创建CloudTable ClickHouse集群 登录
表格存储服务 控制台,创建非安全ClickHouse集群。 下载客户端和客户端校验文件。 创建E
CS 。 安装客户端并校验客户端安装客户端并校验客户端。 建立flink数据库。 create database flink; 使用flink数据库。 use flink; 创建flink.order表。 create table flink.order(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 查看表是否创建成功。 select * from flink.order;
-
请求示例 创建数据导入任务 POST https://{endpoint}/v1.0/{project_id}/data/import-records
{
"packages" : [ "my_package1", "my_package2" ],
"type" : "OBS",
"path" : "octopus-user-087679f0aaxxxxxxxxxxxxxx2f5e902b/rosbag/DO202302010834046875241",
"ak" : "ABCTUKMSXXXXXXXXXOKY",
"sk" : "012345jPI3YpmVRVTlbExxxxxxxxxx4Pvmkhp0L1s",
"agreement_confirm" : true,
"opendata_processor_config" : {
"envs" : [ {
"key" : "topic",
"value" : "camera_1"
} ],
"processor_id" : 10241,
"resource_spec" : "4Core_8GiB",
"cluster_type" : "CCE",
"cluster_id" : "087679f0aaxxxxxxxxxxxxxx2f5e902b"
}
}
-
请求参数 表2 请求Header参数 参数 是否必选 参数类型 描述 X-Auth-Token 是 String 用户Token,通过调用
IAM 服务获取用户Token接口获取(响应消息头中X-Subject-Token的值) Content-Type 是 String 内容类型。填application/json 表3 请求Body参数 参数 是否必选 参数类型 描述 type 是 String 传输方式 path 是 String 数据包路径,路径长度不超过255位 ak 是 String ak值,大写英文字母或数字,长度为20位 sk 是 String sk值,英文字母或数字,长度为40位 packages 是 Array of strings 数据包名列表,最多10个数据包 agreement_confirm 是 Boolean 是否同意协议,必须为true opendata_processor_config 否 DataProcessorConfig object 转opendata处理任务 desensitize_processor_config 否 DataProcessorConfig object 脱敏处理任务 workspace_id 是 String 工作空间id 表4 DataProcessorConfig 参数 是否必选 参数类型 描述 processor_id 是 Long 算子id,配置算子时此项必填 resource_spec 是 String 资源规格,配置算子时此项必填,示例:1Core_4GiB。具体获取方式可参考“查询可用资源规格“接口 envs 否 Array of JobEnv objects 环境变量,最多十组 cluster_type 否 String 集群类型 cluster_id 否 String 集群id 表5 JobEnv 参数 是否必选 参数类型 描述 key 是 String 环境变量键,不能重复,配置环境变量时此项必填 value 是 String 环境变量值,配置环境变量时此项必填
-
响应参数 状态码:201 表6 响应Body参数 参数 参数类型 描述 payload payloadWithSingleString object 实际返回信息 meta_info RespMetaInfo object 元数据 表7 payloadWithSingleString 参数 参数类型 描述 item String 创建的导入任务id 表8 RespMetaInfo 参数 参数类型 描述 current_time String 请求时间,UTC时间格式,时间为UTC标准时间
-
开始导入 下面我们通过几个实际的场景示例来看Broker Load的使用。 数据样例: '100','101','102','103','104','105',100.00,100.01,100.02,'100',200,100.08,2022-04-01 '101','102','103','104','105','105',100.00,100.01,100.02,'100',200,100.08,2022-04-02 '102','103','104','105','106','105',100.00,100.01,100.02,'100',200,100.08,2022-04-03 准备工作: 在本地创建示例数据文件source_text.txt,并上传至hdfs的/tmp/。 在hive中创建ods_source表。 CREATE TABLE `ods_source`(
`id` string,
`store_id` string,
`company_id` string,
`tower_id` string,
`commodity_id` string,
`commodity_name` string,
`commodity_price` double,
`member_price` double,
`cost_price` double,
`unit` string,
`quantity` string,
`actual_price` double,
`day ` string
)
row format delimited fields terminated by ','
lines terminated by '\n'
stored as textfile; 将hdfs创建的txt文件导入到ods_source表。 load data inpath '/tmp/source_text.txt' into table ods_source;
-
相关系统配置 FE配置。 下面几个配置属于Broker load的系统级别配置,也就是作用于所有Broker load导入任务的配置。主要通过修改FE配置项来调整配置值。 max_bytes_per_broker_scanner/max_broker_concurrency max_bytes_per_broker_scanner配置限制了单个BE处理的数据量的最大值。max_broker_concurrency配置限制了一个作业的最大的导入并发数。最小处理的数据量(默认64M),最大并发数,源文件的大小和当前集群BE的个数 共同决定了本次导入的并发数。 本次导入并发数=Math.min(源文件大小/最小处理量(默认64M),最大并发数,当前BE节点个数)。
本次导入单个BE的处理量=源文件大小/本次导入的并发数。 通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner*BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。 默认配置: 参数名:max_broker_concurrency, 默认10。 参数名:max_bytes_per_broker_scanner,默认3G,单位bytes。
-
作业调度 系统会限制一个集群内正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。 首先,FE的配置参数:desired_max_waiting_jobs会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过这个阈值,新提交的作业将会被直接拒绝。 一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。 FE的配置参数async_pending_load_task_pool_size用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。也就是说,假设用户提交了100个Load作业,同时只会有10个作业会进入LOADING状态开始执行,而其他作业处于PENDING等待状态。 FE的配置参数async_loading_load_task_pool_size用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1 pending task和多个loading task(等于LOAD语句中DATA INFILE子句的个数)。所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size。
-
基本原理 用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。 BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。 +
| 1. user create broker load
v
+----+----+
| |
| FE |
| |
+----+----+
|
| 2. BE etl and load the data
+--------------------------+
| | |
+---v---+ +--v----+ +---v---+
| | | | | |
| BE | | BE | | BE |
| | | | | |
+---+-^-+ +---+-^-+ +--+-^--+
| | | | | |
| | | | | | 3. pull data from broker
+---v-+-+ +---v-+-+ +--v-+--+
| | | | | |
|Broker | |Broker | |Broker |
| | | | | |
+---+-^-+ +---+-^-+ +---+-^-+
| | | | | |
+---v-+-----------v-+----------v-+-+
| HDFS/BOS/AFS cluster |
| |
+----------------------------------+
-
ClickHouse通过MySQL引擎对接RDS服务 MySQL引擎用于将远程的MySQL服务器中的表映射到ClickHouse中,并允许您对表进行INSERT和SELECT查询,以方便您在ClickHouse与MySQL之间进行数据交换。 MySQL引擎使用语法: CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password') MySQL数据库引擎参数说明: host:port :RDS服务MySQL数据库实例IP地址和端口。 database :RDS服务MySQL数据库名。 user :RDS服务MySQL数据库用户名。 password:RDS服务MySQL数据库用户密码,命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。 MySQL引擎使用示例: 连接到RDS服务的MySQL数据库。详细操作可以参考RDS服务MySQ
L实例 连接。 在MySQL数据库上创建表,并插入数据。 创建表mysql_table: CREATE TABLE `mysql_table` ( `int_id` INT NOT NULL AUTO_INCREMENT, `float` FLOAT NOT NULL, PRIMARY KEY (`int_id`)); 插入表数据: insert into mysql_table (`int_id`, `float`) VALUES (1,2); 登录ClickHouse客户端安装节点。执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建ClickHouse表的权限,具体请参见ClickHouse用户及权限管理章节,为用户绑定对应角色。如果当前集群未启用Kerberos认证,则无需执行本步骤。 如果是MRS 3.1.0版本集群,则需要先执行:export CLICKHOUSE_SECURITY_ENABLED=true kinit 组件业务用户 例如,kinit clickhouseuser。 使用客户端命令连接ClickHouse。 clickhouse client --host clickhouse实例IP --user 用户名 --password --port 端口号 输入用户密码 在ClickHouse中创建MySQL引擎的数据库,创建成功后自动与MySQL服务器交换数据。 CREATE DATABASE mysql_db ENGINE = MySQL('RDS服务MySQL数据库实例IP地址:MySQL数据库实例端口', 'MySQL数据库名', 'MySQL数据库用户名', 'MySQL数据库用户名密码'); 切换到新建的数据库mysql_db,并查询表数据。 USE mysql_db; 在ClickHouse中查询MySQL数据库表数据。 SELECT * FROM mysql_table; ┌─int_id─┬─float─┐
│ 1 │ 2 │
└─────┴──── ┘ 新增插入数据后也可以正常进行查询。 INSERT INTO mysql_table VALUES (3,4); SELECT * FROM mysql_table; ┌─int_id─┬─float─┐
│ 1 │ 2 │
│ 3 │ 4 │
└─────┴──── ┘
-
示例 以下示例中,“backup/MPPDB_backup.sql”表示导出的文件,其中backup表示相对于当前目录的相对目录;“37300”表示数据库服务器端口;“testdb”表示要访问的数据库名。 导出操作时,请确保该目录存在并且当前的操作系统用户对其具有读写权限。 示例一 执行gs_dump,导出testdb数据库全量信息,导出的MPPDB_backup.sql文件格式为纯文本格式。 gs_dump -U omm -f backup/MPPDB_backup.sql -p 37300 testdb -F p
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 09:49:17]: The total objects number is 356.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 09:49:17]: [100.00%] 356 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 09:49:17]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 09:49:17]: total time: 1274 ms 示例二 执行gs_dump,导出testdb数据库全量信息,导出的MPPDB_backup.tar文件格式为tar格式。 gs_dump -U omm -f backup/MPPDB_backup.tar -p 37300 testdb -F t
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:02:24]: The total objects number is 1369.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:02:53]: [100.00%] 1369 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:02:53]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:02:53]: total time: 50086 ms 示例三 执行gs_dump,导出testdb数据库全量信息,导出的MPPDB_backup.dmp文件格式为自定义归档格式。 gs_dump -U omm -f backup/MPPDB_backup.dmp -p 37300 testdb -F c
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:05:40]: The total objects number is 1369.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:06:03]: [100.00%] 1369 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:06:03]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:06:03]: total time: 36620 ms 示例四 执行gs_dump,导出testdb数据库全量信息,导出的MPPDB_backup文件格式为目录格式。 gs_dump -U omm -f backup/MPPDB_backup -p 37300 testdb -F d
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:16:04]: The total objects number is 1369.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:16:23]: [100.00%] 1369 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:16:23]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:16:23]: total time: 33977 ms 示例五 执行gs_dump,导出testdb数据库信息,但不导出/home/MPPDB_temp.sql中指定的表信息。导出的MPPDB_backup.sql文件格式为纯文本格式。 gs_dump -U omm -p 37300 testdb --exclude-table-file=/home/MPPDB_temp.sql -f backup/MPPDB_backup.sql
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:37:01]: The total objects number is 1367.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:37:22]: [100.00%] 1367 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:37:22]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-27 10:37:22]: total time: 37017 ms 示例六 执行gs_dump,仅导出依赖于指定表testtable的视图信息。 gs_dump -U omm -s -p 37300 testdb -t PUBLIC.testtable --include-depend-objs --exclude-self -f backup/MPPDB_backup.sql -F p
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-15 14:12:54]: The total objects number is 331.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-15 14:12:54]: [100.00%] 331 objects have been dumped.
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-15 14:12:54]: dump database testdb successfully
gs_dump[user='omm'][localhost][port='37300'][testdb][2018-06-15 14:12:54]: total time: 327 ms
-
注意事项 使用gs_dump前请确保gs_dump版本与数据库版本保持一致,高版本gs_dump不保证完全兼容低版本内核数据。 gs_dump不适合使用在库中对象数量(表、视图、索引等)过多的场景。当库中对象数量超过十万级,或者对象间依赖关系过于复杂时,gs_dump导出时间会很长。 禁止修改-F c/d/t格式导出的文件和内容,否则可能无法恢复成功。对于-F p格式导出的文件,如有需要,可谨慎编辑导出的文件。 为了保证数据一致性和完整性,gs_dump会对需要转储的表设置共享锁。如果表在别的事务中设置了共享锁,gs_dump会等待锁释放后锁定表。如果无法在指定时间内锁定某个表,转储会失败。用户可以通过指定--lock-wait-timeout选项,自定义等待锁超时时间。 不支持加密导出存储过程和函数。 对于物化视图,本工具仅支持物化视图定义的导出,在导入后需手动执行REFRESH命令来进行数据恢复。 对于临时对象,本工具仅支持导出全局临时表。 本工具不支持在备机上使用。 由于DN上系统表中分布信息不完整,所以在DN使用gs_dump时,不会转储表的分布信息。 gs_dump导出分区索引时,部分索引分区的属性无法导出,比如索引分区的unusable状态。可以通过查询系统表PG_PARTITION或者查询视图ADM_IND_PARTITIONS获取索引分区的具体属性,通过ALTER INDEX命令可以手动设置索引分区属性。 对于定时任务,本工具仅支持导出在MYSQL兼容性数据库中,通过CREATE EVENT创建的定时任务,通过高级包创建的定时任务不支持被导出。仅初始用户可导出所有定时任务,具有sysadmin权限的用户可导出owner为该用户的定时任务,没有相关权限的普通用户无法导出定时任务。 gs_dump不支持导出自定义Tokenweight分词词典,可以根据报错WARNING: dictionary xx cannot be automatically exported, please create it manually手动创建对应分词词典。 如果数据库中存在初始用户创建的表且表上有含用户自定义函数的表达式索引,系统管理员使用gs_dump导出后,需要使用初始用户通过gsql或gs_restore进行导入。否则会因为安全原因,导致创建索引失败。 普通用户不支持导出DIRECTORY、SYNONYM,若普通用户进行相关导出,会提示“WARNING: xx not dumped because current user is not a superuser”。 如果源数据库实例的template1中存在本地数据,请将gs_dump的输出恢复到一个真正的空数据库(从template0而非template1复制),否则可能会因为被添加对象的定义被复制,出现错误。创建一个无本地添加的空数据库,需从template0而非template1复制,例如: CREATE DATABASE foo WITH TEMPLATE template0; tar归档形式的单个文件大小不得超过8GB(tar文件格式的固有限制)。tar文档整体大小和任何其他输出格式没有限制,操作系统可能对此有要求。 由gs_dump生成的转储文件不包含优化程序用来做执行计划决定的统计数据。因此,建议从某转储文件恢复之后运行ANALYZE以确保最佳效果。转储文件不包含ALTER DATABASE…SET命令、数据库用户和其他完成安装设置,这些设置由gs_dumpall转储。 gs_dump当前设计不兼容三权分立,三权分立下仅可通过初始用户使用gs_dump工具。导入的文件也需要通过初始用户执行。 gs_dump不导出失效函数,需要导出函数前请确保函数有效,失效的函数会打印告警,提示对应OID的函数失效。 PACKAGE外的函数参数类型是表的列类型(table.column%type)、视图的列类型(view.column%type),且列类型是基础类型时,导出的函数定义中,参数类型会被转化为对应的基础类型。通过ALTER TABLE MODIFY COLUMN的方式修改表的列类型并执行ALTER FUNCTION COMPILE,或者修改视图查询表的列类型后重建视图并执行ALTER FUNCTION COMPILE,PG_PROC系统表中该函数PROARGSRC字段中参数类型不会随之变化,导出的函数定义中,参数类型依然是对象列类型变更前的基础类型。
-
工具介绍 gs_dump是
GaussDB 用于导出数据库相关信息的工具,用户可以自定义导出一个数据库或其中的对象(模式、表、视图等)。支持导出的数据库可以是默认数据库postgres,也可以是自定义数据库。 gs_dump支持SSL加密通信。 gs_dump工具在进行数据导出时,其他用户可以访问集群数据库(读或写)。 gs_dump工具导出完整一致的数据。例如,T1时刻启动gs_dump导出A数据库,那么导出数据结果将会是T1时刻A数据库的数据状态,T1时刻之后对A数据库的修改不会被导出。 gs_dump支持将数据库信息导出至纯文本格式的SQL脚本文件或其他归档文件中。 纯文本格式的SQL脚本文件:包含将数据库恢复为其保存时的状态所需的SQL语句。通过gsql连接数据库运行该SQL脚本文件,可以恢复数据库。即使在其他主机和其他数据库产品上,只要对SQL脚本文件稍作修改,也可以用来重建数据库。 归档格式文件:包含将数据库恢复为其保存时的状态所需的数据,可以是tar格式、目录归档格式或自定义归档格式,详见表1。该导出结果必须与gs_restore配合使用来恢复数据库,使用gs_restore工具导入时,系统允许用户选择需要导入的内容,甚至可以在导入之前对等待导入的内容进行排序。 gs_dump支持四种不同的导出文件格式,通过[-F或者--format=]选项指定,具体如表1所示。 表1 导出文件格式 格式名称 -F的参数值 说明 建议 对应导入工具 纯文本格式 p 纯文本脚本文件包含SQL语句和命令。命令可以由gsql命令行终端程序执行,用于重新创建数据库对象并加载表数据。 小型数据库,或者需要对导出的sql文件进行修改,推荐纯文本格式。 使用gsql工具恢复数据库对象前,可根据需要使用文本编辑器编辑纯文本导出文件。 自定义归档格式 c 一种二进制文件。支持从导出文件中恢复所有或所选的数据库对象。 中型或大型数据库,推荐自定义归档格式。 使用gs_restore可以选择要从自定义归档/目录归档/tar归档导出文件中导入相应的数据库对象。 目录归档格式 d 该格式会创建一个目录,该目录包含两类文件,一类是包含数据库对象的目录文件,另一类是每个表和blob对象对应的数据文件。 需要将数据库对象与数据分目录存储导出,推荐使用目录归档格式。 tar归档格式 t tar归档文件支持从导出文件中恢复所有或所选的数据库对象。tar归档格式不支持压缩且对于单个文件大小应小于8GB。 需要将归档结果导出并打包,可以使用tar归档格式。 可以使用gs_dump工具将文件压缩为目录归档或自定义归档导出文件,减少导出文件的大小。生成目录归档或自定义归档导出文件时,默认进行中等级别的压缩。gs_dump程序无法压缩已归档导出文件。 M-Compatibility模式数据库下,禁止在lower_case_table_names参数不同的库之间进行导入导出,否则可能引起数据丢失。
-
命令格式 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 LOAD [ DATA ]
[CHARACTERSET char_set_name]
[INFILE [directory_path] [filename ] ]
[BADFILE [directory_path] [filename ] ]
[OPTIONS(name=value)]
[{ INSERT | APPEND | REPLACE | TRUNCATE }]
INTO TABLE table_name
[{ INSERT | APPEND | REPLACE | TRUNCATE }]
[FIELDS CSV]
[TERMINATED [BY] { 'string' }]
[OPTIONALLY ENCLOSED BY { 'string' }]
[TRAILING NULLCOLS]
[ WHEN { (start:end) | column_name } {= | !=} 'string' ]
[(
col_name [ [ POSITION ({ start:end }) ] ["sql_string"] ] | [ FILLER [column_type [external] ] ] | [ CONSTANT "string" ] | [ SEQUENCE ( { COUNT | MAX | integer } [, incr] ) ]|[NULLIF (COL=BLANKS)]
[, ...]
)]