华为云用户手册

  • 配置场景 在实际应用中,需要存储大大小小的数据,比如图像数据、文档。小于10MB的数据一般都可以存储在HBase上,对于小于100KB的数据,HBase的读写性能是更优的。如果存放在HBase的数据大于100KB甚至到10MB大小时,插入同样个数的数据文件,但是总的数据量会很大,会导致频繁的compaction和split,占用很多CPU,磁盘IO频率很高,性能严重下降。 通过将MOB(Medium-sized Objects)数据(即100KB到10MB大小的数据)直接以HFile的格式存储在文件系统上(例如HDFS文件系统),通过expiredMobFileCleaner和Sweeper工具集中管理这些文件,然后把这些文件的地址信息及大小信息作为value存储在普通HBase的store上。这样就可以大大降低HBase的compaction和split频率,提升性能。 HBase当前默认开启MOB功能,相关配置项如表1所示。如果需要使用MOB功能,用户需要在创建表或者修改表属性时在指定的列族上指定使用mob方式存储数据。 本章节适用于 MRS 3.x及之后版本。
  • 操作场景 您可以按照自定义的方式,通过命令批量导入数据到HBase中。 您可以在“configuration.xml”文件中定义多个方式来批量导入数据。导入数据时可不创建索引。 列的名称不能包含特殊字符,只能由字母、数字和下划线组成。 大任务下MR任务运行失败,请参考MapReduce任务运行失败,ApplicationMaster出现物理内存溢出异常进行处理。 BulkLoad支持的数据源格式为带分隔符的文本文件。 已安装客户端。例如安装目录为“/opt/hadoopclient”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
  • 操作步骤 直接执行如下命令统计满足如下条件的行数。rowkey在从“row_start”到“row_stop”的范围,字段“f3:age”的值为“25”,rowkey的前两个字符为“mi”的行数。 hbase com.huawei.hadoop.hbase.tools.bulkload.RowCounter -Dcounter.rowkey.start="row_start" -Dcounter.rowkey.stop="row_stop" -Dcounter.qualifier="f3:age:25" -Dcounter.rowkey.value="substring(0,2) == 'mi'" table1 -Dcounter.rowkey.start="row_start":表示开始的rowkey为"row_start"。 -Dcounter.rowkey.stop="row_stop":表示结束的rowkey为"row_stop"。 -Dcounter.qualifier="f3:age:25":表示列族f3中列为age的列值为25。 -Dcounter.rowkey.value="substring(0,2) == 'mi'":表示rowkey的值中前两个为mi。 如果指定了“row_start”和“row_stop”,则统计的为大于等于“row_start”并且小于“row_stop”的数据。
  • 配置方法 用户编写自定义rowkey的实现类,需要继承接口,该接口所在的Jar包的路径为“客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar”: [com.huawei.hadoop.hbase.tools.bulkload.RowkeyHandlerInterface] 实现接口中方法: byte[] getRowkeyBytes(String[] colsValues, RegulationDomain regulation) 其中: 传入参数“colsValues”为原始数据中的一行数据集合,每个元素为一列。 传入参数“regulation”为配置导入文件信息(一般情况下并不需要使用)。 将该实现类与其依赖包同时打包成Jar文件,保存到HBase客户端所在节点的任意位置并确保执行命令的用户具有读取和执行该Jar包的权限。 在执行导入命令时,增加两个参数配置项: -Dimport.rowkey.jar="第二步中Jar包的全路径" -Dimport.rowkey.class="用户实现类的全类名"
  • 配置方法 用户编写自定义rowkey的实现类,需要继承接口,该接口所在的Jar包路径为“客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar”: [com.huawei.hadoop.hbase.tools.bulkload.RowkeyHandlerInterface], 实现接口中方法: byte[] getRowkeyBytes(String[] colsValues, RegulationDomain regulation) 其中: 传入参数“colsValues”为原始数据中的一行数据集合,每个元素为一列。 传入参数“regulation”为配置导入文件信息(一般情况下并不需要使用)。 将该实现类与其依赖包同时打包成Jar文件,保存到HBase客户端所在节点的任意位置并确保执行命令的用户具有读取和执行该Jar包的权限。 在执行导入命令时,增加两个参数配置项: -Dimport.rowkey.jar="第二步中Jar包的全路径" -Dimport.rowkey.class="用户实现类的全类名"
  • 操作步骤 以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。 “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。 “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。 “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
  • 回答 在Spark配置中,“spark.yarn.executor.memoryOverhead”参数的值应大于CarbonData配置参数“sort.inmemory.size.inmb” 与“Netty offheapmemory required”参数值的总和,或者“carbon.unsafe.working.memory.in.mb” 、 “carbon.sort.inememory.storage.size.in.mb” 与 “Netty offheapmemory required”参数值的总和。否则,如果堆外(off heap)访问超出配置的executor内存,则YARN可能会停止executor。 “Netty offheapmemory required”说明:当“spark.shuffle.io.preferDirectBufs”设为true时,Spark中netty 传输服务从"spark.yarn.executor.memoryOverhead"中拿掉部分堆内存[~ 384 MB or 0.1 x 执行器内存]。 详细信息请参考常见配置executor堆外内存大小。
  • 操作步骤 根据前提条件,创建一个满足要求的弹性云服务器。 登录集群详情页面,选择“组件管理”。 若集群详情页面没有“组件管理”页签,请先完成 IAM 用户同步(在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“同步”进行IAM用户同步)。 单击“下载客户端”。 在“客户端类型”选择“完整客户端”。 在“下载路径”选择“远端主机”。 将“主机IP”设置为E CS 的IP地址,设置“主机端口”为“22”,并将“保存路径”设置为“/tmp”。 如果使用SSH登录ECS的默认端口“22”被修改,请将“主机端口”设置为新端口。 “保存路径”最多可以包含256个字符。 “登录用户”设置为“root”。 如果使用其他用户,请确保该用户对保存目录拥有读取、写入和执行权限。 在“登录方式”选择“密码”或“SSH私钥”。 密码:输入创建集群时设置的root用户密码。 SSH私钥:选择并上传创建集群时使用的密钥文件。 单击“确定”开始生成客户端文件。 若界面显示以下提示信息表示客户端包已经成功保存。 下载客户端文件到远端主机成功。 若界面显示以下提示信息,请检查用户名密码及远端主机的安全组配置,确保用户名密码正确,及远端主机的安全组已增加SSH(22)端口的入方向规则。然后从3执行重新下载客户端。 连接到服务器失败,请检查网络连接或参数设置。 图1 下载客户端 选择“Flume”服务,单击“实例”,查看任意一个Flume实例和两个MonitorServer实例的“业务IP”。 使用VNC方式,登录弹性云服务器。参见远程登录(VNC方式))。 所有镜像均支持Cloud-init特性。Cloud-init预配置的用户名“root”,密码为创建集群时设置的密码。首次登录建议修改。 在弹性云服务器,切换到root用户,并将安装包复制到目录“/opt”。 sudo su - root cp /tmp/MRS_Flume_Client.tar /opt 在“/opt”目录执行以下命令,解压压缩包获取校验文件与客户端配置包。 tar -xvf MRS_Flume_Client.tar 执行以下命令,校验文件包。 sha256sum -c MRS_Flume_ClientConfig.tar.sha256 界面显示如下信息,表明文件包校验成功: MRS_Flume_ClientConfig.tar: OK 执行以下命令,解压“MRS_Flume_ClientConfig.tar”。 tar -xvf MRS_Flume_ClientConfig.tar 执行以下命令,安装客户端运行环境到新的目录,例如“/opt/Flumeenv”。安装时自动生成目录。 sh /opt/MRS_Flume_ClientConfig/install.sh /opt/Flumeenv 查看安装输出信息,如有以下结果表示客户端运行环境安装成功: Components client installation is complete. 执行以下命令,配置环境变量。 source /opt/Flumeenv/bigdata_env 执行以下命令,解压Flume客户端。 cd /opt/MRS_Flume_ClientConfig/Flume tar -xvf FusionInsight -Flume-1.6.0.tar.gz 执行以下命令,查看当前用户密码是否过期。 chage -l root “Password expires”时间早于当前则表示过期。此时需要修改密码,或执行chage -M -1 root设置密码为未过期状态。 执行以下命令,安装Flume客户端到新目录,例如“/opt/FlumeClient”。安装时自动生成目录。 sh /opt/MRS_Flume_ClientConfig/Flume/install.sh -d /opt/FlumeClient -f MonitorServer实例的业务IP地址 -c Flume配置文件路径 -l /var/log/ -e Flume的业务IP地址 -n Flume客户端名称 各参数说明如下: “-d”:表示Flume客户端安装路径。 “-f”:可选参数,表示两个MonitorServer角色的业务IP地址,中间用英文逗号分隔,若不设置则Flume客户端将不向MonitorServer发送告警信息,同时在MRS Manager界面上看不到该客户端的相关信息。 “-c”:可选参数,表示Flume客户端在安装后默认加载的配置文件“properties.properties”。如不添加参数,默认使用客户端安装目录的“fusioninsight-flume-1.6.0/conf/properties.properties”。客户端中配置文件为空白模板,根据业务需要修改后Flume客户端将自动加载。 “-l”:可选参数,表示日志目录,默认值为“/var/log/Bigdata”。 “-e”:可选参数,表示Flume实例的业务IP地址,主要用于接收客户端上报的监控指标信息。 “-n”:可选参数,表示自定义的Flume客户端的名称。 IBM的JDK不支持“-Xloggc”,需要修改“flume/conf/flume-env.sh”,将“-Xloggc”修改为“-Xverbosegclog”,若JDK为32位,“-Xmx”不能大于3.25GB。 “flume/conf/flume-env.sh”中,“-Xmx”默认为4GB。若客户端机器内存过小,可调整为512M甚至1GB。 例如执行:sh install.sh -d /opt/FlumeClient 系统显示以下结果表示客户端运行环境安装成功: install flume client successfully.
  • 操作步骤 配置JobManager内存。 JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。 您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。 在使用yarn-session命令时,添加“-jm MEM”参数设置内存。 在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。 配置TaskManager个数。 每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。 配置TaskManager Slot数。 每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。 在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。 在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。 配置TaskManager内存。 TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。 将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。 将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
  • 缓冲区超时设置 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下: env.setBufferTimeout(timeoutMillis);env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  • Web安全 Flink Web安全加固,支持白名单过滤,Flink Web只能通过YARN代理访问,支持安全头域增强。在Flink集群中,各部件的监听端口支持范围可配置。 编码规范: 说明:Web Service客户端和服务器间使用相同的编码方式,是为了防止出现乱码现象,也是实施输入校验的基础。 安全加固:web server响应消息统一采用UTF-8字符编码。 支持IP白名单过滤: 说明:防止非法用户登录,需在web server侧添加IP Filter过滤源IP非法的请求。 安全加固:支持IP Filter实现Web白名单配置,配置项是“jobmanager.web.allow-access-address”,默认情况下只支持YARN用户接入。 安装客户端之后需要将客户端节点IP追加到jobmanager.web.allow-access-address配置项中。 禁止将文件绝对路径发送到客户端: 说明:文件绝对路径发送到客户端会暴露服务端的目录结构信息,有助于攻击者遍历了解系统,为攻击者攻击提供帮助。 安全加固:Flink配置文件中所有配置项中如果包含以/开头的,则删掉第一级目录。 同源策略: 适用于MRS 3.x及之后版本。 说明:如果两个URL的协议,主机和端口均相同,则它们同源;如果不同源,默认不能相互访问;除非被访问者在其服务端显示指定访问者的来源。 安全加固:响应头“Access-Control-Allow-Origin”头域默认配置为YARN集群ResourceManager的IP地址,如果源不是来自YARN的,则不能互相访问。 防范敏感信息泄露: 适用于MRS 3.x及之后版本。 说明:带有敏感数据的Web页面都应该禁止缓存,以防止敏感信息泄漏或通过代理服务器上网的用户数据互窜现象。 安全加固:添加“Cache-control”、“Pragma”、“Expires”安全头域,默认值为:“Cache-Control:no-store”,“Pragma :no-cache”,“Expires : 0”。实现了安全加固,Flink和web server交互的内容将不会被缓存。 防止劫持: 适用于MRS 3.x及之后版本。 说明:由于点击劫持(ClickJacking)和框架盗链都利用到框架技术,所以需要采用安全措施。 安全加固:添加“X-Frame-Options”安全头域,给浏览器提供允许一个页面可否在“iframe”、“frame”或“object”网站中的展现页面的指示,如果默认配置为“X-Frame-Options: DENY”,则确保任何页面都不能被嵌入到别的“iframe”、“frame”或“object”网站中,从而避免了点击劫持 (clickjacking) 的攻击。 对Web Service接口调用记录日志: 适用于MRS 3.x及之后版本。 说明:对“Flink webmonitor restful”接口调用进行日志记录。 安全加固:“access log”支持配置:“jobmanager.web.accesslog.enable”,默认为“true”。且日志保存在单独的“webaccess.log”文件中。 跨站请求(CSRF)伪造防范: 适用于MRS 3.x及之后版本。 说明:在B/S应用中,对于涉及服务器端数据改动(如增加、修改、删除)的操作必须进行跨站请求伪造的防范。跨站请求伪造是一种挟制终端用户在当前已登录的Web应用程序上执行非本意的操作的攻击方法。 安全加固:现有请求修改的接口有2个post,1个delete,其余均是get请求,非get请求的接口均已删除。 异常处理: 适用于MRS 3.x及之后版本。 说明:应用程序出现异常时,捕获异常,过滤返回给客户端的信息,并在日志中记录详细的错误信息。 安全加固:默认的错误提示页面,进行信息过滤,并在日志中记录详细的错误信息。新加四个配置项,默认配置为FusionInsight提供的跳转URL,错误提示页面跳转到固定配置的URL中,防止暴露不必要的信息。 表1 四个配置项参数介绍 参数 描述 默认值 是否必选配置 jobmanager.web.403-redirect-url web403页面,访问若遇到403错误,则会重定向到配置的页面。 - 是 jobmanager.web.404-redirect-url web404页面,访问若遇到404错误,则会重定向到配置的页面。 - 是 jobmanager.web.415-redirect-url web415页面,访问若遇到415错误,则会重定向到配置的页面。 - 是 jobmanager.web.500-redirect-url web500页面,访问若遇到500错误,则会重定向到配置的页面。 - 是 HTML5安全: 适用于MRS 3.x及之后版本。 说明:HTML5是下一代的Web开发规范,为开发者提供了许多新的功能并扩展了标签。这些新的标签及功能增加了攻击面,存在被攻击的风险(例如跨域资源共享、客户端存储、WebWorker、WebRTC、WebSocket等)。 安全加固:添加“Access-Control-Allow-Origin”配置,如运用到跨域资源共享功能,可对HTTP响应头的“Access-Control-Allow-Origin”属性进行控制。 Flink不涉及如客户端存储、WebWorker、WebRTC、WebSocket等安全风险。
  • Flink认证和加密 Flink集群中,各部件支持认证。 Flink集群内部各部件和外部部件之间,支持和外部部件如YARN、HDFS、ZooKeeprer进行kerberors认证。 Flink集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持security cookie认证。 Flink集群中,各部件支持SSL加密传输;集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持SSL加密传输。 详情可参考认证和加密。
  • ACL控制 在HA模式下,支持ACL控制。 Flink在HA模式下,支持用ZooKeeper来管理集群和发现服务。ZooKeeper支持SASL ACL控制,即只有通过SASL(kerberos)认证的用户,才有往ZK上操作文件的权限。如果要在Flink上使用SASL ACL控制,需要在Flink配置文件中设置如下配置: high-availability.zookeeper.client.acl: creatorzookeeper.sasl.disable: false 具体配置项介绍请参考HA。
  • 操作步骤 优化GC。 调整老年代和新生代的比值。在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“-XX:NewRatio”。如“ -XX:NewRatio=2”,则表示老年代与新生代的比值为2:1,新生代占整个堆空间的1/3,老年代占2/3。 开发Flink应用程序时,优化DataStream的数据分区或分组操作。 当分区导致数据倾斜时,需要考虑优化分区。 避免非并行度操作,有些对DataStream的操作会导致无法并行,例如WindowAll。 keyBy尽量不要使用String。
  • 配置Pipeline 本章节适用于MRS 3.x及之后版本。 配置文件。 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28843 nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如: nettyconnector.ssl.enabled: true 安全认证配置。 Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置。 SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见加密传输。 父主题: 配置Flink安全特性
  • 参数描述 表1 CREATE SECONDARY INDEX参数 参数 描述 index_name 索引表的名称。表名称应由字母数字字符和下划线(_)特殊字符组成。 db_name 数据库的名称。数据库名称应由字母数字字符和下划线(_)特殊字符组成。 table_name 数据库中的表名称。表名称应由字母数字字符和下划线(_)特殊字符组成。 col_name 表中的列名称。支持多列。列名称应由字母数字字符和下划线(_)特殊字符组成。 table_blocksize 数据文件的block大小。更多详细信息,请参考•Block大小。
  • 示例 create table productdb.productSalesTable(id int,price int,productName string,city string) stored as carbondata; CREATE INDEX productNameIndexTable on table productdb.productSalesTable (productName,city) as 'carbondata' ; 上述示例将创建名为“productdb.productNameIndexTable”的二级表并加载所提供列的索引信息。
  • 回答 当源表或子查询具有大数据量的Partition时,创建Hive表失败。执行查询需要很多的task,此时输出的文件数就会很多,从而导致driver OOM。 可以在创建Hive表的语句中增加distribute by子句来解决这个问题,其中distribute by的字段要选取合适的cardinality(即distinct值的个数)。 distribute by子句限制了Hive表的Partition数量。增加distribute by 子句后,最终的输出文件数取决于指定列的cardinality和“spark.sql.shuffle.partitions”参数值。但如果distribute by的字段的cardinality值很小,例如,“spark.sql.shuffle.partitions”参数值为200,但distribute by字段的cardinality只有100,则输出的200个文件中,只有其中100个文件有数据,剩下的100个文件为空文件。也就是说,如果选取的字段的cardinality过低,如1,则会造成严重的数据倾斜,从而严重影响查询性能。 因此,建议选取的distribute by字段的cardinality个数要大于“spark.sql.shuffle.partitions”参数,可大于2~3倍。 示例: create table hivetable1 as select * from sourcetable1 distribute by col_age;
  • 回答 在这种场景下,CarbonData会给每个节点分配一个INSERT INTO或LOAD DATA任务。如果Executor不是不同的节点分配的,CarbonData将会启动较少的task。 解决措施: 您可以适当增大Executor内存和Executor核数,以便YARN可以在每个节点上启动一个Executor。具体的配置方法如下: 配置Executor核数。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 回答 转义字符以反斜线"\"开头,后跟一个或几个字符。如果输入记录包含类似\t,\b,\n,\r,\f,\',\",\\的转义字符,Java将把转义符'\'和它后面的字符一起处理得到转义后的值。 例如:如果CSV数据类似“2010\\10,test”,将这两列插入“String,int”类型时,因为“test”无法转换为int类型,表会将这条记录重定向到Bad Records中。但记录到Bad Records中的值为“2010\10”,Java会将原始数据中的“\\”转义为“\”。
  • 回答 在executor核数等于1的情况下,遵循以下规则对调优Spark Streaming运行参数有所帮助。 Spark任务处理速度和Kafka上partition个数有关,当partition个数小于给定executor个数时,实际使用的executor个数和partition个数相同,其余的将会被空闲。所以应该使得executor个数小于或者等于partition个数。 当Kafka上不同partition数据有倾斜时,数据较多的partition对应的executor将成为数据处理的瓶颈,所以在执行Producer程序时,数据平均发送到每个partition可以提升处理的速度。 在partition数据均匀分布的情况下,同时提高partition和executor个数,将会提升Spark处理速度(当partition个数和executor个数保持一致时,处理速度是最快的)。 在partition数据均匀分布的情况下,尽量保持partition个数是executor个数的整数倍,这样将会使资源得到合理利用。
  • 示例 create table productdb.productSalesTable(id int,price int,productName string,city string) stored as carbondata; CREATE INDEX productNameIndexTable on table productdb.productSalesTable (productName,city) as 'carbondata' ; SHOW INDEXES ON productdb.productSalesTable;
  • 系统响应 +-----+----------+--------------------------+------------------+------------+------------+-------------+--------------+--+| ID | Status | Load Start Time | Load Time Taken | Partition | Data Size | Index Size | File Format |+-----+----------+--------------------------+------------------+------------+------------+-------------+--------------+--+| 3 | Success | 2020-09-28 22:53:26.336 | 3.726S | {} | 6.47KB | 3.30KB | columnar_v3 || 2 | Success | 2020-09-28 22:53:01.702 | 6.688S | {} | 6.47KB | 3.30KB | columnar_v3 |+-----+----------+--------------------------+------------------+------------+------------+-------------+--------------+--+
  • 示例 添加carbon配置参数 carbon.clean.file.force.allowed = true create table carbon01(a int,b string,c string) stored as carbondata; insert into table carbon01 select 1,'a','aa'; insert into table carbon01 select 2,'b','bb'; delete from table carbon01 where segment.id in (0); show segments for table carbon01; CLEAN FILES FOR TABLE carbon01 options('force'='true'); show segments for table carbon01; 上述命令将从物理上删除所有DELETE SEGMENT命令删除的segment和合并后的旧的segment。
  • 示例 create table carbon01(a int,b string,c string) stored as carbondata; insert into table carbon01 select 1,'a','aa'; insert into table carbon01 select 2,'b','bb'; insert into table carbon01 select 3,'c','cc'; SHOW SEGMENTS FOR TABLE carbon01 LIMIT 2;
  • 示例 data.csv源文件数据如下所示: ID,date,country,name,phonetype,serialname,salary4,2014-01-21 00:00:00,xxx,aaa4,phone2435,ASD66902,150035,2014-01-22 00:00:00,xxx,aaa5,phone2441,ASD90633,150046,2014-03-07 00:00:00,xxx,aaa6,phone294,ASD59961,15005 CREATE TABLE carbontable(ID int, date Timestamp, country String, name String, phonetype String, serialname String,salary int) STORED AS carbondata; LOAD DATA inpath 'hdfs://hacluster/tmp/data.csv' INTO table carbontable options('DELIMITER'=',');
  • 注意事项 以下是可以在加载数据时使用的配置选项: DELIMITER:可以在加载命令中提供分隔符和引号字符。默认值为,。 OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"') 可使用'DELIMITER'='\t'来表示用制表符tab对CSV数据进行分隔。 OPTIONS('DELIMITER'='\t') CarbonData也支持\001和\017作为分隔符。 对于CSV数据,分隔符为单引号(')时,单引号必须在双引号(" ")内。例如:'DELIMITER'= "'"。 QUOTECHAR:可以在加载命令中提供分隔符和引号字符。默认值为"。 OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"') COMMENTCHAR:可以在加载命令中提供注释字符。在加载操作期间,如果在行的开头遇到注释字符,那么该行将被视为注释,并且不会被加载。默认值为#。 OPTIONS('COMMENTCHAR'='#') FILEHEADER:如果源文件中没有表头,可在LOAD DATA命令中提供表头。 OPTIONS('FILEHEADER'='column1,column2') ESCAPECHAR:如果用户想在CSV上对Escape字符进行严格验证,可以提供Escape字符。默认值为\。 OPTIONS('ESCAPECHAR'='\') 如果在CSV数据中输入ESCAPECHAR,该ESCAPECHAR必须在双引号(" ")内。例如:"a\b"。 Bad Records处理: 为了使数据处理应用程序为用户增值,不可避免地需要对数据进行某种程度的集成。在大多数情况下,数据质量问题源于生成源数据的上游(主要)系统。 有两种完全不同的方式处理Bad Data: 按照原始数据加载所有数据,之后进行除错处理。 在进入数据源的过程中,可以清理或擦除Bad Data,或者在发现Bad Data时让数据加载失败。 有多个选项可用于在CarbonData数据加载过程中清除源数据。对于CarbonData数据中的Bad Records管理,请参见表2。 表2 Bad Records Logger 配置项 默认值 描述 BAD_RECORDS_ LOG GER_ENABLE false 若设置为true,则将创建Bad Records日志文件,其中包含Bad Records的详细信息。 BAD_RECORDS_ACTION FAIL 以下为Bad Records的四种操作类型: FORCE:通过将Bad Records存储为NULL来自动校正数据。 REDIRECT:无法加载Bad Records,并将其写入BAD_RECORD_PATH下的CSV文件中,默认不开启该类型,如需使用该类型,需要设置参数carbon.enable.badrecord.action.redirect为true。 IGNORE:既不加载Bad Records也不将其写入CSV文件。 FAIL:如果发现存在Bad Records,数据加载将会失败。 说明: 在加载数据时,如果所有记录都是Bad Records,则参数BAD_RECORDS_ACTION将不起作用,加载数据操作将会失败。 IS_EMPTY_DATA_BAD_RECORD false 如果设置为“false”,则空(""或''或,,)数据将不被视为Bad Records,如果设置为“true”,则空数据将被视为Bad Records。 BAD_RECORD_PATH - 指定存储Bad Records的HDFS路径。默认值为Null。 如果启用了Bad Records日志记录或者Bad Records操作重定向,则该路径必须由用户进行配置。 示例: LOAD DATA INPATH 'filepath.csv' INTO TABLE tablename OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true', 'BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon', 'BAD_RECORDS_ACTION'='REDIRECT', 'IS_EMPTY_DATA_BAD_RECORD'='false'); 使用“REDIRECT”选项,CarbonData会将所有的Bad Records添加到单独的CSV文件中,但是该文件内容不能用于后续的数据加载,因为其内容可能无法与源记录完全匹配。用户必须清理原始源记录以便于进一步的数据提取。该选项的目的只是让用户知道哪些记录被视为Bad Records。 MAXCOLUMNS:该可选参数指定了在一行中,由CSV解析器解析的最大列数。 OPTIONS('MAXCOLUMNS'='400') 表3 MAXCOLUMNS 可选参数名称 默认值 最大值 MAXCOLUMNS 2000 20000 表4 MAXCOLUMNS可选参数的行为图 MAXCOLUMNS值 在文件Header选项中的列数 考虑的最终值 在加载项中未指定 5 2000 在加载项中未指定 6000 6000 40 7 文件header列数与MAXCOLUMNS值,两者中的最大值 22000 40 20000 60 在加载项中未指定 CSV文件第一行的列数与MAXCOLUMNS值,两者中的最大值 对于设置MAXCOLUMNS Option的最大值,要求executor具有足够的内存,否则,数据加载会由于内存不足的错误而失败。
  • 注意事项 在执行此命令之前,应将旧表的表结构定义schema和数据复制到新数据库位置。 对于旧版本仓库,源集群和目的集群的时区应该相同。 新的数据库和旧数据库的名字应该相同。 执行命令前,旧表的表结构定义schema和数据应该复制到新的数据库位置。 如果表是聚合表,则应将所有聚合表复制到新的数据库位置。 如果旧集群使用HIVE元数据库来存储表结构,则刷新将不起作用,因为文件系统中不存在表结构定义schema文件。
  • 注意事项 表必须已经存在。 用户应属于数据加载组以执行数据加载操作。默认情况下,数据加载组被命名为“ficommon”。 CarbonData表不支持Overwrite。 源表和目标表的数据类型应该相同,否则原表中的数据将被视为Bad Records。 INSERT INTO命令不支持部分成功(partial success),如果存在Bad Records,该命令会失败。 在从源表插入数据到目标表的过程中,无法在源表中加载或更新数据。 若要在INSERT操作期间启用数据加载或更新,请将以下参数配置为“true”。 “carbon.insert.persist.enable”=“true” 默认上述参数配置为“false”。 启用该参数将降低INSERT操作的性能。
  • 操作步骤 用于CarbonData查询的配置介绍,详情请参见表1和表2。 表1 Shuffle过程中,启动Task的个数 参数 spark.sql.shuffle.partitions 所属配置文件 spark-defaults.conf 适用于 数据查询 场景描述 Spark shuffle时启动的Task个数。 如何调优 一般建议将该参数值设置为执行器核数的1到2倍。例如,在聚合场景中,将task个数从200减少到32,有些查询的性能可提升2倍。 表2 设置用于CarbonData查询的Executor个数、CPU核数以及内存大小 参数 spark.executor.cores spark.executor.instances spark.executor.memory 所属配置文件 spark-defaults.conf 适用于 数据查询 场景描述 设置用于CarbonData查询的Executor个数、CPU核数以及内存大小。 如何调优 在银行方案中,为每个执行器提供4个CPU内核和15GB内存,可以获得良好的性能。这2个值并不意味着越多越好,在资源有限的情况下,需要正确配置。例如,在银行方案中,每个节点有足够的32个CPU核,而只有64GB的内存,这个内存是不够的。例如,当每个执行器有4个内核和12GB内存,有时在查询期间发生垃圾收集(GC),会导致查询时间从3秒增加到超过15秒。在这种情况下需要增加内存或减少CPU内核。 用于CarbonData数据加载的配置参数,详情请参见表3、表4和表5。 表3 设置数据加载使用的CPU core数量 参数 carbon.number.of.cores.while.loading 所属配置文件 carbon.properties 适用于 数据加载 场景描述 数据加载过程中,设置处理数据使用的CPU core数量。 如何调优 如果有更多的CPU个数,那么可以增加CPU值来提高性能。例如,将该参数值从2增加到4,那么CSV文件读取性能可以增加大约1倍。 表4 是否使用YARN本地目录进行多磁盘数据加载 参数 carbon.use.local.dir 所属配置文件 carbon.properties 适用于 数据加载 场景描述 是否使用YARN本地目录进行多磁盘数据加载。 如何调优 如果将该参数值设置为“true”,CarbonData将使用YARN本地目录进行多表加载磁盘负载平衡,以提高数据加载性能。 表5 加载时是否使用多路径 参数 carbon.use.multiple.temp.dir 所属配置文件 carbon.properties 适用于 数据加载 场景描述 是否使用多个临时目录存储sort临时文件。 如何调优 设置为true,则数据加载时使用多个临时目录存储sort临时文件。此配置能提高数据加载性能并避免磁盘单点故障。 用于CarbonData数据加载和数据查询的配置参数,详情请参见表6。 表6 设置数据加载和查询使用的CPU core数量 参数 carbon.compaction.level.threshold 所属配置文件 carbon.properties 适用于 数据加载和查询 场景描述 对于minor压缩,在阶段1中要合并的segment数量和在阶段2中要合并的已压缩的segment数量。 如何调优 每次CarbonData加载创建一个segment,如果每次加载的数据量较小,将在一段时间内生成许多小文件,影响查询性能。配置该参数将小的segment合并为一个大的segment,然后对数据进行排序,可提高查询性能。 压缩的策略根据实际的数据大小和可用资源决定。如某银行1天加载一次数据,且加载数据选择在晚上无查询时进行,有足够的资源,压缩策略可选择为6、5。 表7 使用索引缓存服务器时是否开启数据预加载 参数 carbon.indexserver.enable.prepriming 所属配置文件 carbon.properties 适用于 数据加载 场景描述 使用索引缓存服务器过程中开启数据预加载可以提升首次查询的性能。 如何调优 用户可以将该参数设置为true来开启预加载。默认情况,该参数为false。
共100000条