华为云用户手册

  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; }
  • Hive JDBC驱动的加载 客户端程序以JDBC的形式连接HiveServer时,需要首先加载Hive的JDBC驱动类org.apache.hive.jdbc.HiveDriver。 故在客户端程序的开始,必须先使用当前类加载器加载该驱动类。 如果classpath下没有相应的jar包,则客户端程序抛出Class Not Found异常并退出。 如下: Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
  • 关闭数据库连接 客户端程序在执行完HQL之后,注意关闭数据库连接,以免内存泄露,同时这是一个良好的编程习惯。 需要关闭JDK的两个对象statement和connection。 如下: finally { if (null != statement) { statement.close(); } // 关闭JDBC连接 if (null != connection) { connection.close(); } }
  • 使用WebHCat的REST接口以Streaming方式提交MR任务的前置条件 本接口需要依赖hadoop的streaming包,在以Streaming方式提交MR任务给WebHCat前,需要将“hadoop-streaming-2.7.0.jar”包上传到HDFS的指定路径下:“hdfs:///apps/templeton/hadoop-streaming-2.7.0.jar”。首先登录到安装有客户端和Hive服务的节点上,以客户端安装路径为“/opt/client”为例: source /opt/client/bigdata_env 使用kinit登录人机用户或者机机用户。 hdfs dfs -put ${BIGDATA_HOME}/ FusionInsight _HD_8.1.0.1/FusionInsight-Hadoop-*/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar /apps/templeton/ 其中/apps/templeton/需要根据不同的实例进行修改,默认实例使用/apps/templeton/,Hive1实例使用/apps1/templeton/,以此类推。
  • HQL语法规则之判空 判断字段是否为“空”,即没有值,使用“is null”;判断不为空,即有值,使用“is not null”。 要注意的是,在HQL中String类型的字段若是空字符串, 即长度为0,那么对它进行IS NULL的判断结果是False。此时应该使用“col = '' ”来判断空字符串;使用“col != '' ”来判断非空字符串。 正确示例: select * from default.tbl_src where id is null;select * from default.tbl_src where id is not null;select * from default.tbl_src where name = '';select * from default.tbl_src where name != ''; 错误示例: select * from default.tbl_src where id = null;select * from default.tbl_src where id != null;select * from default.tbl_src where name is null;select * from default.tbl_src where name is not null; 注:表tbl_src的id字段为Int类型,name字段为String类型。
  • Spark应用中,需引入Spark的类 对于Java开发语言,正确示例: // 创建SparkContext时所需引入的类。import org.apache.spark.api.java.JavaSparkContext// RDD操作时引入的类。import org.apache.spark.api.java.JavaRDD// 创建SparkConf时引入的类。import org.apache.spark.SparkConf 对于Scala开发语言,正确示例: // 创建SparkContext时所需引入的类。import org.apache.spark.SparkContext// RDD操作时引入的类。import org.apache.spark.SparkContext._// 创建SparkConf时引入的类。import org.apache.spark.SparkConf
  • 应用程序结束之前必须调用SparkContext.stop 利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。 利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。 利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。 以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。 正确示例: //提交spark作业val sc = new SparkContext(conf)//具体的任务...//应用程序结束sc.stop() 错误示例: //提交spark作业val sc = new SparkContext(conf)//具体的任务... 如果不添加SparkContext.stop,YARN界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。 图1 添加SparkContext.stop()和不添加的区别
  • 客户端配置参数需要与服务端保持一致 当集群的Hive、YARN、HDFS服务端配置参数发生变化时,客户端程序对应的参数会被改变,用户需要重新审视在配置参数变更之前提交到HiveServer的配置参数是否和服务端配置参数一致,如果不一致,需要用户在客户端重新调整并提交到HiveServer。例如下面的示例中,如果修改了集群中的YARN配置参数时,Hive客户端、示例程序都需要审视并修改之前已经提交到HiveServer的配置参数: 初始状态: 集群YARN的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 客户端的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 集群YARN修改后,参数配置如下: mapreduce.reduce.java.opts=-Xmx1024M 如果此时客户端程序不做调整修改,则客户端参数仍旧有效,会导致Reducer内存不足而使任务运行失败。
  • 在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
  • 调用Kafka API(AdminZkClient.createTopic)创建Topic 对于Java开发语言,正确示例: import kafka.zk.AdminZkClient;import kafka.zk.KafkaZkClient;import kafka.admin.RackAwareMode;… KafkaZkClient kafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue(), Time.SYSTEM, "", "", null);AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.createTopic(topic, partitions, replicas, new Properties(), RackAwareMode.Enforced$.MODULE$);… 对于Scala开发语言,正确示例: import kafka.zk.AdminZkClient;import kafka.zk.KafkaZkClient;… val kafkaZkClient: KafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue, Time.SYSTEM, "", "")val adminZkClient: AdminZkClient = new AdminZkClient(kafkaZkClient)adminZkClient.createTopic(topic, partitions, replicas)
  • 在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
  • 使用说明 分布式表名称:default.my_table_dis。 本地表名称:default.my_table_local。 通过“AS”关联分布式表和本地表,保证分布式表的字段定义跟本地表一致。 分布式表引擎的参数说明: default_cluster:集群名称。 default:本地表所在库名。 my_table_local:本地表名。 rand():可选参数,分片键(sharding key),可以是表中一列的原始数据(如did),也可以是函数调用的结果。 如轮训方式:rand(),表示在写入数据时直接将数据插入到分布式表,分布式表引擎会按轮训算法将数据发送到各个分片。 该键是写分布式表保证数据均匀分布在各分片的唯一方式。
  • ClickHouse日志详细信息 日志类型 日志文件名 描述 ClickHouse相关日志 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse-server.err.log ClickHouseServer服务运行错误日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/checkService.log ClickHouseServer服务运行关键日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse-server.log /var/log/Bigdata/clickhouse/clickhouseServer/ugsync.log 用户角色同步工具打印日志。 /var/log/Bigdata/clickhouse/clickhouseServer/prestart.log ClickHouse预启动日志。 /var/log/Bigdata/clickhouse/clickhouseServer/start.log ClickHouse启动日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkServiceHealthCheck.log ClickHouse健康检查日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkugsync.log 用户角色同步检查日志。 /var/log/Bigdata/clickhouse/clickhouseServer/checkDisk.log ClickHouse磁盘检测日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/backup.log ClickHouse在Manager上执行备份恢复操作的日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/stop.log ClickHouse停止日志。 /var/log/Bigdata/clickhouse/clickhouseServer/postinstall.log ClickHouse的postinstall.sh脚本调用日志。 /var/log/Bigdata/clickhouse/balance/start.log ClickHouseBalancer服务启动日志文件路径。 /var/log/Bigdata/clickhouse/balance/error.log ClickHouseBalancer服务运行错误日志文件路径。 /var/log/Bigdata/clickhouse/balance/access_http.log ClickHouseBalancer服务运行http日志文件路径。 /var/log/Bigdata/clickhouse/balance/access_tcp.log ClickHouseBalancer服务运行tcp日志文件路径。 /var/log/Bigdata/clickhouse/balance/checkService.log ClickHouseBalancer服务检查日志。 /var/log/Bigdata/clickhouse/balance/postinstall.log ClickHouseBalancer的postinstall.sh脚本调用日志。 /var/log/Bigdata/clickhouse/balance/prestart.log ClickHouseBalancer服务预启动日志文件路径。 /var/log/Bigdata/clickhouse/balance/stop.log ClickHouseBalancer服务关闭日志文件路径。 /var/log/Bigdata/clickhouse/clickhouseServer/auth.log ClickHouse服务认证日志。 /var/log/Bigdata/clickhouse/clickhouseServer/cleanService.log 重装实例异常产生的记录日志。 /var/log/Bigdata/clickhouse/clickhouseServer/offline_shard_table_manager.log ClickHouse入服/退服日志。 /var/log/Bigdata/clickhouse/clickhouseServer/traffic_control.log ClickHouse主备容灾流量控制日志。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse_migrate_metadata.log ClickHouse元数据搬迁日志。 /var/log/Bigdata/clickhouse/clickhouseServer/clickhouse_migrate_data.log ClickHouse业务数据搬迁日志。 /var/log/Bigdata/clickhouse/clickhouseServer/changePassword.log ClickHouse修改用户密码日志。 数据迁移日志 /var/log/Bigdata/clickhouse/migration/数据迁移任务名/clickhouse-copier_{timestamp}_{processId}/copier.log 参考使用ClickHouse数据迁移工具,使用迁移工具时产生的运行日志。 /var/log/Bigdata/clickhouse/migration/数据迁移任务名/clickhouse-copier_{timestamp}_{processId}/copier.err.log 参考使用ClickHouse数据迁移工具,使用迁移工具时产生的错误日志。 /var/log/Bigdata/tomcat/clickhouse/auto_balance/数据迁移任务名/balance_manager.log 参考使用ClickHouse数据迁移工具,勾选一键均衡产生的运行日志。 clickhouse-tomcat日志 /var/log/Bigdata/tomcat/clickhouse/web_clickhouse.log ClickHouse自定义UI运行日志。 /var/log/Bigdata/tomcat/audit/clickhouse/clickhouse_web_audit.log clickhouse的数据迁移审计日志。 ClickHouse审计日志 /var/log/Bigdata/audit/clickhouse/clickhouse-server-audit.log ClickHouse的审计日志文件路径。 父主题: ClickHouse数据库运维
  • UDF管理 建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。 Hive的UDF会有一些默认属性,比如“deterministic”默认为“true”(同一个输入会返回同一个结果),“stateful”(是否有状态,默认为“true”)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,例如如下类: @UDFType(deterministic = false)Public class MyGenericUDAFEvaluator implements Closeable {
  • Doris数据查询规则 在数据查询业务代码中建议查询失败时进行重试,再次下发查询。 in中常量枚举值超过1000后,必须修改为子查询。 禁止使用REST API(Statement Execution Action)执行大量SQL查询,该接口仅用于集群维护。 query查询条件返回结果超过5万条,则使用JDBC Catalog或者OUTFILE方式导出查询数据,否则FE上大量数据传输将占用FE资源,影响集群稳定性。 如果是交互式查询,建议使用分页方式(offset limit)导出数据,分页命令为Order by。 如果数据导出提供给第三方使用,建议使用outfile或者export方式 2个以上大于3亿的表JOIN使用Colocation Join。 亿级别大表禁止使用select *查询数据,查询时需明确要查询的字段。 使用SQL Block方式禁止select *操作。 如果是高并发点查询,建议开启行存储(Doris 2.x版本支持),并且使用PreparedStatement查询。 亿级以上表数据查询必须设置分区分桶条件。 禁止对分区表执行全分区数据扫描操作。
  • Doris数据查询建议 一次insert into select数据超过1亿条后,建议拆分为多个insert into select语句执行,分成多个批次来执行。 不要使用OR作为JOIN条件。 不建议频繁的数据delete修改,将要删除的数据攒批,偶尔进行批量删除,且需要带上条件,提升系统稳定性和删除效率。 大量数据排序(5亿以上)后返回部分数据,建议先减少数据范围再执行排序,否则大量排序会影响性能。例如: 将from table order by datatime desc limit 10优化为from table where datatime='2023-10-20' order by datatime desc limit 10。 查询任务性能调优参数parallel_fragment_exec_instance_num使用注意事项: 此参数是session级别设置,表示可并发执行的fragment数量,对CPU消耗较大,因此一般情况下不需要设置此参数。如果需要设置此参数来加速查询性能,必须遵循以下规则: 切勿设置该参数为全局生效,禁止使用set global方式进行设置。 设置参数值建议为偶数2或4(最大值不要超过单节点CPU核数的一半)。 设置此参数值时需要观察CPU使用率,CPU使用率小于50%时方可考虑设置。 如果查询SQL是insert into select大数据量的方式,不建议设置此参数。
  • HQL编写之隐式类型转换 查询语句使用字段的值做过滤时,不建议通过Hive自身的隐式类型转换来编写HQL。因为隐式类型转换不利于代码的阅读和移植。 建议示例: select * from default.tbl_src where id = 10001;select * from default.tbl_src where name = 'TestName'; 不建议示例: select * from default.tbl_src where id = '10001';select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • 配置多个ClickHouseBalancer实例IP 配置多个ClickHouseBalancer实例IP可以避免ClickHouseBalancer实例单点故障。相关配置(with属性)如下: 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default',
  • Doris UDF开发规则 UDF中方法调用必须是线程安全的。 UDF实现中禁止读取外部大文件到内存中,如果文件过大可能会导致内存耗尽。 需避免大量递归调用,否则容易造成栈溢出或oom。 需避免不断创建对象或数组,否则容易造成内存耗尽。 Java UDF应该捕获和处理可能发生的异常,不能将异常给服务处理,以避免程序出现未知异常。可以使用try-catch块来处理异常,并在必要时记录异常信息。 UDF中应避免定义静态集合类用于临时数据的存储,或查询外部数据存在较大对象,否则会导致内存占用过高。 应该避免类中import的包和服务侧包冲突,可通过grep -lr "完全限定类名"命令来检查冲突的Jar包。如果发生类名冲突,可通过完全限定类名方式来避免。
  • Doris UDF开发建议 不要执行大量数据的复制操作,防止堆栈内存溢出。 应避免使用大量字符串拼接操作,否则会导致内存占用过高。 Java UDF应该使用有意义的名称,以便其他开发人员能够轻松理解其用途。建议使用驼峰式命名法,并以UDF结尾,例如:MyFunctionUDF。 Java UDF应该指定返回值的数据类型,并且必须具有返回值,返回值默认或异常时不要设置为NULL。建议使用基本数据类型或Java类作为返回值类型。
  • Sink表配置合适的攒批参数 攒批写参数: Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。 相关配置如下: sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。 sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。 两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。 示例1:60秒sink一次 'sink.buffer-flush.max-rows' = '0','sink.buffer-flush.interval' = '60s' 示例2:100条sink一次 'sink.buffer-flush.max-rows' = '100','sink.buffer-flush.interval' = '0s' 示例3:数据不sink 'sink.buffer-flush.max-rows' = '0','sink.buffer-flush.interval' = '0s'
  • 建议 不建议表中存储过多的Nullable列,可以考虑字符串使用“NA”,数值型用0作为缺省值。过多使用Nullable将消耗更多内存。 建议规划好业务所需的列,必要时可提前预置一些属性列,避免频繁的增删列。 数值类型:UInt8/UInt16/UInt32/UInt64、Int8/Int16/Int32/Int64, Float32/Float64等,选择不同长度,性能差别较大。 建议根据业务场景所需选择最小满足的类型使用。 示例 CREATE TABLE counter ON CLUSTER default_cluster(`when` DateTime DEFAULT now(),`device` UInt32,`value` Float32,`value64` Float64)ENGINE = MergeTreePARTITION BY toYYYYMM(when)ORDER BY (device, when) 表中有Float32类型的字段value和Float64的字段value64插入数据的查询表现如下: INSERT INTO counterSELECTtoDateTime('2019-01-01 00:00:00') + toInt64(number / 10) AS when,(number % 10) + 1 AS device,(device * 3) + (number / 10000) AS value,valueFROM system.numbersLIMIT 100000000; 往value和value64插入相同的数据,总数据量1亿条。 查询Float32字段 耗时:0.750秒。 查询Float64字段 耗时:0.929秒。 结果:Float32类型的查询时间比Float64更快。 低基数维度(基数1万内),建议使用LowCardinality修饰符,提升查询性能。 维度的基数(Cardinality):指的是该维度在数据集中出现的不同值的个数。例如“国家”是一个维度,如果有200个不同的值,那么此维度的基数就是200。 根据官方建议和实践经验,在维度基数小于1万的时候,对维度字段做LowCardinality编码,导入性能会有略微下降,查询性能提升明显,数据存储空间下降明显。 在默认的情况下,声明了LowCardinality的字段会基于数据生成一个全局字典,并利用倒排索引建立Key和位置的对应关系。如果数据的基数大于8192,也就是说不同的值多于8192个,则会将一个全局字典拆分成多个局部字典(low_cardinality_max_dictionary_size参数控制,默认8192)。 示例 CREATE TABLE test_codecs ON CLUSTER default_cluster(`a` String,`a_low_card` LowCardinality(String) DEFAULT a)ENGINE = MergeTreePARTITION BY tuple()ORDER BY tuple(); 其中,字段a是原生字符串,字段a_low_card基于a做了低基维编码。 数据存储的对比 查询性能对比 查询性能有5倍的提升。
  • 规则 Hudi表必须执行Clean。 对于Hudi的MOR、COW表,都需要开启Clean。 Hudi表在写入数据时会自动判断是否需要执行Clean,因为Clean的开关默认打开(hoodie.clean.automatic默认为true)。 Clean操作并不是每次写数据时都会触发,至少需要满足两个条件: Hudi表中需要有旧版本的文件。对于COW表来说,只要保证数据被更新过就一定存在旧版本的文件。对于MOR表来说,要保证数据被更新过并且做过Compaction才能有旧版本的文件。 Hudi表满足hoodie.cleaner.commits.retained设置的阈值。如果是Flink写hudi,则至少提交的checkpoint要超过这个阈值;如果是批写Hudi,则批写次数要超过这个阈值。
  • 建议 MOR表下游采用批量读模式,采用clean的版本数为compaction版本数+1。 MOR表一定要保证Compaction Plan能够被成功执行,Compaction Plan只是记录了Hudi表中哪些Log文件要和哪些Parquet文件合并,所以最重要的地方在于保证Compaction Plan在被执行的时候它需要合并的文件都存在。而Hudi表中只有Clean操作可以清理文件,所以建议Clean的触发阈值(hoodie.cleaner.commits.retained的值)至少要大于Compaction的触发阈值(对于Flink任务来说就是compaction.delta_commits的值)。 MOR表下游采用流式计算,历史版本保留小时级。 如果MOR表的下游是流式计算,例如Flink流读,可以按照业务需要保留小时级的历史版本,这样的话近几个小时之内的增量数据可以通过log文件读出,如果保留时长过短,下游flink作业在重启或者异常中断阻塞的情况下,上游增量数据已经Clean掉了,flink需要从parquet文件读增量数据,性能会有下降;如果保留时间过长,会导致log里面的历史数据冗余存储。 具体可以按照下面的计算公式来保留2个小时的历史版本数据: 版本数设置为3600*2/版本interval时间,版本interval时间来自于flink作业的checkpoint周期,或者上游批量写入的周期。 COW表如果业务没有历史版本数据保留的特殊要求,保留版本数设置为1。 COW表的每个版本都是表的全量数据,保留几个版本就会冗余多少个版本。因此如果业务无历史数据回溯的需求,保留版本数设置为1,也就是保留当前最新版本 clean作业每天至少执行一次,可以2~4小时执行一次。 Hudi的MOR表和COW表都需要保证每天至少1次Clean,MOR表的Clean可以参考2.2.1.6小节和Compaction放在一起异步去执行。COW的Clean可以在写数据时自动判断是否执行。
  • 数据查询建议 建议查询指定分区 通过指定分区字段会减少底层数据库扫描的文件数量,提升查询性能,实际经验:700个分区的千列大表,需要查询一个分区中有7000万数据,其他699个分区中无数据,虽然只有一个分区有数据,其他分区无数据,但是查询指定分区为百毫秒级性能,没有指定分区查询性能为1~2秒左右,性能相差20倍。 慎用final查询 在查询语句的最后跟上final,通常是对于ReplacingMergeTree引擎,数据不能完全去重情况下,有些开发人员习惯写final关键字进行实时合并去重操作(merge-on-read),保证查询数据无重复数据。可以通过argMax函数或其他方式规避此问题。
  • 数据修改 建议慎用delete、update的mutation操作 标准SQL的更新、删除操作是同步的,即客户端要等服务端返回执行结果(通常是int值);而ClickHouse的update、delete是通过异步方式实现的,当执行update语句时,服务端立即返回执行成功还是失败结果,但是实际上此时数据还没有修改完成,而是在后台排队等着进行真正的修改,可能会出现操作覆盖的情况,也无法保证操作的原子性。 业务场景要求有update、delete等操作,建议使用ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree引擎,使用方式参见:https://clickhouse.tech/docs/zh/engines/table-engines/mergetree-family/collapsingmergetree/。 建议少或不增删数据列 业务提前规划列个数,如果将来有更多列要使用,可以规划预留多列,避免在生产系统跑业务过程中进行大量的alter table modify列操作,导致不可以预知的性能、数据一致性问题。 对于批量数据清理,建议根据分区来操作: ALTER TABLE table_name DROP PARTITION partition_name; 禁止修改索引列 对索引列的修改会导致现有索引失效,触发重建索引,期间查询数据不准确。 如果业务场景必须修改索引列,推荐用ReplacingMergeTree引擎建表,使用数据写入+去重引擎代替数据更新场景:https://clickhouse.tech/docs/zh/engines/table-engines/mergetree-family/collapsingmergetree/。
  • ClickHouse调优思路 ClickHouse的总体性能调优思路为性能瓶颈点分析、关键参数调整以及SQL调优。在调优过程中,需要综合系统资源、吞吐量、集群负载等各种因素来分析,定位性能问题,设定调优目标,调优达到客户所需目标即可。 ClickHouse调优人员需要系统软件架构、软硬件配置、数据库架构原理及配置参数、并发控制、查询处理和数据库应用有广泛而深刻的理解和认识,才能在调优过程中找到关键瓶颈点,解决性能问题。 图1 调优流程 表1 调优流程说明 流程 描述 系统调优 对OS操作系统级参数和数据库的调优,充分地利用主机的CPU、内存、I/O和网络资源,提升整个系统查询的吞吐量,同时数据库参数也调整到最优状态。 SQL调优 审视业务所用SQL语句是否存在可优化空间,包括: 分析数据分布是否有倾斜,对于大表数据是否平均分布在各个shard。 分析建表语句,查看是否有建立分区、一级索引、二级索引、排序键是否指定等。 分析查询SQL是否使用了分区和索引,检查查询过滤条件比较频繁的列是否安排在建表时指定的索引及排序键的靠前位置。 数据库参数调优 通过调优数据参数,提升数据库性能,保障数据库稳定运行。 更多信息可参考ClickHouse社区文档相关调优内容https://clickhouse.com/docs/en/intro。 父主题: ClickHouse数据库调优
  • 日志路径 ClickHouse相关日志的默认存储路径为:“${BIGDATA_ LOG _HOME}/clickhouse”。 ClickHouseServer运行相关日志:“/var/log/Bigdata/clickhouse/clickhouseServer/ *.log”。 ClickHouseBalancer运行日志:“/var/log/Bigdata/clickhouse/balance/*.log”。 ClickHouseServer审计日志:“/var/log/Bigdata/audit/clickhouse/clickhouse-server-audit.log”。 ClickHouse数据迁移日志:“/var/log/Bigdata/clickhouse/migration/${task_name}/clickhouse-copier_{timestamp}_{processId}/copier.log”。
  • 避免写入单条记录超大的数据 单条记录超大的数据在影响处理效率的同时还可能写入失败,此时需要在初始化Kafka生产者实例时根据情况调整“max.request.size ”值,在初始化消费者实例时调整“max.partition.fetch.bytes”值。 例如,参考本例,可以将max.request.size 、max.partition.fetch.bytes配置项设置为“5252880”: // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.request.size", "5252880"); // 安全协议类型 props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.partition.fetch.bytes","5252880");
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全