-
场景说明 假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,如表1所示,A业务操作流程如下: 创建用户信息表。 在用户信息中新增用户的学历、职称等信息。 根据用户编号查询用户姓名和地址。 根据用户姓名进行查询。 查询年龄段在[20–29]之间的用户信息。 数据统计,统计用户信息表的人员数、年龄最大值、年龄最小值、平均年龄。 用户销户,删除用户信息表中该用户的数据。 A业务结束后,删除用户信息表。 表1 用户信息 编号 姓名 性别 年龄 地址 12005000201 A Male 19 IPA, IPB 12005000202 B Female 23 IPC, IPD 12005000203 C Male 26 IPE, IPF 12005000204 D Male 18 IPG, IPH 12005000205 E Female 21 IPI, IPJ 12005000206 F Male 32 IPK, IPL 12005000207 G Female 29 IPM, IPN 12005000208 H Female 30 IPO, IPP 12005000209 I Male 26 IPQ, IPR 12005000210 J Male 25 IPS, IPT
-
建表规范 【强制】创建表指定分桶buckets时,每个桶的数据大小为应保持在100M-3G之间,单分区中最大分桶数据不超过5000。 【强制】表数据超过5亿条以上必须设置分区分桶策略。 【强制】分桶的列不要设置太多,一般情况下1或2个列,同时需要兼顾数据分布均匀和查询吞吐之间的均衡,考虑数据均匀是为了避免某些桶的数据存在倾斜影响数据均衡和查询效率,考虑查询吞吐是为了利用查询SQL的分桶剪裁优化避免全桶扫描提升查询性能,所以优先考虑哪些数据较为均匀且常用于查询条件的列适合做分桶列。 【强制】2000kw以内数据禁止使用动态分区(动态分区会自动创建分区,而小表用户客户关注不到,会创建出大量不使用分区分桶)。 【强制】创建表时的副本数必须至少为2,默认是3,禁止使用单副本。 【建议】单表物化视图不能超过6个。 【建议】对于有大量历史分区数据,但是历史数据比较少,或者不均衡,或者查询概率的情况,使用如下方式将数据放在特殊分区: 对于历史数据,如果数据量比较小我们可以创建历史分区(比如年分区,月分区),将所有历史数据放到对应分区里。 创建历史分区方式:FROM ("2000-01-01") TO ("2022-01-01") INTERVAL 1 YEAR。 【建议】1000w-2亿以内数据为了方便可以不设置分区,直接用分桶策略(不设置其实Doris内部会有个默认分区)。 【建议】如果分桶字段存在30%以上的数据倾斜,则禁止使用Hash分桶策略,改使用random分桶策略:Create table ... DISTRIBUTED BY RANDOM BUCKETS 10 ... 【建议】建表时第一个字段一定是最常查询使用的列,默认有前缀索引快速查询能力,选取分区分桶外最长查询且高基数的列,前缀索引36位,如果列超长也不能使用前缀索引能力。 【建议】亿级别以上数据,如果有模糊匹配或者等值/in条件,可以使用倒排索引或者是Bloomfilter。如果是低基数列的正交查询适合使用bitmap索引。 【强制】Doris建表不要指定Merge-On-Write属性,当前有很多开源问题,不推荐。如使用了该属性,CloudTable服务不承诺SLA。 【建议】根据业务手动分桶,不使用AUTO自动分桶策略。
-
数据查询规范 【强制】鉴于外表存在不稳定性,目前doris暂不支持外表查询。 【强制】in中条件超过2000后,必须修改为子查询。 【强制】禁止使用REST API(Statement Execution Action)执行大量SQL查询,该接口仅仅用于集群维护。 【建议】一次insert into select数据超过1亿条后,建议拆分为多个insert into select语句执行,分成多个批次来执行。如果非要这样执行不可,必须在集群资源相对空闲的时候可以通过调整并发度来加快的数据导入速度 。 例如:set parallel_fragment_exec_instance_num = 8 建议数值是单BE节点上CPU内核的一半。 【强制】query查询条件返回结果在5w条以上,使用JDBC Catalog或者OUTFILE方式导出。不然大量FE上数据传输将占用FE资源,影响集群稳定性。 如果是交互式查询,建议使用分页方式(offset limit),分页要加Order by。 如果是数据导出提供给第三方使用,建议使用outfile或者export方式。 【强制】2个以上大于3亿的表JOIN使用Colocation Join。 【强制】亿级别大表禁止使用select * 查询,查询时需要明确要查询的字段。 使用SQL Block方式禁止这种操作。 如果是高并发点查,建议开启行存(2.x版本)。 使用PreparedStatement查询。 【强制】亿级以上表数据查询必须带分区分桶条件。 【建议】尽量不要使用OR作为JOIN条件。 【建议】大量数据排序(5亿以上)后返回部分数据,建议先减少数据范围再执行排序,否则大量排序会影响性能。 例如:将from table order by datatime desc limit 10优化为from table where datatime='2023-10-20' order by datatime desc limit 10。
-
数据变更类 【强制】应用程序不可以直接使用delete或者update语句变更数据,可以使用CDC的upsert方式来实现。 低频操作上使用,比如Update几分钟更新一次。 如果使用Delete一定带上分区条件。 【强制】禁止使用INSERT INTO tbl1 VALUES (“1”), (“a”);这种方式做数据导入,少量少次写可以,多量多频次时要使用Doris提供的StreamLoad、BrokerLoad、SparkLoad或者Flink Connector方式。 【建议】执行特殊的长SQL操作时,可以使用SELECT /*+ SET_VAR(query_timeout = xxx*/ from table类似这样通过Hint方式去设置Session会话变量,不要设置全局的系统变量。
-
Unique模型 在某些多维分析场景下,用户更关注的是如何保证Key的唯一性,即如何获得Primary Key唯一性约束。因此,我们引入了Unique的数据模型。该模型本质上是聚合模型的一个特例,也是一种简化的表结构表示方式。举例说明: Unique模型表,不推荐开启merge-on-write属性,默认使用merge-on-read。 表11 参数说明 ColumnName Type IsKey Comment user_id BIGINT Yes 用户ID username VARCHAR(50) Yes 用户昵称 city VARCHAR(20) No 用户所在城市 age SMALLINT No 用户年龄 sex TINYINT No 用户性别 phone LARGEINT No 用户电话 address VARCHAR(500) No 用户住址 register_time DATETIME No 用户注册时间 这是一个典型的用户基础信息表。这类数据没有聚合需求,只需保证主键唯一性。(这里的主键为user_id+username)。那么我们的建表语句如下: CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`phone` LARGEINT COMMENT "用户电话",
`address` VARCHAR(500) COMMENT "用户地址",
`register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
); 而这个表结构,完全同等于以下使用聚合模型描述的表结构: 表12 参数说明 ColumnName Type AggregationType Comment user_id BIGINT - 用户ID username VARCHAR(50) - 用户昵称 city VARCHAR(20) REPLACE 用户所在城市 age SMALLINT REPLACE 用户年龄 sex TINYINT REPLACE 用户性别 phone LARGEINT REPLACE 用户电话 address VARCHAR(500) REPLACE 用户住址 register_time DATETIME REPLACE 用户注册时间 建表语句。 CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) REPLACE COMMENT "用户所在城市",
`age` SMALLINT REPLACE COMMENT "用户年龄",
`sex` TINYINT REPLACE COMMENT "用户性别",
`phone` LARGEINT REPLACE COMMENT "用户电话",
`address` VARCHAR(500) REPLACE COMMENT "用户地址",
`register_time` DATETIME REPLACE COMMENT "用户注册时间"
)
AGGREGATE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
); 即Unique模型完全可以用聚合模型中的REPLACE方式替代。其内部的实现方式和数据存储方式也完全一样。
-
Duplicate模型 在某些多维分析场景下,数据既没有主键,也没有聚合需求。因此,我们引入Duplicate数据模型来满足这类需求。 表13 数据 ColumnName Type SortKey Comment timestamp DATETIME Yes 日志时间 type INT Yes 日志类型 error_code INT Yes 错误码 error_msg VARCHAR(1024) No 错误详细信息 op_id BIGINT No 负责人ID op_time DATETIME No 处理时间 建表语句。 CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
`timestamp` DATETIME NOT NULL COMMENT "日志时间",
`type` INT NOT NULL COMMENT "日志类型",
`error_code` INT COMMENT "错误码",
`error_msg` VARCHAR(1024) COMMENT "错误详细信息",
`op_id` BIGINT COMMENT "负责人id",
`op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY(`timestamp`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
); 这种数据模型区别于Aggregate和Unique模型。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。 而在建表语句中指定的DUPLICATE KEY,只是用来指明底层数据按照哪些列进行排序。更贴切的名称应该为“Sorted Column”,这里取名“DUPLICATE KEY”只是用以明确表示所用的数据模型。在DUPLICATE KEY的选择上,我们建议适当地选择前2-4列就可以。 这种数据模型适用于既没有聚合需求,又没有主键唯一性约束的原始数据的存储。更多使用场景,可参见聚合模型的局限性。
-
Aggregate模型 以实际的例子来说明什么是聚合模型,以及如何正确的使用聚合模型。 示例1:导入数据聚合 假设业务有以下模式: 表1 参数说明 ColumnName Type AggregationType Comment user_id LARGEINT - 用户ID date DATE - 数据导入日期 city VARCHAR(20) - 用户所在城市 age SMALLINT - 用户年龄 sex TINYINT - 用户性别 last_visit_date DATETIME REPLACE 用户最后一次访问时间 cost BIGINT SUM 用户总消费 max_dwell_time INT MAX 用户最大停留时间 min_dwell_time INT MIN 用户最小停留时间 转换成建表语句,如下所示。 CREATE TABLE IF NOT EXISTS demo.example_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
); 可以看到,这是一个典型的用户信息和访问行为的事实表。在一般星型模型中,用户信息和访问行为一般分别存放在维度表和事实表中。这里我们为了更加方便的解释Doris的数据模型,将两部分信息统一存放在一张表中。 表中的列按照是否设置了AggregationType,分为Key(维度列)和Value(指标列)。没有设置AggregationType的,如user_id、date、age、sex称为Key,而设置了AggregationType的称为Value。 当导入数据时,对于Key列相同的行会聚合成一行,而Value列会按照设置的AggregationType进行聚合。AggregationType目前有以下四种聚合方式: SUM:求和,多行的Value进行累加。 REPLACE:替代,下一批数据中的Value会替换之前导入过的行中的Value。 MAX:保留最大值。 MIN:保留最小值。 表2 原始数据 user_id date city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 A 20 0 2017-10-01 06:00:00 20 10 10 10000 2017-10-01 A 20 0 2017-10-01 07:00:00 15 2 2 10001 2017-10-01 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 D 35 0 2017-10-03 10:20:22 11 6 6 我们假设这是一张记录用户访问某商品页面行为的表。我们以第一行数据为例,解释如下: 表3 参数说明 数据 说明 10000 用户id,每个用户唯一识别id 2017-10-01 数据入库时间,精确到日期 A 用户所在城市 20 用户年龄 0 性别男(1 代表女性) 2017-10-01 06:00:00 用户本次访问该页面的时间,精确到秒 20 用户本次访问产生的消费 10 用户本次访问,驻留该页面的时间 10 用户本次访问,驻留该页面的时间(冗余) 那么当这批数据正确导入到Doris中后,Doris中最终存储如下: 表4 插入数据 user_id date city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 A 20 0 2017-10-01 07:00:00 35 10 2 10001 2017-10-01 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 D 35 0 2017-10-03 10:20:22 11 6 6 可以看到,用户10000只剩下了一行聚合后的数据。而其余用户的数据和原始数据保持一致。这里先解释下用户10000 聚合后的数据: 前5列没有变化,从第6列last_visit_date开始: 2017-10-01 07:00:00:因为last_visit_date列的聚合方式为REPLACE,所以2017-10-01 07:00:00替换了2017-10-01 06:00:00保存了下来。 在同一个导入批次中的数据,对于REPLACE这种聚合方式,替换顺序不做保证。如在这个例子中,最终保存下来的,也有可能是2017-10-01 06:00:00。而对于不同导入批次中的数据,可以保证,后一批次的数据会替换前一批次。 35:因为cost列的聚合类型为SUM,所以由20+15累加获得35。 10:因为max_dwell_time列的聚合类型为MAX,所以10和2取最大值,获得10。 2:因为min_dwell_time列的聚合类型为MIN,所以10和2取最小值,获得2。 经过聚合,Doris中最终只会存储聚合后的数据。换句话说,即明细数据会丢失,用户不能够再查询到聚合前的明细数据了。 示例2:保留,明细数据。 接示例1,将表结构修改如下: 表5 参数说明 ColumnName Type AggregationType Comment user_id LARGEINT - 用户ID date DATE - 数据导入日期 timestamp DATETIME - 数据导入时间,精确到秒 city VARCHAR(20) - 用户所在城市 age SMALLINT - 用户年龄 sex TINYINT - 用户性别 last_visit_date DATETIME REPLACE 用户最后一次访问时间 cost BIGINT SUM 用户总消费 max_dwell_time INT MAX 用户最大停留时间 min_dwell_time INT MIN 用户最小停留时间 即增加了一列timestamp,记录精确到秒的数据导入时间。 同时,将AGGREGATE KEY设置为AGGREGATE KEY(user_id, date, timestamp, city, age, sex)。 导入数据如下: 表6 原始数据 user_id date timestamp city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 2017-10-01 08:00:05 A 20 0 2017-10-01 06:00:00 20 10 10 10000 2017-10-01 2017-10-01 09:00:05 A 20 0 2017-10-01 07:00:00 15 2 2 10001 2017-10-01 2017-10-01 18:12:10 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 2017-10-02 13:10:00 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 2017-10-02 13:15:00 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 2017-10-01 12:12:48 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 2017-10-03 12:38:20 D 35 0 2017-10-03 10:20:22 11 6 6 那么当这批数据正确导入到Doris中后,Doris中最终存储如下: 表7 数据结果 user_id date timestamp city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 2017-10-01 08:00:05 A 20 0 2017-10-01 06:00:00 20 10 10 10000 2017-10-01 2017-10-01 09:00:05 A 20 0 2017-10-01 07:00:00 15 2 2 10001 2017-10-01 2017-10-01 18:12:10 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 2017-10-02 13:10:00 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 2017-10-02 13:15:00 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 2017-10-01 12:12:48 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 2017-10-03 12:38:20 D 35 0 2017-10-03 10:20:22 11 6 6 示例3:导入数据与已有数据聚合。 接示例1中的参数列表,插入以下表中数据。 表8 原始数据 user_id date city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 A 20 0 2017-10-01 07:00:00 35 10 2 10001 2017-10-01 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 D 35 0 2017-10-03 10:20:22 11 6 6 在导入一批新的数据: 表9 新数据 user_id date city age sex last_visit_date cost max_dwell_time min_dwell_time 10004 2017-10-03 D 35 0 2017-10-03 11:22:00 44 19 19 10005 2017-10-03 E 29 1 2017-10-03 18:11:02 3 1 1 那么当这批数据正确导入到Doris中后,Doris中最终存储如下: 表10 user_id date city age sex last_visit_date cost max_dwell_time min_dwell_time 10000 2017-10-01 A 20 0 2017-10-01 07:00:00 35 10 2 10001 2017-10-01 A 30 1 2017-10-01 17:05:45 2 22 22 10002 2017-10-02 B 20 1 2017-10-02 12:59:12 200 5 5 10003 2017-10-02 C 32 0 2017-10-02 11:20:00 30 11 11 10004 2017-10-01 D 35 0 2017-10-01 10:00:15 100 3 3 10004 2017-10-03 D 35 0 2017-10-03 11:22:00 55 19 6 10005 2017-10-03 E 29 1 2017-10-03 18:11:02 3 1 1 可以看到,用户10004的已有数据和新导入的数据发生了聚合。同时新增了10005用户的数据。 数据的聚合,在Doris中有如下三个阶段发生: 每一批次数据导入的ETL阶段。该阶段会在每一批次导入的数据内部进行聚合。 底层BE进行数据Compaction的阶段。该阶段,BE会对已导入的不同批次的数据进行进一步的聚合。 数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。 数据在不同时间,可能聚合的程度不一致。例如一批数据刚导入时,可能还未与之前已存在的数据进行聚合。但是对于用户而言,用户只能查询到聚合后的数据。即不同的聚合程度对于用户查询而言是透明的。用户需始终认为数据以最终的完成的聚合程度存在,而不应假设某些聚合还未发生。
-
JDBC通过ssl方式连接doris(无需验证证书) 在应用层进行代码重试和负载均衡时,代码重试需要应用自己多个配置doris前端节点地址。比如发现一个连接异常退出,就自动在其他连接上进行重试。 前提条件:集群必须开启HTTPS。 下载证书请在集群详情页面下载。 样例代码: public class Main {
private static String URL = "jdbc:mysql:loadbalance://" +
"[FE1_host]:[FE1_port],[FE2_host]:[FE2_port],[FE3_host]:[FE3_port]/[your_database]?" +
"loadBalanceConnectionGroup=first&ha.enableJMX=true";
static Connection getNewConnection() throws SQLException, ClassNotFoundException {
Class.forName("com.mysql.cj.jdbc.Driver");
// 认证用的密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
// 本示例以密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量
String password = System.getenv("USER_PASSWORD");
String user = "your_username";
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("useSSL", "true");
props.setProperty("requireSSL", "true");
return DriverManager.getConnection(URL, props);
}
public static void main(String[] args) throws Exception {
Connection c = getNewConnection();
try {
System.out.println("begin print");
String query = "your sqlString";
c.setAutoCommit(false);
Statement s = c.createStatement();
ResultSet resultSet = s.executeQuery(query);
while(resultSet.next()) {
int id = resultSet.getInt(1);
System.out.println("id is: "+id);
}
System.out.println("end print");
Thread.sleep(Math.round(100 * Math.random()));
c.close();
} catch (Exception e) {
e.printStackTrace();
}
}
} 父主题: 通过JDBC方式连接Doris
-
Python代码样例 # -*- coding: utf-8 -*-
# 引入公共模块
import sys
import os
# 引入Thrift自带模块,如果不存在则需要执行pip install thrift安装
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from thrift.transport import TSocket
# 引入通过hbase.thrift生成的模块
gen_py_path = os.path.abspath('gen-py')
sys.path.append(gen_py_path)
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan
# 配置CloudTable HBase集群的ThriftServer的IP,可以通过集群的详情页面获取
host = "x.x.x.x"
socket = TSocket.TSocket(host, 9090)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
transport.open()
# 测试表名
tableNameInBytes = "test".encode("utf8")
tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes)
# 预分region的split key
splitKeys=[]
splitKeys.append("row3".encode("utf8"))
splitKeys.append("row5".encode("utf8"))
# 建表操作
client.createTable(TTableDescriptor(tableName=tableName, columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]), splitKeys)
print("Create table %s success." % tableName)
# Put单条数据
put = TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])
client.put(tableNameInBytes, put)
print("Put single row success.")
# Put多条数据
puts = []
puts.append(TPut(row="row4".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
puts.append(TPut(row="row6".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
puts.append(TPut(row="row8".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
client.putMultiple(tableNameInBytes, puts)
print("Put rows success.")
# Get单条数据
get = TGet(row="row1".encode("utf8"))
result = client.get(tableNameInBytes, get)
print("Get Result: ", result)
# Get多条数据
gets = []
gets.append(TGet(row="row4".encode("utf8")))
gets.append(TGet(row="row8".encode("utf8")))
results = client.getMultiple(tableNameInBytes, gets)
print("Get multiple rows: ", results)
# Scan数据
startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8")
scan = TScan(startRow=startRow, stopRow=stopRow)
caching=1
results = []
while True:
scannerResult = client.getScannerResults(tableNameInBytes, scan, caching)
lastOne = None
for result in scannerResult:
results.append(result)
print("Scan Result: ", result)
lastOne = result
# 没有更多数据,退出
if lastOne is None:
break
else:
# 重新生成下一次scan的startRow
newStartRow = bytearray(lastOne.row)
newStartRow.append(0x00)
scan = TScan(startRow=newStartRow, stopRow=stopRow)
# 禁用和删除表
client.disableTable(tableName)
print("Disable table %s success." % tableName)
client.deleteTable(tableName)
print("Delete table %s success." % tableName)
# 所有操作都结束后,关闭连接
transport.close()
-
数据查询 【规则】不要使用select *,只查询需要的字段,减少机器负载,提升查询性能。 OLAP分析场景,一张大宽表通常能有几百甚至上千列,选择其中少数的几列做维度列、指标列计算。在这种场景下,ClickHouse的数据也是按照列存储。如果使用select *,会加重系统的压力。 【规则】通过limit限制查询返回的数据量,节省计算资源、减少网络开销。 如果返回的数据量过大,客户端有可能出现内存溢出等服务异常。在前端使用ClickHouse的场景下,如果要查询的数据量比较大,建议每次可适当的进行分页查询返回数据,以减少查询数据量对网络带宽和计算资源的占用。 【规则】关联查询必须大表join小表。 对于ClickHouse来说,原则上需要把多表join模型提前加工为宽表模型,多个表以及维度表变化比较频繁情况下,不适合进行宽表加工处理,必须使用Join模型以实时查询到最新数据。两个表做join操作,建议大表join小表,必须使用关联条件。小表的数据量控制在百万~千万行级别,且需要在join前把小表数据通过条件进行有效过滤。 【建议】使用GLOBAL JOIN/IN替换普通的JOIN。 ClickHouse基于分布式表查询会转换成所有分片的本地表操作,再汇总结果。实际使用中,join和global join的执行逻辑差别很大,建议使用global join做分布式表查询。 【规则】合理使用数据表的分区字段和索引字段。 MergeTree引擎,数据是以分区目录形式进行组织存储的,在进行数据查询时,使用分区可以有效跳过无用的数据文件,减少数据的读取。 MergeTree引擎会根据索引字段进行数据排序,并且根据index_granularity的配置生成稀疏索引。根据索引字段查询,能快速过滤数据,减少数据的读取,大幅度提升查询性能。 【建议】明确数据查询的范围。 增加条件过滤和查询数据周期过滤,缩小数据查询范围。例如查询指定分区,通过指定分区字段会减少底层数据库扫描的文件数量,提升查询性能。例如:700个分区的千列大表,需要查询一个分区中有7000万数据,其他699个分区中无数据,虽然只有一个分区有数据,其他分区无数据,但是查询指定分区为百毫秒级性能,没有指定分区查询性能为1~2秒左右,性能相差20倍。 【建议】慎用final查询。 在查询语句的最后跟上final,通常是对于ReplacingMergeTree引擎,数据不能完全去重情况下,少数开发人员习惯使用final关键字进行实时合并去重操作(merge-on-read),保证查询数据无重复数据。可以通过argMax函数或其他方式规避此问题。 【建议】使用物化视图加速查询。 对于固定查询方式的场景,建议使用物化视图,提前做好数据聚合,相对于查询明细表,性能有数量级的提升。 【建议】物化视图创建时不会进行语法校验,只有发生实际数据插入与查询时才会出错。物化视图上线前,需做好充分验证。
-
建表规范 【规则】不要在system库中创建业务表。system数据库是ClickHouse默认的系统数据库,默认数据库中的系统表记录的是系统的配置、元数据等信息数据。业务在使用ClickHouse的时候,需要指定自己业务的数据库进行连接和使用,业务相关的表创建在自己业务库中,不要将业务表创建在系统数据库中,避免对系统数据库造成不必要的影响。 【规则】数据库和表的命名尽量不要使用SQL保留字,请注意大小写敏感。如果必须使用一些保留关键字,请使用双引号或者反引号进行转义。 【规则】不允许使用字符类型存放时间或日期类数据,尤其是日期字段进行运算或比较的时候。 【规则】不允许使用字符类型存放数值类型的数据,尤其是数值字段进行运算或者比较的时候。 【建议】不建议表使用Nullable列,可以考虑使用字符串“NA”。 Nullable类型的列在做查询条件判断时,会进一步做判空等处理,防止造成额外的计算开销。根据现网的历史经验,Nullable类型的字符串查询性能比String慢20%至30%左右,从性能方面考虑,非必要不使用Nullable类型。
-
数据写入 【规则】外部模块保证数据导入的幂等性。 ClickHouse不支持数据写入的事务保证。通过外部导入数据模块控制数据的幂等性,比如某个批次的数据导入异常,则drop对应分区数据或清理掉导入的数据后,重新导入该分区或批次数据。 【规则】大批量少频次的写入数据。 ClickHouse每次插入数据时,都会生成一到多个part文件,如果data part过多,merge压力会变大,甚至出现各种异常情况影响数据插入。建议每个批次5k到100k行,根据写入字段数量调整写入行数,降低写入节点内存和CPU的压力,每秒不超过1次插入。 【建议】一次只插入一个分区内的数据。 如果数据属于不同的分区,则每次插入不同分区的数据会独立生成part文件,导致part总数量膨胀,建议一批插入的数据属于同一个分区。 【建议】慎用分布式表批量插入。 写分布式表时,数据会分发到集群的所有本地表,每个本地表插入的数据量是总插入量的1/N,batch size可能比较小,会导致data part过多,merge压力变大,甚至出现异常影响数据插入。 数据的一致性问题:数据先在分布式表写入节点的主机落盘,然后数据被异步地发送到本地表所在主机进行存储,中间没有一致性的校验,如果分布式表写入数据的主机出现异常,会存在数据丢失风险。 对于数据写分布式表和数据写本地表相比,分布式表数据写入性能会变慢,单批次分布式表写入节点的磁盘和网络IO会成为性能的瓶颈点。 分布式表转发给各个shard成功与否,插入数据的客户端是无法感知,转发失败的数据会不断重试转发消耗CPU。 只有在数据去重的场景下,可以使用分布式表插入,通过sharding key将要去重的数据转发到同一个shard,方便后续去重查询。 【建议】慎用delete、update操作。 标准SQL的更新、删除操作是同步的,即客户端等服务端返回执行结果在执行。而Clickhouse的update、delete是通过异步方式实现的,当执行update语句时,服务端立即响应,实际上此时数据还没变,而是排队等待,可能会出现操作覆盖的情况,也无法保证操作的原子性。如果业务场景有update、delete等操作,建议使用ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree引擎。 【建议】谨慎执行optimize操作。 Optimize一般会对表做重写操作,建议在业务压力小时候进行操作,否则对IO/MEM/CPU资源有较大消耗,导致业务查询变慢或不可用。 【建议】ALTER TABLE修改表数据。 建议慎用delete、update的mutation操作。 标准SQL的更新、删除操作是同步的,即客户端要等服务端返回执行结果(通常是int值);而ClickHouse的update、delete是通过异步方式实现的,当执行update语句时,服务端立即返回执行成功还是失败结果,但是实际上此时数据还没有修改完成,而是在后台排队等着进行真正的修改,可能会出现操作覆盖的情况,也无法保证操作的原子性。 业务场景要求有update、delete等操作,建议使用ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree引擎,使用方式参见表引擎。 建议少或不增删数据列。 业务提前规划列个数,如果将来有更多列要使用,可以规划预留多列,避免在生产系统跑业务过程中进行大量的alter table modify列操作,导致不可以预知的性能、数据一致性问题。
-
ClickHouse通过MySQL引擎对接RDS服务 MySQL引擎用于将远程的MySQL服务器中的表映射到ClickHouse中,并允许您对表进行INSERT和SELECT查询,以方便您在ClickHouse与MySQL之间进行数据交换。 MySQL引擎使用语法: CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password') 表1 MySQL数据库引擎参数说明 参数 描述 hostport RDS服务MySQL数据库实例IP地址和端口。 database RDS服务MySQL数据库名。 user RDS服务MySQL数据库用户名。 password RDS服务MySQL数据库用户密码。 MySQL引擎使用示例: 连接到RDS服务的MySQL数据库。详细操作可以参考RDS服务MySQ
L实例 连接。 在MySQL数据库上创建表,并插入数据。 使用客户端命令连接ClickHouse。 非安全集群连接命令 ./clickhouse client --host 集群内网地址 --port 9000 --user admin --password password 安全集群连接命令,详细操作请参见ClickHouse安全通道章节。 ./clickhouse client --host 集群内网地址 --port 9440 --user admin --password password --secure --config-file /root/config.xml 集群内网地址:集群详情页面中集群访问地址,这里替换成您自己购买的集群的访问地址。 在ClickHouse中创建MySQL引擎的数据库,创建成功后自动与MySQL服务器交换数据。 CREATE DATABASE mysql_db ENGINE = MySQL('RDS服务MySQL数据库实例IP地址:MySQL数据库实例端口', 'MySQL数据库名', 'MySQL数据库用户名', 'MySQL数据库用户名密码'); 切换到新建的数据库mysql_db,并查询表数据。 USE mysql_db; 在ClickHouse中查询MySQL数据库表数据。 SELECT * FROM mysql_table;
┌─int_id─┬─float─┐
│ 1 │ 2 │
└─────┴──── ┘ 新增插入数据后也可以正常进行查询。 INSERT INTO mysql_table VALUES (3,4); SELECT * FROM mysql_table;
┌─int_id─┬─float─┐
│ 1 │ 2 │
│ 3 │ 4 │
└─────┴──── ┘
-
操作步骤 确认CloudTable集群已经安装,并正常运行。 准备Windows弹性云服务器。 具体操作请参见准备弹性云服务器章节。 请在Windows的弹性云服务器上安装JDK1.7及以上版本,强烈推荐使用JDK1.8及以上版本,并且安装Eclipse,Eclipse使用JDK1.7及以上的版本。 如果使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 如果使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。
-
基本原理 下图展示了Stream load的主要流程,省略了一些导入细节。 ^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+ Stream load中,Doris会选定一个节点作为Coordinator节点。该节点负责接数据并分发数据到其他数据节点。您可以通过HTTP协议提交导入命令。如果提交到FE,则FE会通过HTTP redirect指令将请求转发给某一个BE。用户也可以直接提交导入命令给某一指定BE。导入的最终结果由Coordinator BE返回给用户。