华为云用户手册

  • 建议 一次只插入一个分区内的数据 如果数据属于不同的分区,则每次插入,不同分区的数据会独立生成part文件,导致part总数量膨胀,建议一批插入的数据属于同一个分区。 写入速率 单节点写入速度为50~200MB/S,如果写入的数据每行为1Kb,那么写入的速度为50,000到200,000行每秒,如果行数据容量更小,那么写入速度将更高,如果写入性能不够,可以使用多个副本同时写入,同一时间每个副本写入的数据保持均衡。 慎用分布式表批量插入 写分布式表,数据会分发到集群的所有本地表,每个本地表插入的数据量是总插入量的1/N,batch size可能比较小,导致data part过多,merge压力变大,甚至出现异常影响数据插入; 数据的一致性问题:数据先在分布式表写入节点的主机落盘,然后数据被异步地发送到本地表所在主机进行存储,中间没有一致性的校验,如果分布式表写入数据的主机出现异常,会存在数据丢失风险; 对于数据写分布式表和数据写本地表相比,分布式表数据写入性能也会变慢,单批次分布式表写,写入节点的磁盘和网络IO会成为性能瓶颈点。 分布式表转发给各个shard成功与否,插入数据的客户端无法感知,转发失败的数据会不断重试转发,消耗CPU。 大批量数据导入要分时、分节点、扩容 如果数据盘为SATA盘,当大批量数据集中插入时候,会抢占磁盘,使得磁盘长时间处于繁忙状态,影响其他alter类操作的效率。 尽量避免批量导数据的SQL并发执行,会给磁盘和ClickHouse并发能力带来冲击。 Kafka数据入库 不建议建ClickHouse kafka表引擎,进行数据同步到ClickHouse中,当前CK的kafka引擎有会导致kafka引擎数据入库产生性能等诸多问题,通过用户使用经验,需要应用侧自己写kafka的数据消费,攒批写入ClickHouse,提升ClickHouse的入库性能。 使用分区替换或增加的方式写入数据 为避免目标表写入脏数据导致的删改,先将数据写入临时表,再从临时表写入目标表。 操作步骤如下: 创建一张与目标表table_dest结构、分区键、排序键、主键、存储策略、引擎都一致的临时表table_source。 先把数据写到临时表,一次只写入一个分区的数据,检查临时表的数据准确无误。 使用以下SQL查看目标表的分区: SELECT partition AS `partition`,sum(rows) AS `count` FROM system.parts WHERE active AND database=='数据库名' AND table=='表名' GROUP BY partition ORDER BY partition ASC; 如果目标表存在该分区,将分区替换到目标表,到集群的每个节点上执行如下语法: ALTER TABLE table_dest REPLACE PARTITION partition_expr FROM table_source; 如果目标表不存在该分区,将分区增加到目标表,到集群的每个节点上执行如下语法: ALTER TABLE table_dest REPLACE PARTITION tuple() partition_expr FROM table_source;
  • 规则 写本地表,查询分布式表,提升写入和查询性能,保证写入和查询的数据一致性。 只有在去重诉求的场景下,可以使用分布式表插入,通过sharding key将要去重的数据转发到同一个shard,便于后续去重查询。 外部模块保证数据导入的幂等性。 ClickHouse不支持数据写入的事务保证。通过外部导入数据模块控制数据的幂等性,比如某个批次的数据导入异常,则drop对应的分区数据或清理掉导入的数据后,重新导入该分区或批次数据。 大批量少频次的写入。 ClickHouse的每次数据插入,都会生成一到多个part文件,如果data part过多, merge压力会变大,甚至出现各种异常影响数据插入。建议每个批次5k到100k行,写入字段不能太多,太多字段情况下要减少写入行数,以降低对写入节点的内存和CPU压力,每秒不超过1次插入。 多副本并行导入。 有大数据的导入场景,建议将数据提前拆分成多份,在一个shard内的多个副本同时导入,以分摊一个节点导入数据的压力,同时能提升数据入库的性能,缩短入库时间。 常见错误: Too many parts(304). Merges are processing significantly slower than inserts 原因分析:MergeTree的merge的速度跟不上目录生成的速度,数据目录越来越多就会抛出这个异常。
  • ClickHouse系统调优 通过 FusionInsight Manager查看主机上的CPU、内存、I/O和网络资源使用情况,确认这些资源是否已被充分利用,分以下几种情况: 每个节点资源占用都比较均匀 通过观察资源在每个节点都使用比较均匀,说明系统资源使用比较正常,可以先不关注,可以去分析SQL语句是否有进一步优化的余地。 有个别节点资源占用比较高 如果观察到个别节点占用资源较高,需要针对占用资源较高的节点分析,分析当前的SQL语句是什么原因导致部分节点占用比其他节点更多资源,是计算还是数据存储倾斜导致,或者是软件bug导致。 每个节点资源占用都比较高 如果集群所有节点资源占用都比较高,说明集群整体比较忙,需要单独确认需要调优的SQL语句,单独调优。如果SQL也无调优余地,集群资源达到瓶颈,需要通过扩容来提升查询性能,达到调优目标。 父主题: ClickHouse数据库调优
  • 一级索引设计 在建表设计时指定主键字段的建议:按查询时最常使用且过滤性最高的字段作为主键。依次按照访问频度从高到低、维度基数从小到大来排列。数据是按照主键排序存储的,查询的时候,通过主键可以快速筛选数据,合理的主键设计,能够大大减少读取的数据量,提升查询性能。例如所有的分析,都需要指定业务的id,则可以将业务id字段作为主键的第一个字段顺序。 根据业务场景合理设计稀疏索引粒度 ClickHouse的主键索引采用的是稀疏索引存储,稀疏索引的默认采样粒度是8192行,即每8192行取一条记录在索引文件中,实践建议: 索引粒度越小,对于小范围的查询更有效,避免查询资源的浪费; 索引粒度越大,则索引文件越小,索引文件的处理会更快; 超过10亿的表索引粒度可设为16384,其他设为8192或者更小值。
  • 规则 物化视图(Materialized View)显式指定聚合表。 在创建物化视图时,使用TO关键字为物化视图指定数据存储表。 如果不显示指定聚合表,则会创建隐式表.inner.mv1,与物化视图绑定。 用于数据预聚合的物化视图,聚合表使用聚合引擎。 如果不用聚合引擎,则每次数据插入,会对明细表的全量数据重新计算,而不是只处理增量数据。 聚合表中,聚合指标定义成聚合类型(AggregateFunction)。 物化视图的指标列与聚合表中对应字段名称一致,命名规范如下: {aggrateFunction}_{columnName}_state 聚合表创建样例: CREATE TABLE counter_daily_agg ON CLUSTER default_cluster(day DateTime,device UInt32,count UInt64,max_value_state AggregateFunction(max, Float32),min_value_state AggregateFunction(min, Float32),avg_value_state AggregateFunction(avg, Float32))ENGINE = SummingMergeTree()PARTITION BY tuple()ORDER BY (device, day);
  • 二级跳数索引设计 跳数索引使用参考: 使用说明 对于*MergeTree引擎,支持配置跳数索引,即一种数据局部聚合的粗糙索引,对数据块创建索引,选择性的保留一部分原始数据(minmax、set), 或者是保留计算后的中间数据(bloomfilter)。在查询时,选择忽略加载不会包含结果的数据块,从而达到加速查询的效果。 索引定义 INDEX index_name expr TYPE type(...) GRANULARITY granularity_value Expr:属性表达式,基于字段或者字段的表达式来创建索引; type(...):支持的索引类型,minmax、set等; Granularity:创建索引的记录粒度。比如index_granularity = 8192,granularity配置为3,则使用8192*3条记录创建一条索引数据。 创建索引样例 CREATE TABLE skip_index_test ON CLUSTER default_cluster(ID String,URL String,Code String,EventTime Date,INDEX a ID TYPE minmax GRANULARITY 5,INDEX b (length(ID) * 8) TYPE set(100) GRANULARITY 5,INDEX c (ID, Code) TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 5,INDEX d ID TYPE tokenbf_v1(256, 2, 0) GRANULARITY 5,INDEX e ID TYPE bloom_filter(0.025) GRANULARITY 5) ENGINE = MergeTree()ORDER BY ID ; minmax索引 记录了一段数据范围内的最小和最大极值,其索引的作用类似分区目录的minmax索引,能够快速跳过无用的数据区间。 INDEX a ID TYPE minmax GRANULARITY 5 上述示例中minmax索引会记录这段数据区间内ID字段的极值。极值的计算涉及每5个index_granularity区间中的数据。 set索引 直接记录了声明字段或表达式的取值(唯一值,无重复),其完整形式为set(max_rows),其中max_rows是一个阈值,表示在一个index_granularity内,索引最多记录的数据行数。如果max_rows=0,则表示无限制。 INDEX b (length(ID) * 8) TYPE set(100) GRANULARITY 5 上述示例中set索引会记录数据中ID的长度*8后的取值。其中,每个index_granularity内最多记录100条。 布隆过滤器 bloom_filter索引 为指定的列存储布隆过滤器。 可选的参数false_positive用来指定从布隆过滤器收到错误响应的几率。取值范围是 (0,1),默认值:0.025。 支持的数据类型:Int*,UInt*,Float*,Enum,Date,DateTime,String,FixedString,Array,LowCardinality,Nullable。 ngrambf_v1索引 记录的是数据短语的布隆表过滤器,只支持String和FixedString数据类型。只能够提升in、notIn、like、equals和notEquals查询的性能,其完整形式为: ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed) 这些参数是一个布隆过滤器的标准输入,如果接触过布隆过滤器,应该会对此十分熟悉。 具体的含义如下: n:token长度,依据n的长度将数据切割为token短语。 size_of_bloom_filter_in_bytes:布隆过滤器的大小。 number_of_hash_functions:布隆过滤器中使用Hash函数的个数。 random_seed:Hash函数的随机种子。 tokenbf_v1索引 是ngrambf_v1的变种,同样也是一种布隆过滤器索引。tokenbf_v1除了短语token的处理方法外,其他与ngrambf_v1是完全一样的。tokenbf_v1会自动按照非字符的、数字的字符串分割token。 INDEX d ID TYPE tokenbf_v1(256,2,0) GRANULARITY 5 索引创建详见官方文档 https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/#table_engine-mergetree-data_skipping-indexes 建表后再创建索引 ALTER TABLE table_name add INDEX min_max_index (etl_time) TYPE minmax GRANULARITY 3; 删除索引 ALTER TABLE table_name DROP INDEX min_max_index; 单表跳数索引数量 由于索引的创建对数据导入性能有影响,建议单表跳数索引的总数量控制在5个以内。
  • ClickHouse容量规划设计 为了能够更好的发挥ClickHouse分布式查询能力,在集群规划阶段需要合理设计集群数据分布存储。 当前ClickHouse能力为单机磁盘容量达到80%后会上报告警信息,磁盘容量达90%后集群会处于只读状态。 出现磁盘告警信息后需要考虑是否是容量不足问题,如果是容量不足问题需要尽快考虑集群扩容,提升集群整体容量存储。 ClickHouse节点及容量规划如下: 磁盘规划 由于ClickHouseServer业务数据主要存储在本地磁盘上,数据量可能会随着集群使用时间增长而增长,通常建议ClickHouse数据盘单独挂载,元数据盘共享第一个数据盘目录。 磁盘实际容量 由于磁盘存在1MB = 1024KB或者1000KB的不同算法,一般来说,磁盘实际可用容量 = 磁盘标注容量 * 0.9。 例如磁盘标注容量为1.2 TB,实际容量为1200 * 0.9 = 1080 GB。 计算公式 假设历史数据量为H,每日增量为A,单节点磁盘容量为C,数据保留M天,集群副本数为R,则ClickHouseServer物理节点数计算公式如下: ClickHouseServer物理节点数N = [R * (H + A * M)] / C 父主题: ClickHouse集群规划
  • Hive JDBC驱动的加载 客户端程序以JDBC的形式连接HiveServer时,需要首先加载Hive的JDBC驱动类org.apache.hive.jdbc.HiveDriver。 故在客户端程序的开始,必须先使用当前类加载器加载该驱动类。 如果classpath下没有相应的jar包,则客户端程序抛出Class Not Found异常并退出。 如下: Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
  • 关闭数据库连接 客户端程序在执行完HQL之后,注意关闭数据库连接,以免内存泄露,同时这是一个良好的编程习惯。 需要关闭JDK的两个对象statement和connection。 如下: finally { if (null != statement) { statement.close(); } // 关闭JDBC连接 if (null != connection) { connection.close(); } }
  • 使用WebHCat的REST接口以Streaming方式提交MR任务的前置条件 本接口需要依赖hadoop的streaming包,在以Streaming方式提交MR任务给WebHCat前,需要将“hadoop-streaming-2.7.0.jar”包上传到HDFS的指定路径下:“hdfs:///apps/templeton/hadoop-streaming-2.7.0.jar”。首先登录到安装有客户端和Hive服务的节点上,以客户端安装路径为“/opt/client”为例: source /opt/client/bigdata_env 使用kinit登录人机用户或者机机用户。 hdfs dfs -put ${BIGDATA_HOME}/FusionInsight_HD_8.1.0.1/FusionInsight-Hadoop-*/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar /apps/templeton/ 其中/apps/templeton/需要根据不同的实例进行修改,默认实例使用/apps/templeton/,Hive1实例使用/apps1/templeton/,以此类推。
  • Spark应用中,需引入Spark的类 对于Java开发语言,正确示例: // 创建SparkContext时所需引入的类。import org.apache.spark.api.java.JavaSparkContext// RDD操作时引入的类。import org.apache.spark.api.java.JavaRDD// 创建SparkConf时引入的类。import org.apache.spark.SparkConf 对于Scala开发语言,正确示例: // 创建SparkContext时所需引入的类。import org.apache.spark.SparkContext// RDD操作时引入的类。import org.apache.spark.SparkContext._// 创建SparkConf时引入的类。import org.apache.spark.SparkConf
  • 应用程序结束之前必须调用SparkContext.stop 利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。 利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。 利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。 以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。 正确示例: //提交spark作业val sc = new SparkContext(conf)//具体的任务...//应用程序结束sc.stop() 错误示例: //提交spark作业val sc = new SparkContext(conf)//具体的任务... 如果不添加SparkContext.stop,YARN界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。 图1 添加SparkContext.stop()和不添加的区别
  • 客户端配置参数需要与服务端保持一致 当集群的Hive、YARN、HDFS服务端配置参数发生变化时,客户端程序对应的参数会被改变,用户需要重新审视在配置参数变更之前提交到HiveServer的配置参数是否和服务端配置参数一致,如果不一致,需要用户在客户端重新调整并提交到HiveServer。例如下面的示例中,如果修改了集群中的YARN配置参数时,Hive客户端、示例程序都需要审视并修改之前已经提交到HiveServer的配置参数: 初始状态: 集群YARN的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 客户端的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 集群YARN修改后,参数配置如下: mapreduce.reduce.java.opts=-Xmx1024M 如果此时客户端程序不做调整修改,则客户端参数仍旧有效,会导致Reducer内存不足而使任务运行失败。
  • HQL语法规则之判空 判断字段是否为“空”,即没有值,使用“is null”;判断不为空,即有值,使用“is not null”。 要注意的是,在HQL中String类型的字段若是空字符串, 即长度为0,那么对它进行IS NULL的判断结果是False。此时应该使用“col = '' ”来判断空字符串;使用“col != '' ”来判断非空字符串。 正确示例: select * from default.tbl_src where id is null;select * from default.tbl_src where id is not null;select * from default.tbl_src where name = '';select * from default.tbl_src where name != ''; 错误示例: select * from default.tbl_src where id = null;select * from default.tbl_src where id != null;select * from default.tbl_src where name is null;select * from default.tbl_src where name is not null; 注:表tbl_src的id字段为Int类型,name字段为String类型。
  • 获取数据库连接 使用JDK的驱动管理类java.sql.DriverManager来获取一个Hive的数据库连接。 Hive的数据库URL为url="jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM;user.principal=hive/hadoop.hadoop.com;user.keytab=conf/hive.keytab"; 以上已经经过安全认证,所以Hive数据库的用户名和密码为null或者空。 如下: // 建立连接connection = DriverManager.getConnection(url, "", "");
  • 执行HQL 执行HQL,注意HQL不能以";"结尾。 正确示例: String sql = "SELECT COUNT(*) FROM employees_info";Connection connection = DriverManager.getConnection(url, "", "");PreparedStatement statement = connection.prepareStatement(sql);resultSet = statement.executeQuery(); 错误示例: String sql = "SELECT COUNT(*) FROM employees_info;";Connection connection = DriverManager.getConnection(url, "", "");PreparedStatement statement = connection.prepareStatement(sql);resultSet = statement.executeQuery();
  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; }
  • 调用Kafka API(AdminZkClient.createTopic)创建Topic 对于Java开发语言,正确示例: import kafka.zk.AdminZkClient;import kafka.zk.KafkaZkClient;import kafka.admin.RackAwareMode;… KafkaZkClient kafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue(), Time.SYSTEM, "", "", null);AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.createTopic(topic, partitions, replicas, new Properties(), RackAwareMode.Enforced$.MODULE$);… 对于Scala开发语言,正确示例: import kafka.zk.AdminZkClient;import kafka.zk.KafkaZkClient;… val kafkaZkClient: KafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue, Time.SYSTEM, "", "")val adminZkClient: AdminZkClient = new AdminZkClient(kafkaZkClient)adminZkClient.createTopic(topic, partitions, replicas)
  • 在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
  • 使用说明 分布式表名称:default.my_table_dis。 本地表名称:default.my_table_local。 通过“AS”关联分布式表和本地表,保证分布式表的字段定义跟本地表一致。 分布式表引擎的参数说明: default_cluster:集群名称。 default:本地表所在库名。 my_table_local:本地表名。 rand():可选参数,分片键(sharding key),可以是表中一列的原始数据(如did),也可以是函数调用的结果。 如轮训方式:rand(),表示在写入数据时直接将数据插入到分布式表,分布式表引擎会按轮训算法将数据发送到各个分片。 该键是写分布式表保证数据均匀分布在各分片的唯一方式。
  • 在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
  • ClickHouse日志详细信息 日志类型 日志文件名 描述 ClickHouse相关日志 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse-server.err.log ClickHouseServer服务运行错误日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/checkService.log ClickHouseServer服务运行关键日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse-server.log /var/log/Bigdata/clickhouse/clickhouseServer/ugsync.log 用户角色同步工具打印日志。 /var/log/Bigdata/clickhouse/clickhouseServer/prestart.log ClickHouse预启动日志。 /var/log/Bigdata/clickhouse/clickhouseServer/start.log ClickHouse启动日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkServiceHealthCheck.log ClickHouse健康检查日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkugsync.log 用户角色同步检查日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkDisk.log ClickHouse磁盘检测日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/backup.log ClickHouse在Manager上执行备份恢复操作的日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/stop.log ClickHouse停止日志。 /var/log/Bigdata/clickhouse/clickhouseServer/postinstall.log ClickHouse的postinstall.sh脚本调用日志。 /var/log/Bigdata/clickhouse/balance/start.log ClickHouseBalancer服务启动日志文件路径。 /var/log/Bigdata/clickhouse/balance/error.log ClickHouseBalancer服务运行错误日志文件路径。 /var/log/Bigdata/clickhouse/balance/access_http.log ClickHouseBalancer服务运行http日志文件路径。 /var/log/Bigdata/clickhouse/balance/access_tcp.log ClickHouseBalancer服务运行tcp日志文件路径。 /var/log/Bigdata/clickhouse/balance/checkService.log ClickHouseBalancer服务检查日志。 /var/log/Bigdata/clickhouse/balance/postinstall.log ClickHouseBalancer的postinstall.sh脚本调用日志。 /var/log/Bigdata/clickhouse/balance/prestart.log ClickHouseBalancer服务预启动日志文件路径。 /var/log/Bigdata/clickhouse/balance/stop.log ClickHouseBalancer服务关闭日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/auth.log ClickHouse服务认证日志。 /var/log/Bigdata/clickhouse/clickhouseServer/cleanService.log 重装实例异常产生的记录日志。 /var/log/Bigdata/clickhouse/clickhouseServer/offline_shard_table_manager.log ClickHouse入服/退服日志。 /var/log/Bigdata/clickhouse/clickhouseServer/traffic_control.log ClickHouse主备容灾流量控制日志。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse_migrate_metadata.log ClickHouse元数据搬迁日志。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse_migrate_data.log ClickHouse业务数据搬迁日志。 /var/log/Bigdata/clickhouse/clickhouseServer/changePassword.log ClickHouse修改用户密码日志。 数据迁移日志 /var/log/Bigdata/clickhouse/migration/数据迁移任务名/clickhouse-copier_{timestamp}_{processId}/copier.log 参考使用ClickHouse数据迁移工具,使用迁移工具时产生的运行日志。 /var/log/Bigdata/clickhouse/migration/数据迁移任务名/clickhouse-copier_{timestamp}_{processId}/copier.err.log 参考使用ClickHouse数据迁移工具,使用迁移工具时产生的错误日志。 /var/log/Bigdata/tomcat/clickhouse/auto_balance/数据迁移任务名/balance_manager.log 参考使用ClickHouse数据迁移工具,勾选一键均衡产生的运行日志。 clickhouse-tomcat日志 /var/log/Bigdata/tomcat/clickhouse/web_clickhouse.log ClickHouse自定义UI运行日志。 /var/log/Bigdata/tomcat/audit/clickhouse/clickhouse_web_audit.log clickhouse的数据迁移审计日志。 ClickHouse审计日志 /var/log/Bigdata/audit/clickhouse/clickhouse-server-audit.log ClickHouse的审计日志文件路径。 父主题: ClickHouse数据库运维
  • UDF管理 建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。 Hive的UDF会有一些默认属性,比如“deterministic”默认为“true”(同一个输入会返回同一个结果),“stateful”(是否有状态,默认为“true”)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,例如如下类: @UDFType(deterministic = false)Public class MyGenericUDAFEvaluator implements Closeable {
  • 配置多个ClickHouseBalancer实例IP 配置多个ClickHouseBalancer实例IP可以避免ClickHouseBalancer实例单点故障。相关配置(with属性)如下: 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default',
  • Doris UDF开发规则 UDF中方法调用必须是线程安全的。 UDF实现中禁止读取外部大文件到内存中,如果文件过大可能会导致内存耗尽。 需避免大量递归调用,否则容易造成栈溢出或oom。 需避免不断创建对象或数组,否则容易造成内存耗尽。 Java UDF应该捕获和处理可能发生的异常,不能将异常给服务处理,以避免程序出现未知异常。可以使用try-catch块来处理异常,并在必要时记录异常信息。 UDF中应避免定义静态集合类用于临时数据的存储,或查询外部数据存在较大对象,否则会导致内存占用过高。 应该避免类中import的包和服务侧包冲突,可通过grep -lr "完全限定类名"命令来检查冲突的Jar包。如果发生类名冲突,可通过完全限定类名方式来避免。
  • Doris UDF开发建议 不要执行大量数据的复制操作,防止堆栈内存溢出。 应避免使用大量字符串拼接操作,否则会导致内存占用过高。 Java UDF应该使用有意义的名称,以便其他开发人员能够轻松理解其用途。建议使用驼峰式命名法,并以UDF结尾,例如:MyFunctionUDF。 Java UDF应该指定返回值的数据类型,并且必须具有返回值,返回值默认或异常时不要设置为NULL。建议使用基本数据类型或Java类作为返回值类型。
  • Sink表配置合适的攒批参数 攒批写参数: Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。 相关配置如下: sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。 sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。 两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。 示例1:60秒sink一次 'sink.buffer-flush.max-rows' = '0','sink.buffer-flush.interval' = '60s' 示例2:100条sink一次 'sink.buffer-flush.max-rows' = '100','sink.buffer-flush.interval' = '0s' 示例3:数据不sink 'sink.buffer-flush.max-rows' = '0','sink.buffer-flush.interval' = '0s'
  • Doris数据查询规则 在数据查询业务代码中建议查询失败时进行重试,再次下发查询。 in中常量枚举值超过1000后,必须修改为子查询。 禁止使用REST API(Statement Execution Action)执行大量SQL查询,该接口仅用于集群维护。 query查询条件返回结果超过5万条,则使用JDBC Catalog或者OUTFILE方式导出查询数据,否则FE上大量数据传输将占用FE资源,影响集群稳定性。 如果是交互式查询,建议使用分页方式(offset limit)导出数据,分页命令为Order by。 如果数据导出提供给第三方使用,建议使用outfile或者export方式 2个以上大于3亿的表JOIN使用Colocation Join。 亿级别大表禁止使用select *查询数据,查询时需明确要查询的字段。 使用SQL Block方式禁止select *操作。 如果是高并发点查询,建议开启行存储(Doris 2.x版本支持),并且使用PreparedStatement查询。 亿级以上表数据查询必须设置分区分桶条件。 禁止对分区表执行全分区数据扫描操作。
  • Doris数据查询建议 一次insert into select数据超过1亿条后,建议拆分为多个insert into select语句执行,分成多个批次来执行。 不要使用OR作为JOIN条件。 不建议频繁的数据delete修改,将要删除的数据攒批,偶尔进行批量删除,且需要带上条件,提升系统稳定性和删除效率。 大量数据排序(5亿以上)后返回部分数据,建议先减少数据范围再执行排序,否则大量排序会影响性能。例如: 将from table order by datatime desc limit 10优化为from table where datatime='2023-10-20' order by datatime desc limit 10。 查询任务性能调优参数parallel_fragment_exec_instance_num使用注意事项: 此参数是session级别设置,表示可并发执行的fragment数量,对CPU消耗较大,因此一般情况下不需要设置此参数。如果需要设置此参数来加速查询性能,必须遵循以下规则: 切勿设置该参数为全局生效,禁止使用set global方式进行设置。 设置参数值建议为偶数2或4(最大值不要超过单节点CPU核数的一半)。 设置此参数值时需要观察CPU使用率,CPU使用率小于50%时方可考虑设置。 如果查询SQL是insert into select大数据量的方式,不建议设置此参数。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全