华为云用户手册

  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encounteredjava.lang.NullPointerExceptionat org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230)at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143)at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566)at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612)at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611)at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21)at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • 回答 问题分析 创建表。 CREATE TABLE TEST_TABLE(DATE varchar not null,NUM integer not null,SEQ_NUM integer not null,ACCOUNT1 varchar not null,ACCOUNTDES varchar,FLAG varchar,SALL double,CONSTRAINT PK PRIMARY KEY (DATE,NUM,SEQ_NUM,ACCOUNT1)); 创建全局索引 CREATE INDEX TEST_TABLE_INDEX ON TEST_TABLE(ACCOUNT1,DATE,NUM,ACCOUNTDES,SEQ_NUM); 插入数据 UPSERT INTO TEST_TABLE (DATE,NUM,SEQ_NUM,ACCOUNT1,ACCOUNTDES,FLAG,SALL) values ('20201001',30201001,13,'367392332','sffa1','',''); 执行BulkLoad任务更新数据 hbase org.apache.phoenix.mapreduce.CsvBulkLoadTool -t TEST_TABLE -i /tmp/test.csv,test.csv内容如下: 20201001 30201001 13 367392332 sffa888 1231243 23 问题现象:无法直接更新之前存在的索引数据,导致存在两条索引数据。 +------------+-----------+-----------+---------------+----------------+| :ACCOUNT1 | :DATE | :NUM | 0:ACCOUNTDES | :SEQ_NUM |+------------+-----------+-----------+---------------+----------------+| 367392332 | 20201001 | 30201001 | sffa1 | 13 || 367392332 | 20201001 | 30201001 | sffa888 | 13 |+------------+-----------+-----------+---------------+----------------+ 解决方法 删除旧的索引表。 DROP INDEX TEST_TABLE_INDEX ON TEST_TABLE; 异步方式创建新的索引表。 CREATE INDEX TEST_TABLE_INDEX ON TEST_TABLE(ACCOUNT1,DATE,NUM,ACCOUNTDES,SEQ_NUM) ASYNC; 索引重建。 hbase org.apache.phoenix.mapreduce.index.IndexTool --data-table TEST_TABLE --index-table TEST_TABLE_INDEX --output-path /user/test_table
  • 使用 MRS 客户端 在已安装客户端的节点,执行sudo su - omm命令切换用户。执行以下命令切换到客户端目录: cd /opt/client 执行以下命令配置环境变量: source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS集群用户 例如,kinit admin。 启用Kerberos认证的MRS集群默认创建“admin”用户账号,用于集群管理员维护集群。 直接执行组件的客户端命令。 例如:使用HDFS客户端命令查看HDFS根目录文件,执行hdfs dfs -ls /。
  • 在集群外节点上安装客户端 创建一个满足要求的弹性云服务器,要求如下: 针对MRS 3.x之前版本的集群,需要先确认当前MRS集群节点的CPU架构。针对MRS 3.x之前版本的集群,该弹性云服务器的CPU架构请和MRS集群节点保持一致,MRS 3.x及之后版本MRS客户端兼容两种CPU架构。 已准备一个弹性云服务器,主机操作系统及版本请参见表1。 表1 参考列表 CPU架构 操作系统 支持的版本号 x86计算 Euler 可用:Euler OS 2.2 可用:Euler OS 2.3 可用:Euler OS 2.5 鲲鹏计算(ARM) Euler 可用:Euler OS 2.8 例如,用户可以选择操作系统为Euler的弹性云服务器准备操作。 同时为弹性云服务分配足够的磁盘空间,例如“40GB”。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。 如果不同,请修改弹性云服务器安全组或配置弹性云服务器安全组的出入规则允许MRS集群所有安全组的访问。 需要允许用户使用密码方式登录Linux弹性云服务器(SSH方式),请参见SSH密码方式登录。 MRS集群安全组入方向将所有端口对客户端节点放开,具体操作请参考添加安全组规则。 登录MRS Manager页面,具体请参见访问MRS Manager(MRS 3.x之前版本),然后选择“服务管理”。 单击“下载客户端”。 在“客户端类型”选择“完整客户端”。 在“下载路径”选择“远端主机”。 将“主机IP”设置为E CS 的IP地址,设置“主机端口”为“22”,并将“存放路径”设置为“/tmp”。 如果使用SSH登录ECS的默认端口“22”被修改,请将“主机端口”设置为新端口。 “存放路径”最多可以包含256个字符。 “登录用户”设置为“root”。 如果使用其他用户,请确保该用户对保存目录拥有读取、写入和执行权限。 在“登录方式”选择“密码”或“SSH私钥”。 密码:输入创建集群时设置的root用户密码。 SSH私钥:选择并上传创建集群时使用的密钥文件。 单击“确定”开始生成客户端文件。 若界面显示以下提示信息表示客户端包已经成功保存。单击“关闭”。客户端文件请到下载客户端时设置的远端主机的“存放路径”中获取。 下载客户端文件到远端主机成功。 若界面显示以下提示信息,请检查用户名密码及远端主机的安全组配置,确保用户名密码正确,及远端主机的安全组已增加SSH(22)端口的入方向规则。然后从2执行重新下载客户端。 连接到服务器失败,请检查网络连接或参数设置。 生成客户端会占用大量的磁盘IO,不建议在集群处于安装中、启动中、打补丁中等非稳态场景下载客户端。 使用VNC方式,登录弹性云服务器。参见远程登录(VNC方式)。 所有镜像均支持Cloud-init特性。Cloud-init预配置的用户名“root”,密码为创建集群时设置的密码。首次登录建议修改。 执行ntp时间同步,使集群外节点的时间与MRS集群时间同步。 检查安装NTP服务有没有安装,未安装请执行yum install ntp -y命令自行安装。 执行vim /etc/ntp.conf命令编辑NTP客户端配置文件,并增加MRS集群中Master节点的IP并注释掉其他server的地址。 server master1_ip preferserver master2_ip 图1 增加Master节点的IP 执行service ntpd stop命令关闭NTP服务。 执行如下命令,手动同步一次时间: /usr/sbin/ntpdate 192.168.10.8 192.168.10.8为主Master节点的IP地址。 执行service ntpd start或systemctl restart ntpd命令启动NTP服务。 执行ntpstat命令查看时间同步结果。 在弹性云服务器,切换到root用户,并将6中“存放路径”中的安装包复制到目录“/opt”,例如“存放路径”设置为“/tmp”时命令如下。 sudo su - root cp /tmp/MRS_Services_Client.tar /opt 在“/opt”目录执行以下命令,解压压缩包获取校验文件与客户端配置包。 tar -xvf MRS_Services_Client.tar 执行以下命令,校验文件包。 sha256sum -c MRS_Services_ClientConfig.tar.sha256 界面显示如下: MRS_Services_ClientConfig.tar: OK 执行以下命令,解压“MRS_Services_ClientConfig.tar”。 tar -xvf MRS_Services_ClientConfig.tar 执行以下命令,安装客户端到新的目录,例如“/opt/Bigdata/client”。安装时自动生成目录。 sh /opt/MRS_Services_ClientConfig/install.sh /opt/Bigdata/client 查看安装输出信息,如有以下结果表示客户端安装成功: Components client installation is complete. 验证弹性云服务器节点是否与集群Master节点的IP是否连通? 例如,执行以下命令:ping Master节点IP地址 是,执行18。 否,检查VPC、安全组是否正确,是否与MRS集群在相同VPC和安全组,然后执行18。 执行以下命令配置环境变量: source /opt/Bigdata/client/bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS集群用户 例如, kinit admin 执行组件的客户端命令。 例如,执行以下命令查看HDFS目录: hdfs dfs -ls /
  • ClickHouse表数据操作 客户端登录ClickHouse节点。例如: clickhouse client --host node-master3QsRI --multiline --port 9440 --secure; node-master3QsRI 参数为查看ClickHouse服务cluster等环境参数信息中2对应的host_name参数的值。 参考创建本地复制表和分布式表创建表后,可以插入数据到本地表。 例如插入数据到本地表:test insert into test values(toDateTime(now()), rand()); 查询本地表信息。 例如查询2中的表test数据信息: select * from test; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.002 sec. 查询Distributed分布式表。 例如3中因为分布式表test_all基于test创建,所以test_all表也能查询到和test相同的数据。 select * from test_all; SELECT *FROM test_all┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.004 sec. 切换登录节点为相同shard_num的shard节点,并且查询当前表信息,能查询到相同的表数据。 例如,退出原有登录节点:exit; 切换到节点node-group-1tXED0003: clickhouse client --host node-group-1tXED0003 --multiline --port 9440 --secure; 通过2可以看到node-group-1tXED0003和node-master3QsRI的shard_num值相同。 show tables; SHOW TABLES┌─name─────┐│ test ││ test_all │└────────┘ 查询本地表数据。例如在节点node-group-1tXED0003查询test表数据。 select * from test; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.005 sec. 切换到不同shard_num的shard节点,并且查询之前创建的表数据信息。 例如退出之前的登录节点node-group-1tXED0003: exit; 切换到node-group-1tXED0001节点。通过2可以看到node-group-1tXED0001和node-master3QsRI的shard_num值不相同。 clickhouse client --host node-group-1tXED0001 --multiline --port 9440 --secure; 查询test本地表数据,因为test是本地表所以在不同分片节点上查询不到数据。 select * from test; SELECT *FROM testOk. 查询test_all分布式表数据,能正常查询到数据信息。 select * from test_all; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:12:19 │ 3686805070 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.002 sec.
  • 查看ClickHouse服务cluster等环境参数信息 使用ClickHouse客户端连接到ClickHouse服务端,具体请参考使用ClickHouse客户端。 查询集群标识符cluster等其他环境参数信息。 select cluster,shard_num,replica_num,host_name from system.clusters; SELECT cluster, shard_num, replica_num, host_nameFROM system.clusters┌─cluster───────────┬─shard_num─┬─replica_num─┬─host_name──────── ┐│ default_cluster_1 │ 1 │ 1 │ node-master1dOnG ││ default_cluster_1 │ 1 │ 2 │ node-group-1tXED0001 ││ default_cluster_1 │ 2 │ 1 │ node-master2OXQS ││ default_cluster_1 │ 2 │ 2 │ node-group-1tXED0002 ││ default_cluster_1 │ 3 │ 1 │ node-master3QsRI ││ default_cluster_1 │ 3 │ 2 │ node-group-1tXED0003 │└─────────────── ┴────── ┴─────── ┴──────────────┘6 rows in set. Elapsed: 0.001 sec. 查询分片标识符shard和副本标识符replica。 select * from system.macros; SELECT *FROM system.macros┌─macro───┬─substitution─────┐│ id │ 76 ││ replica │ 2 ││ shard │ 3 │└────── ┴────────────┘3 rows in set. Elapsed: 0.001 sec.
  • 创建本地复制表和分布式表 客户端登录ClickHouse节点,例如:clickhouse client --host node-master3QsRI --multiline --port 9440 --secure; node-master3QsRI 参数为查看ClickHouse服务cluster等环境参数信息中2对应的host_name参数的值。 使用ReplicatedMergeTree引擎创建复制表。 详细的语法说明请参考:https://clickhouse.tech/docs/zh/engines/table-engines/mergetree-family/replication/#creating-replicated-tables。 例如,如下在default_cluster_1集群节点上和default数据库下创建表名为test的ReplicatedMergeTree表: CREATE TABLE default.test ON CLUSTER default_cluster_1 ( `EventDate` DateTime, `id` UInt64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}') PARTITION BY toYYYYMM(EventDate) ORDER BY id; 参数说明如下: ON CLUSTER语法表示分布式DDL,即执行一次就可在集群所有实例上创建同样的本地表。 default_cluster_1为查看ClickHouse服务cluster等环境参数信息中2查询到的cluster集群标识符。 ReplicatedMergeTree引擎族接收两个参数: ZooKeeper中该表相关数据的存储路径。 该路径必须在/clickhouse目录下,否则后续可能因为ZooKeeper配额不够导致数据插入失败。 为了避免不同表在ZooKeeper上数据冲突,目录格式必须按照如下规范填写: /clickhouse/tables/{shard}/default/test,其中/clickhouse/tables/{shard}为固定值,default为数据库名,test为创建的表名。 副本名称,一般用{replica}即可。 CREATE TABLE default.test ON CLUSTER default_cluster_1( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}')PARTITION BY toYYYYMM(EventDate)ORDER BY id┌─host─────────────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐│ node-group-1tXED0002 │ 9000 │ 0 │ │ 5 │ 3 ││ node-group-1tXED0003 │ 9000 │ 0 │ │ 4 │ 3 ││ node-master1dOnG │ 9000 │ 0 │ │ 3 │ 3 │└────────────────────┴────┴─────┴──── ┴─────────── ┴──────────┘┌─host─────────────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐│ node-master3QsRI │ 9000 │ 0 │ │ 2 │ 0 ││ node-group-1tXED0001 │ 9000 │ 0 │ │ 1 │ 0 ││ node-master2OXQS │ 9000 │ 0 │ │ 0 │ 0 │└────────────────────┴────┴─────┴──── ┴─────────── ┴──────────┘6 rows in set. Elapsed: 0.189 sec. 使用Distributed引擎创建分布式表。 例如,以下将在default_cluster_1集群节点上和default数据库下创建名为test_all 的Distributed表: CREATE TABLE default.test_all ON CLUSTER default_cluster_1 ( `EventDate` DateTime, `id` UInt64 ) ENGINE = Distributed(default_cluster_1, default, test, rand()); CREATE TABLE default.test_all ON CLUSTER default_cluster_1( `EventDate` DateTime, `id` UInt64)ENGINE = Distributed(default_cluster_1, default, test, rand())┌─host─────────────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐│ node-group-1tXED0002 │ 9000 │ 0 │ │ 5 │ 0 ││ node-master3QsRI │ 9000 │ 0 │ │ 4 │ 0 ││ node-group-1tXED0003 │ 9000 │ 0 │ │ 3 │ 0 ││ node-group-1tXED0001 │ 9000 │ 0 │ │ 2 │ 0 ││ node-master1dOnG │ 9000 │ 0 │ │ 1 │ 0 ││ node-master2OXQS │ 9000 │ 0 │ │ 0 │ 0 │└────────────────────┴────┴─────┴──── ┴─────────── ┴──────────┘6 rows in set. Elapsed: 0.115 sec. Distributed引擎需要以下几个参数: default_cluster_1为查看ClickHouse服务cluster等环境参数信息中2查询到的cluster集群标识符。 default本地表所在的数据库名称。 test为本地表名称,该例中为2中创建的表名。 (可选的)分片键(sharding key) 该键与config.xml中配置的分片权重(weight)一同决定写入分布式表时的路由,即数据最终落到哪个物理表上。它可以是表中一列的原始数据(如site_id),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)。
  • 同步Hive表配置 参数 描述 默认值 hoodie.datasource.hive_sync.enable 是否同步hudi表信息到hive metastore。 注意: 建议该值设置为true,统一使用hive管理hudi表。 false hoodie.datasource.hive_sync.database 要同步给hive的数据库名。 default hoodie.datasource.hive_sync.table 要同步给hive的表名,建议这个值和hoodie.datasource.write.table.name保证一致。 unknown hoodie.datasource.hive_sync.username 同步hive时,指定的用户名。 hive hoodie.datasource.hive_sync.password 同步hive时,指定的密码。 hive hoodie.datasource.hive_sync.jdbcurl 连接hive jdbc指定的连接。 "" hoodie.datasource.hive_sync.use_jdbc 是否使用hive jdbc方式连接hive同步hudi表信息。建议该值设置为false,设置为false后 jdbc连接相关配置无效。 true hoodie.datasource.hive_sync.partition_fields 用于决定hive分区列。 "" hoodie.datasource.hive_sync.partition_extractor_class 用于提取hudi分区列值,将其转换成hive分区列。 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor hoodie.datasource.hive_sync.support_timestamp 当hudi表存在timestamp类型字段时,需指定此参数为true,以实现同步timestamp类型到hive元数据中。该值默认为false,默认将timestamp类型同步为bigInt,默认情况可能导致使用sql查询包含timestamp类型字段的hudi表出现错误。 true 父主题: Hudi常见配置参数
  • 前提条件 ClickHouse服务运行正常,Zookeeper服务运行正常,迁入、迁出节点的ClickHouseServer实例状态正常。 请确保迁入节点已有待迁移数据表,且确保该表是MergeTree系列引擎的分区表。 创建迁移任务前请确保所有对待迁移数据表的写入任务已停止,且任务启动后,只允许对待迁移数据表进行查询操作,禁止对该表进行写入、删除等操作,否则可能会造成迁移前后数据不一致。 迁入节点的ClickHouse数据目录有足够的空间。
  • 使用示例 例如查询表t1的表结构: desc t1; ┌─name────┬─type─┬─default_type─┬─default_expression ┬─comment─┬─codec_expression─┬─ttl_expression─┐│ id │ UInt8 │ │ │ │ │ │ │ name │ UInt8 │ │ │ │ │ │ │ address │ String │ │ │ │ │ │└───────┴────┴────────┴────────── ┴───── ┴──────────┴─────────┘
  • 回答 将https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hive/hcatalog/hive-hcatalog-core/源下的jar包替换到mrs客户端的hcatalog的目录下,并重命名之前的同名hcatalog的jar包。如图302002就是替换后的包,310001-SNAPSHOT.jar.bak就是加了.bak后缀的原包。 图1 hcatalog目录 图2 替Jar包
  • 参数说明 表1 HDFS参数说明 参数 参数说明 默认值 fs.obs.security.provider 指定获取访问OBS文件系统密钥的实现方式。 参数取值: com.huawei.mrs.MrsObsCredentialsProvider:通过MRS云服务委托获取凭证。 com.obs.services.EcsObsCredentialsProvider:通过ECS云服务获取AK/SK信息。 com.obs.services.BasicObsCredentialsProvider:使用用户传入OBS的AK/SK信息。 com.obs.services.EnvironmentVariableObsCredentialsProvider:从环境变量中读取AK/SK信息。 com.huawei.mrs.MrsObsCredentialsProvider
  • 工具介绍 在Hadoop大规模生产集群中,由于HDFS的元数据都保存在NameNode的内存中,集群规模受制于NameNode单点的内存限制。如果HDFS中有大量的小文件,会消耗NameNode大量内存,还会大幅降低读写性能,延长作业运行时间。因此,小文件问题是制约Hadoop集群规模扩展的关键问题。 本工具主要有如下两个功能: 扫描表中有多少低于用户设定阈值的小文件,返回该表目录中所有数据文件的平均大小。 对表文件提供合并功能,用户可设置合并后的平均文件大小。
  • 问题 执行超过50T数据的shuffle过程时,出现部分Executor注册shuffle service超时然后丢失从而导致任务失败的问题。错误日志如下所示: 2016-10-19 01:33:34,030 | WARN | ContainersLauncher #14 | Exception from container-launch with container ID: container_e1452_1476801295027_2003_01_004512 and exit code: 1 | LinuxContainerExecutor.java:397ExitCodeException exitCode=1:at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)at org.apache.hadoop.util.Shell.run(Shell.java:472)at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:381)at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:312)at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:88)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)2016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Exception from container-launch. | ContainerExecutor.java:3002016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Container id: container_e1452_1476801295027_2003_01_004512 | ContainerExecutor.java:3002016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Exit code: 1 | ContainerExecutor.java:3002016-10-19 01:33:34,031 | INFO | ContainersLauncher #14 | Stack trace: ExitCodeException exitCode=1: | ContainerExecutor.java:300
  • 回答 由于当前数据量较大,有50T数据导入,超过了shuffle的规格,shuffle负载过高,shuffle service服务处于过载状态,可能无法及时响应Executor的注册请求,从而出现上面的问题。 Executor注册shuffle service的超时时间是5秒,最多重试3次,该参数目前不可配。 建议适当调大task retry次数和Executor失败次数。 在客户端的“spark-defaults.conf”配置文件中配置如下参数。“spark.yarn.max.executor.failures”若不存在,则手动添加该参数项。 表1 参数说明 参数 描述 默认值 spark.task.maxFailures task retry次数。 4 spark.yarn.max.executor.failures Executor失败次数。 关闭Executor个数动态分配功能的场景即“spark.dynamicAllocation.enabled”参数设为“false”时。 numExecutors * 2, with minimum of 3 Executor失败次数。 开启Executor个数动态分配功能的场景即“spark.dynamicAllocation.enabled”参数设为“true”时。 3
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTICS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(若存在),否则清除rowCount。 生成列级别统计信息 ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息 DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 默认值 spark.sql.cbo.enabled CBO总开关。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12
  • 回答 当nodeSelectPolicy为SEQUENCE,且第一个连接到RM的NM不可用时,RM会在“yarn.nm.liveness-monitor.expiry-interval-ms”属性中指定的周期内,一直尝试为同一个NM分配任务。 可以通过两种方式来避免上述问题: 使用其他的nodeSelectPolicy,如RANDOM。 参考修改集群服务配置参数,进入Yarn“全部配置”页面。在搜索框搜索以下参数,通过“yarn-site.xml”文件更改以下属性: “yarn.resourcemanager.am-scheduling.node-blacklisting-enabled” = “true”; “yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold” = “0.5”。
  • 回答 问题分析 当HBase服务端出现问题,HBase客户端进行表操作的时候,会进行重试,并等待超时。该超时默认值为Integer.MAX_VALUE (2147483647 ms),所以HBase客户端会在这么长的时间内一直重试,造成挂起表象。 解决方法 HBase客户端提供两个配置项来控制客户端的重试超时方式,如表1。 在“客户端安装路径/HBase/hbase/conf/hbase-site.xml”配置文件中配置如下参数。 表1 HBase客户端操作重试超时相关配置 配置参数 描述 默认值 hbase.client.operation.timeout 客户端操作超时时间。需在配置文件中手动添加。 2147483647 ms hbase.client.retries.number 最大重试次数。用于表示所有可重试操作所支持的最大重试次数。 35 这两个参数的重试超时的配合方式如图1所示。 图1 HBase客户端操作重试超时流程 从该流程可以看出,如果未对这两个配置参数根据具体使用场景进行配置,会造成挂起迹象。建议根据使用场景,配置合适的超时时间,如果是长时间操作,则把超时时间设置长一点;如果是短时间操作,则把超时时间设置短一点。而重试次数可以设置为:“(hbase.client.retries.number)*60*1000(ms)”。刚好大于“hbase.client.operation.timeout”设置的超时时间。
  • 操作步骤 根据业务情况,准备好客户端,登录安装客户端的节点。 请根据客户端所在位置,参考安装客户端章节,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/hadoopclient 执行以下命令配置环境变量。 source bigdata_env 执行以下命令,进行用户认证。(普通模式跳过此步骤) kinit 组件业务用户 执行命令进行客户端操作。 例如执行以下命令: cql storm 同一个storm客户端不能同时连接安全和非安全的ZooKeeper。
  • 推荐资源配置 mor表: 由于其本质上是写增量文件,调优可以直接根据hudi的数据大小(dataSize)进行调整。 dataSize如果只有几个G,推荐跑单节点运行spark,或者yarn模式但是只分配一个container。 入湖程序的并行度p设置:建议 p = (dataSize)/128M, 程序分配core的数量保持和p一致即可。内存设置建议内存大小和core的比例大于1.5:1 即一个core配1.5G内存, 堆外内存设置建议内存大小和core的比例大于0.5:1。 cow表: cow表的原理是重写原始数据,因此这种表的调优,要兼顾dataSize和最后重写的文件数量。总体来说core数量越大越好(和最后重写多少个文件数直接相关),并行度p和内存大小和mor设置类似。
  • 回答 这是一种误操作的异常情况,需要手动删除对应表的元数据后重试。 例如: 执行以下命令进入控制台: source ${BIGDATA_HOME}/ FusionInsight _BASE_8.1.0.1/install/FusionInsight-dbservice-2.7.0/.dbservice_profile gsql -p 20051 -U hive -d hivemeta -W HiveUser@ 执行 delete from tbls where tbl_id='xxx';
  • 回答 在这种场景下,CarbonData会给每个节点分配一个INSERT INTO或LOAD DATA任务。如果Executor不是不同的节点分配的,CarbonData将会启动较少的task。 解决措施: 您可以适当增大Executor内存和Executor核数,以便YARN可以在每个节点上启动一个Executor。具体的配置方法如下: 配置Executor核数。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作步骤 以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。 “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。 “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。 “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
  • 命令语法 ALTER TABLE [db_name.]table_name ADD COLUMNS (col_name data_type,...) TBLPROPERTIES(''COLUMNPROPERTIES.columnName.shared_column'='sharedFolder.sharedColumnName,...', 'DEFAULT.VALUE.COLUMN_NAME'='default_value');
  • 参数描述 表1 ADD COLUMNS参数描述 参数 描述 db_name 数据库名。若未指定,则选择当前数据库。 table_name 表名。 col_name data_type 带数据类型且用逗号分隔的列的名称。列名称包含字母,数字和下划线(_)。 说明: 创建CarbonData表时,不要将列名命名为tupleId,PositionId和PositionReference,因为将在UPDATE,DELETE和二级索引命令内部使用这些名称。
  • 示例 ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING); ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('COLUMNPROPERTIES.b1.shared_column'='sharedFolder.b1'); ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DEFAULT.VALUE.a1'='10');
  • ZooKeeper常用配置参数 参数入口: 请参考修改集群服务配置参数,进入ZooKeeper“全部配置”页面。在搜索框中输入参数名称。 表1 参数说明 配置参数 说明 默认值 skipACL 是否跳过ZooKeeper节点的权限检查。 no maxClientCnxns ZooKeeper的最大连接数,在连接数多的情况下,建议增加。 2000 LOG _LEVEL 日志级别,在调试的时候,可以改为DEBUG。 INFO acl.compare.shortName 当Znode的ACL权限认证类型为SASL时,是否仅使用principal的用户名部分进行ACL权限认证。 true synclimit Follower与leader进行同步的时间间隔(单位为tick)。如果在指定的时间内leader没响应,连接将不能被建立。 15 tickTime 一次tick的时间(毫秒),它是ZooKeeper使用的基本时间单位,心跳、超时的时间都由它来规定。 4000 ZooKeeper内部时间由参数ticktime和参数synclimit控制,如需调大ZooKeeper内部超时时间,需要调大客户端连接ZooKeeper的超时时间。 父主题: 使用ZooKeeper
  • Flink用户权限说明 访问并使用Flink WebUI进行业务操作需为用户赋予FlinkServer相关权限,Manager的admin用户没有FlinkServer的业务操作权限。 FlinkServer中应用(租户)是最大管理范围,包含集群连接管理、数据连接管理、应用管理、流表和作业管理等。 FlinkServer中有如表1所示三种资源权限: 表1 FlinkServer资源权限 权限名称 权限描述 备注 FlinkServer管理员权限 具有所有应用的编辑、查看权限。 是FlinkServer的最高权限。如果已经具有FlinkServer管理员权限,则会自动具备所有应用的权限。 应用编辑权限 具有当前应用编辑权限的用户,可以执行创建、编辑和删除集群连接、数据连接,创建流表、创建作业及运行作业等操作。 同时具有当前应用查看权限。 应用查看权限 具有当前应用查看权限的用户,可以查看应用。 - 父主题: Flink用户权限管理
  • Clustering架构 Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。 为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi 数据湖 文件的布局。 Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。 Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。 总体而言Clustering分为两个部分: 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。 将Clustering计划以avro元数据格式保存到时间线。 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。
  • 回答 运行包含Reduce的Mapreduce任务时,通过-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.nativetask.NativeMapOutputCollectorDelegator命令开启Native Task特性,任务在部分操作系统运行失败,日志中提示错误“version 'GLIBCXX_3.4.20' not found”。该问题原因是操作系统的GLIBCXX版本较低,导致该特性依赖的libnativetask.so.1.0.0库无法加载,进而导致任务失败。 规避手段: 设置配置项mapreduce.job.map.output.collector.class的值为org.apache.hadoop.mapred.MapTask$MapOutputBuffer。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全