华为云用户手册

  • 操作场景 默认情况下关闭HDFS文件时需要等待所有的Block都上报成功(处于COMPLETED状态)。因此HDFS的一部分写性能消耗为等待DataNode块上报以及NameNode处理块上报。对于一个负载较大的集群,等待的消耗对集群影响较大。HDFS可以通过配置NameNode参数“dfs.namenode.file.close.num-committed-allowed”来提前关闭文件,提升写数据性能。但是由于提前关闭了文件,可能在读取数据的时候由于块找不到或者NameNode元数据中记录的数据块信息和DataNode中存储的真实副本不一致而失败。因此该特性不适用于写完数据即读的场景,请结合业务场景谨慎使用该特性。 该功能适用于 MRS 3.2.0-LTS.1及之后版本。
  • 容量规格 ConfigNode容量规格 当创建新的存储组时,IoTDB默认为该存储组分配10000个槽位,数据写入时根据写入的设备名和时间值,分配或创建一个data region并挂载在某个槽位上。所以ConfigNode的内存容量占用跟存储组个数和该存储组持续写入的时间相关。 槽位分配相关对象 对象大小(字节) TTimePartitionSlot 4 TSeriesPartitionSlot 8 TConsensusGroupId 4 根据上表计算可得一个ConfigNode,如果创建一个存储组,持续运行10年,大约需要0.68G内存 : 10000(槽位) * 10(年)* 53 (分区) * (TTimePartitionSlot size + TSeriesPartitionSlot size + TConsensusGroupId size)= 0.68G IoTDBServer容量规格 IoTDB中数据以region分配在IoTDBServer上,region副本数默认是“3”,最终在IoTDBServer文件系统上表现为3个文件。上限为操作系统可存储文件个数最大值,对于Linux系统即是inode个数。
  • 操作场景 TimelineServer作为Yarn服务的一个角色,当前版本开始支持HA模式。如果需要避免TimelineServer单点故障问题,可以通过开启TimelineServer HA来确保Yarn TimelineServer角色的高可用性。 TimelineServer会将数据保存到内存数据库LevelDB中,占用大量内存,安装TimelineServer的节点内存至少需要预留30GB。 该功能适用于MRS 3.2.0-LTS.1及之后版本。
  • 前提条件 集群已安装CDL、Hudi服务且运行正常。 ThirdKafka数据库的Topic需要能被MRS集群消费,操作步骤请参考ThirdPartyKafka前置准备。 在 FusionInsight Manager中创建一个人机用户,例如“cdluser”,加入用户组cdladmin、hadoop、kafka、supergroup,主组选择“cdladmin”组,关联角色“System_administrator”。
  • 回答 使用当前可得的最近的tablestatus文件进行恢复,分为如下两个场景来进行恢复: 场景一:当前批次的CarbonData数据文件和.segment文件损坏无法恢复。 进入客户端节点,执行如下命令,查看HDFS对应表的tablestatus文件,找到最近的tablestatus版本号。 cd 客户端安装路径 source bigdata_env source Spark/component_env kinit 组件业务用户 (普通集群无需执行kinit命令) hdfs dfs -ls /user/hive/warehouse/hrdb.db/car01/Metadata 上图中,当前批次文件tablestatus_1669028899548损坏,需要使用tablestatus_1669028852132文件。 进入spark sql,执行如下命令来修改表属性latestversion为当前最近的版本号。 alter table car01 set SERDEPROPERTIES ('latestversion'='1669082252132'); 需要退出当前session,重新连接后执行查询。该方式已尽可能恢复客户数据,一般现网情况下,如断电场景segment数据文件也会存在不可恢复情况。 场景二:当前批次的Carbondata数据文件和.segment文件完整,可恢复。 使用TableStatusRecovery恢复工具,当前工具仅针对非分区表进行恢复。进入Spark客户端节点,执行如下命令: cd 客户端安装路径 source bigdata_env source Spark/component_env kinit 组件业务用户 (普通集群无需执行kinit命令) spark-submit --master yarn --class org.apache.carbondata.recovery.tablestatus.TableStatusRecovery Spark/spark/carbonlib/carbondata-spark_*.jar hrdb car01 参数说明:hrdb car01表名称。 TableStatusRecovery恢复工具限制: 合并后,如果tablestatus文件丢失或损坏,使用该工具无法恢复合并状态的segment,因为丢失或损坏的tablestatus文件才存在该segment合并信息。 Delete segment by Id/Date后,如果tablestatus文件丢失或损坏,则无法恢复已删除的segment信息,因为只有丢失或损坏的tablestatus文件才存在该segment的删除信息。 不支持在mv表上使用该工具。 由于最新的tablestatus文件存在问题,使用该工具恢复后无法正常查询时,可以移除最新的tablestatus文件,使用上一个tablestatus文件进行恢复。
  • 使用方法 配置Flink作业时,可通过在FlinkServer Web UI的作业开发界面添加自定义参数“over.window.interval”,且值配置为大于或等于“0”时开启窗口支持数据超期功能,创建作业可参考如何创建FlinkServer作业。该设置会对作业中的所有over窗口生效,建议对单over窗口的作业使用此功能。 SQL示例: CREATE TABLE OverSource ( `rowtime` TIMESTAMP_LTZ(3), `groupId` INT, `value` INT, `name` STRING, `additional_field` STRING, `proctime` as PROCTIME(), WATERMARK FOR rowtime AS rowtime) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', --FlinkServer所在集群为非安全模式去掉此参数 'properties.security.protocol' = 'SASL_PLAINTEXT', --FlinkServer所在集群为非安全模式去掉此参数 'properties.kerberos.domain.name' = 'hadoop.系统 域名 ' --FlinkServer所在集群为非安全模式去掉此参数);CREATE TABLE LD_SINK( `name` STRING, `groupId` INT, `rowtime` TIMESTAMP_LTZ(3),`count_zw` BIGINT,`sum_zw` BIGINT) WITH ( 'connector' = 'print');SELECT `name`, `groupId`, COUNT(`value`) OVER ( PARTITION BY groupId ORDER BY proctime RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW ) as count_zw, SUM(`value`) OVER ( PARTITION BY groupId ORDER BY proctime RANGE BETWEEN INTERVAL '10' second PRECEDING AND CURRENT ROW ) as sum_zwFROM OverSource
  • 前提条件 在FusionInsight Manager创建一个角色,添加ClickHouse逻辑集群的管理权限以及Loader作业分组权限。创建Loader作业的业务用户,关联该角色和并添加用户组yarnviewgroup。 ClickHouse表已创建,确保用户已授权访问作业执行时操作该表的权限,参照ClickHouse客户端使用实践创建本地复制表和分布式表,导出时选择本地复制表。 确保没有出现ClickHouse相关告警。
  • 前提条件 需要MRS集群已安装HetuEngine组件并添加HSFabric实例。HSFabric实例的新增,删除,迁移和端口的修改,都需要重启Hue服务。 已在集群中创建HetuEngine管理员“人机”用户,如hetu_user,可参考创建HetuEngine权限角色。启用Ranger鉴权的集群需根据业务需求为该hetu_user添加Ranger权限,可参考添加HetuEngine的Ranger访问权限策略。 已创建计算实例并运行正常,可参考创建HetuEngine计算实例。
  • 前提条件 集群已安装CDL、Hudi服务且运行正常。 ThirdKafka数据库的Topic需要能被MRS集群消费,操作步骤请参考ThirdPartyKafka前置准备。 在FusionInsight Manager中创建一个人机用户,例如“cdluser”,加入用户组cdladmin、hadoop、kafka、supergroup,主组选择“cdladmin”组,关联角色“System_administrator”。
  • 处理步骤 如果运行任务的BE节点故障,需查看具体的故障原因再进行解决。 如果RPC源端有大量未发送的数据超过了阈值,可设置如下参数: brpc_socket_max_unwritten_bytes:用于设置未发送的数据量的阈值,默认为1GB。如果未发送数据量超过该值,则会报OVERCROWDED错,可适当调大该值。 tablet_writer_ignore_eovercrowded:是否忽略数据导入过程中出现的OVERCROWDED错误,默认值为“false”。该参数主要用于避免导入失败,以提高导入的稳定性。 max_body_size:用于设置RPC的包大小阈值 ,默认为3GB。如果查询中带有超大 String 类型,或者bitmap类型数据时,可以通过修改该参数规避。
  • 删除HBase全局二级索引 在HBase客户端执行以下命令可删除某个索引: hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalTableIndexer -Dtablename.to.index='table' -Dindexnames.to.drop='idx1#idx2' 相关参数介绍如下: tablename.to.index:表示需删除的索引所在的表名称。 indexnames.to.drop:表示需要删除的索引名称,可以同时指定多个,用#号分隔。
  • 调优案例 某用户使用Hudi MOR表存储其设备的订单出借信息,可通过订单号查询订单详细信息,每天订单量相对稳定,部分节假日可能存在小高峰,该场景存在以下特点: 订单号作为唯一值,并且80%以上的查询场景使用订单号进行等值查询,SQL形如select * from table where order_id = 'id1'; 每天订单量稳定,可采用天作为分区键。 历史分区更新不频繁,主要数据更新在新分区。 调优建议: 使用Bucket索引建表(Spark-SQL),并且索引键为订单ID, 分区键为日期。 定期使用compaction合并日志,提高查询性能。 SQL示例: set hoodie.compact.inline=true;set hoodie.schedule.compact.only.inline=true;set hoodie.run.compact.only.inline=false;create table hudi_mor (order_id int, comb int, col1 string, col2 string, dt int)using hudipartitioned by(dt)options(type='mor', primaryKey='order_id', preCombineField='comb',hoodie.index.type = 'BUCKET',hoodie.bucket.index.num.buckets=100,hoodie.bucket.index.hash.field = 'order_id')
  • 操作步骤 以root用户登录待安装MySQL客户端的节点。 执行以下命令查看MySQL客户端依赖库ncurses-libs的版本: rpm -qa | grep ncurses 从https://downloads.mysql.com/archives/community/下载MySQL客户端对应的软件包,建议安装8.x版本,以Red Hat发行版本为例: 如果2的依赖库是6.x建议下载对应OS Version为Red Hat 8的MySQL软件包。 如果2的依赖库是5.x建议下载对应OS Version为Red Hat 7的MySQL软件包。 例如需安装的MySQL 8.0.22客户端需下载如下四个软件包: 将下载的软件包上传到待安装MySQL客户端的节点上。 在上传的文件所在目录执行以下命令,安装MySQL客户端及对应的依赖包: rpm -ivh mysql-community-client-8.0.22-1.el8.x86_64.rpm --nodeps --force rpm -ivh mysql-community-client-plugins-8.0.22-1.el8.x86_64.rpm --nodeps --force rpm -ivh mysql-community-common-8.0.22-1.el8.x86_64.rpm --nodeps --force rpm -ivh mysql-community-libs-8.0.22-1.el8.x86_64.rpm --nodeps --force 执行以下命令查看MySQL客户端的版本: mysql --version MySQL客户端安装成功后,即可访问Doris,详细操作请参见快速使用Doris。
  • 使用方法 在创建的Source流表中添加“subtask.scan.records-per-second.limit”参数,该参数表示每秒消费Kafka单分区记录数,因此Source端整体限流速率为:min( source parallelism * subtask.scan.records-per-second.limit,kafka partitions num * subtask.scan.records-per-second.limit) SQL示例如下: CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'subtask.scan.records-per-second.limit' = '1000', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名');CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名');Insert into KafkaSinkselect *from KafkaSource;
  • 参数描述 ttl_policy_save命令: 参数 描述 是否必填 table String类型,表名或者库名.表名 否,table和path二选一 path String类型,表的绝对路径 否,table和path二选一 spec String类型,正则表达式 是 level String类型,TTL级别 是,PARTITION或者RECORD,目前仅支持PARTITION value String类型,保留的时长 是 units String类型,保留时长的单位 是,YEARS、MONTHS、WEEKS和DAYS。 ttl_policy_delete命令: 参数 描述 是否必填 table String类型,表名或者库名.表名 否,table和path二选一 path String类型,表的绝对路径 否,table和path二选一 spec String类型,正则表达式 是 ttl_policy_empty命令: 参数 描述 是否必填 table String类型,表名或者库名.表名 否,table和path二选一 path String类型,表的绝对路径 否,table和path二选一 ttl_policy_show命令: 参数 描述 是否必填 table String类型,表名或者库名.表名 否,table和path二选一 path String类型,表的绝对路径 否,table和path二选一
  • Iceberg常用参数 本章节主要介绍Iceberg重要配置的详细信息。Iceberg表支持使用表属性来配置表的行为,例如读取器的默认拆分大小。 表1 读属性 属性名称 默认值 描述 read.split.target-size 134217728(128MB) 用于设置数据被拆分后的大小。 read.split.metadata-target-size 33554432(32MB) 用于设置元数据被拆分后的大小。 read.split.planning-lookback 10 用于设置拆分数据时的bin数。 read.split.open-file-cost 4194304(4MB) 打开文件的估计使用成本,在合并拆分时作为最小权重使用。 read.parquet.vectorization.enabled true 用于控制是否使用Parquet矢量化读取。 read.parquet.vectorization.batch-size 5000 Parquet矢量化读取的批处理大小。 read.orc.vectorization.enabled false 用于控制是否使用ORC矢量化读取。 read.orc.vectorization.batch-size 5000 ORC矢量化读取的批处理大小。 表2 写入属性 属性名称 默认值 描述 write.format.default parquet 表的默认文件格式,包括:parquet、avro和orc。 write.delete.format.default data file format 该参数的默认值和“write.format.default”参数值保持一致,可以修改为parquet、avro或orc。 write.parquet.row-group-size-bytes 134217728(128MB) Parquet行组大小。 write.parquet.page-size-bytes 1048576(1MB) Parquet页面大小。 write.parquet.page-row-limit 20000 Parquet页面行数限制。 write.parquet.dict-size-bytes 2097152(2MB) Parquet字典页面大小。 write.target-file-size-bytes 536870912(512MB) 用于控制生成的文件大小,以目标字节数为准。 write.delete.target-file-size-bytes 67108864(64MB) 用于控制生成的删除文件大小,以目标字节数为准。 write.data.path table location + /data 数据文件的基本位置。 write.metadata.path table location + /metadata 元数据文件的基本位置。 write.delete.mode copy-on-write 删除命令的模式,包括: copy-on-write merge-on-read write.delete.isolation-level serializable 删除命令的隔离级别,包括serializable和snapshot。 write.update.mode copy-on-write 更新命令的模式,包括: copy-on-write merge-on-read write.update.isolation-level serializable 更新命令的隔离级别,包括serializable和snapshot。 write.merge.mode copy-on-write 合并命令的模式,包括: copy-on-write merge-on-read write.merge.isolation-level serializable 合并命令的隔离级别,包括serializable和snapshot。 表3 表行为属性 属性名 默认值 描述 commit.retry.num-retries 4 在失败之前重试提交的次数。 commit.retry.min-wait-ms 100 重试提交之前等待的最小时间,单位为毫秒。 commit.retry.max-wait-ms 60000(1分钟) 重试提交之前等待的最大时间,单位为毫秒。 commit.retry.total-timeout-ms 1800000(30分钟) 提交的总重试超时时间,单位为毫秒。 commit.status-check.num-retries 3 在连接丢失后检查提交是否成功的次数,在未知提交状态下失败。 commit.status-check.min-wait-ms 1000(1秒) 重试状态检查之前等待的最小时间,单位为毫秒。 commit.status-check.max-wait-ms 60000(1分钟) 重试状态检查之前等待的最大时间,单位为毫秒。 commit.manifest.min-count-to-merge 100 累积清单文件进行合并之前的最小数量。 commit.manifest-merge.enabled true 控制在写入时是否自动合并清单。 父主题: 使用Iceberg
  • 基本语法 INSERT INTO tableIndentifier [VALUES(value)][SELECT query]; INSERT OVERWRITE tableIndentifier SELECT; MERGE INTO prod.db.target t USING (SELECT ...) s ON t.id = s.id WHEN ...; 其中: MERGE INTO: 目标表。 USING: 源表查询语句。 ON:源表和目标表匹配的条件。 WHEN: 数据更新条件。 insert into ... select ...;语法中不支持可空字段类型的数据插入到非空字段类型。
  • 使用不同用户执行yarn-session创建Flink集群失败 使用Flink过程中,具有两个相同权限用户testuser和bdpuser。使用用户testuser创建Flink集群正常,但是切换至bdpuser用户创建Fllink集群时,执行yarn-session.sh命令报错: 2019-01-02 14:28:09,098 | ERROR | [main] | Ensure path threw exception | org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl (CuratorFrameworkImpl.java:566) org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /flink/application_1545397824912_0022 原因是高可用配置项未修改。由于在Flink的配置文件中,“high-availability.zookeeper.client.acl”默认为“creator”,仅创建者有权限访问,新用户无法访问ZooKeeper上的目录导致yarn-session.sh执行失败。 解决方法如下: 修改客户端配置文件“conf/flink-conf.yaml”中配置项“high-availability.zookeeper.path.root”,例如: high-availability.zookeeper.path.root: flink2 重新提交Flink任务。
  • Flink客户端执行命令报错security.kerberos.login.keytab 客户端安装成功,执行客户端命令例如yarn-session.sh时报错,提示如下: [root@host01 bin]# yarn-session.sh2018-10-25 01:22:06,454 | ERROR | [main] | Error while trying to split key and value in configuration file /opt/flinkclient/Flink/flink/conf/flink-conf.yaml:80: "security.kerberos.login.keytab: " | org.apache.flink.configuration.GlobalConfiguration (GlobalConfiguration.java:160)Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Error while parsing YAML configuration file :80: "security.kerberos.login.keytab: " 在安全集群环境下,Flink需要进行安全认证。当前客户端未进行相关安全认证设置。 Flink整个系统有两种认证方式: 使用kerberos认证:Flink yarn client、Yarn Resource Manager、JobManager、HDFS、TaskManager、Kafka和Zookeeper。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 如果用户安装安全集群需要使用kerberos认证和security cookie认证。根据日志提示,发现配置文件中“security.kerberos.login.keytab :”配置项错误,未进行安全配置。 解决方法如下: 从MRS上下载用户的keytab认证文件,并放置到Flink客户端所在节点的某个目录下。 在“flink-conf.yaml”文件中配置: keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab “/home/flinkuser/keytab/abc222.keytab”表示的是用户目录,为1中放置目录。 请确保客户端用户具备对应目录权限。 principal名。 security.kerberos.login.principal: abc222 对于HA模式,如果配置了ZooKeeper,还需要设置ZooKeeper Kerberos认证相关的配置。 zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka Client和Kafka Broker之间也需要做Kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient
  • 示例 创建一个新表orders,使用子句with指定创建表的存储格式、存储位置、以及是否为外表。 通过“auto.purge”参数可以指定涉及到数据移除操作(如DROP、DELETE、INSERT OVERWRITE、TRUNCATE TABLE)时是否清除相关数据: "auto.purge"='true'时,清除元数据和数据文件。 "auto.purge"='false'时,仅清除元数据,数据文件会移入HDFS回收站。默认值为“false”,且不建议用户修改此属性,避免数据删除后无法恢复。 CREATE TABLE orders (orderkey bigint,orderstatus varchar,totalprice double,orderdate date)WITH (format = 'ORC', location='/user',orc_compress='ZLIB',external=true, "auto.purge"=false);-- 通过DESC FORMATTED 语句,可以查看建表的详细信息desc formatted orders ; Describe Formatted Table ------------------------------------------------------------------------------ # col_name data_type comment orderkey bigint orderstatus varchar totalprice double orderdate date # Detailed Table Information Database: default Owner: admintest LastAccessTime: 0 Location: hdfs://hacluster/user Table Type: EXTERNAL_TABLE # Table Parameters: EXTERNAL TRUE auto.purge false orc.compress.size 262144 orc.compression.codec ZLIB orc.row.index.stride 10000 orc.stripe.size 67108864 presto_query_id 20220812_084110_00050_srknk@default@HetuEngine presto_version 1.2.0-h0.cbu.mrs.320.r1-SNAPSHOT transient_lastDdlTime 1660293670 # Storage Information SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: serialization.format 1 (1 row) 创建一个新表,指定Row format: --建表时,指定表的字段分隔符为‘,’号(如果创建外表,要求数据文件中的每条记录的字段是以逗号进行分隔)CREATE TABLE student(id string,birthday string,grade int,memo string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';--建表时,指定字段分隔符为'\t',换行符为'\n'CREATE TABLE test(id int, name string ,tel string)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LINES TERMINATED BY '\n'STORED AS TEXTFILE; 如果表orders不存在,则创建表orders,并且增加表注释和列注释: CREATE TABLE IF NOT EXISTS orders (orderkey bigint,orderstatus varchar,totalprice double COMMENT 'Price in cents.',orderdate date)COMMENT 'A table to keep track of orders.';insert into orders values(202011181113,'online',9527,date '2020-11-11'),(202011181114,'online',666,date '2020-11-11'),(202011181115,'online',443,date '2020-11-11'),(202011181115,'offline',2896,date '2020-11-11'); 使用表orders的列定义创建表bigger_orders: CREATE TABLE bigger_orders (another_orderkey bigint,LIKE orders,another_orderdate date);SHOW CREATE TABLE bigger_orders ; Create Table --------------------------------------------------------------------- CREATE TABLE hive.default.bigger_orders ( another_orderkey bigint, orderkey bigint, orderstatus varchar, totalprice double, ordersdate date, another_orderdate date ) WITH ( external = false, format = 'ORC', location = 'hdfs://hacluster/user/hive/warehouse/bigger_orders', orc_compress = 'GZIP', orc_compress_size = 262144, orc_row_index_stride = 10000, orc_stripe_size = 67108864 ) (1 row) 标号① 建表示例: CREATE EXTERNAL TABLE hetu_test (orderkey bigint, orderstatus varchar, totalprice double, orderdate date) PARTITIONED BY(ds int) SORT BY (orderkey, orderstatus) COMMENT 'test' STORED AS ORC LOCATION '/user' TBLPROPERTIES (orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = 'orderstatus,totalprice'); 标号② 建表示例: CREATE EXTERNAL TABLE hetu_test1 (orderkey bigint, orderstatus varchar, totalprice double, orderdate date) COMMENT 'test' PARTITIONED BY(ds int) CLUSTERED BY (orderkey, orderstatus) SORTED BY (orderkey, orderstatus) INTO 16 BUCKETS STORED AS ORC LOCATION '/user' TBLPROPERTIES (orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = 'orderstatus,totalprice'); 标号③ 建表示例: CREATE TABLE hetu_test2 (orderkey bigint, orderstatus varchar, totalprice double, orderdate date, ds int) COMMENT 'This table is in Hetu syntax' WITH (partitioned_by = ARRAY['ds'], bucketed_by = ARRAY['orderkey', 'orderstatus'], sorted_by = ARRAY['orderkey', 'orderstatus'], bucket_count = 16, orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = ARRAY['orderstatus', 'totalprice'], external = true, format = 'orc', location = '/user');
  • 语法 ① CREATE TABLE [ IF NOT EXISTS ] [catalog_name.][db_name.]table_name ( { column_name data_type [ NOT NULL ] [ COMMENT col_comment] [ WITH ( property_name = expression [, ...] ) ] | LIKE existing_table_name [ { INCLUDING | EXCLUDING } PROPERTIES ] } [, ...] ) [ COMMENT table_comment ] [ WITH ( property_name = expression [, ...] ) ] ② CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { column_name data_type [ NOT NULL ] [ COMMENT comment ] [ WITH ( property_name = expression [, ...] ) ] | LIKE existing_table_name [ { INCLUDING | EXCLUDING } PROPERTIES ] } [, ...] ) [COMMENT 'table_comment'] [PARTITIONED BY(col_name data_type, ....)] [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name, col_name, ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION 'hdfs_path'] [TBLPROPERTIES (orc_table_property = value [, ...] ) ] ③ CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { column_name data_type [ NOT NULL ] [ COMMENT comment ] [ WITH ( property_name = expression [, ...] ) ] | LIKE existing_table_name [ { INCLUDING | EXCLUDING } PROPERTIES ] } [, ...] ) [PARTITIONED BY(col_name data_type, ....)] [SORT BY ([column [, column ...]])] [COMMENT 'table_comment'] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION 'hdfs_path'] [TBLPROPERTIES (orc_table_property = value [, ...] ) ]
  • 描述 使用CREATE TABLE创建一个具有指定列的、新的空表。使用CREATE TABLE AS创建带数据的表。 使用可选参数IF NOT EXISTS,如果表已经存在则不会报错。 WITH子句可用于在新创建的表或单列上设置属性,如表的存储位置(location)、是不是外表(external)等。 LIKE子句用于在新表中包含来自现有表的所有列定义。可以指定多个LIKE子句,从而允许从多个表中复制列。如果指定了INCLUDING PROPERTIES,则将所有表属性复制到新表中。如果WITH子句指定的属性名称与复制的属性名称相同,则将使用WITH子句中的值。默认是EXCLUDING PROPERTIES属性,而且最多只能为一个表指定INCLUDING PROPERTIES属性。 PARTITIONED BY能够用于指定分区的列;CLUSTERED BY能够被用于指定分桶的列;SORT BY和SORTED BY能够用于给指定的分桶列进行排序;BUCKETS能够被用于指定分桶数;EXTERNAL可用于指定创建外部表;STORED AS能被用于指定文件存储的格式;LOCATION能被用于指定在HDFS上存储的路径。 想要查看支持哪些column属性,可以运行以下命令,会显示当前对接的catalog分别支持哪些列属性。 SELECT * FROM system.metadata.column_properties; 想要查看支持哪些table属性,可以运行以下命令: SELECT * FROM system.metadata.table_properties; 下表为catalog为hive时的查询结果。 SELECT * FROM system.metadata.table_properties where catalog_name = 'hive'; catalog_name property_name default_value type description hive auto_purge false boolean Skip trash when table or partition is deleted hive avro_schema_url - varchar URI pointing to Avro schema for the table hive bucket_count 0 integer Number of buckets hive bucketed_by [] array(varchar) Bucketing columns hive bucketing_version - integer Bucketing version hive csv_escape - varchar CS V escape character hive csv_quote - varchar CSV quote character hive csv_separator - varchar CSV separator character hive external_location - varchar File system location URI for external table hive format ORC varchar Hive storage format for the table. Possible values: [ORC, PARQUET, AVRO, RCBINARY, RCTEXT, SEQUENCEFILE, JSON, TEXTFILE, TEXTFILE_MULTIDELIM, CSV] hive orc_compress GZIP varchar Compression codec used. Possible values: [NONE, SNAPPY, LZ4, ZSTD, GZIP, ZLIB] hive orc_compress_size 262144 bigint orc compression size hive orc_row_index_stride 10000 integer no. of row index strides hive orc_stripe_size 67108864 bigint orc stripe size hive orc_bloom_filter_columns [] array(varchar) ORC Bloom filter index columns hive orc_bloom_filter_fpp 0.05 double ORC Bloom filter false positive probability hive partitioned_by [] array(varchar) Partition columns hive sorted_by [] array(varchar) Bucket sorting columns hive textfile_skip_footer_line_count - integer Number of footer lines hive textfile_skip_header_line_count - integer Number of header lines hive transactional false boolean Is transactional property enabled
  • 限制 session属性可以设置bucket_count,默认值为-1,表示未设置。创建分区表时,如果bucket_count为-1且建表语句中未设置buckets,则使用默认值16。 默认外部表存储位置/user/hive/warehouse/{schema_name}/{table_name},其中{schema_name}为建表时使用的schema,{table_name}为表名。 指定属性“transactional=true”可以让表支持“原子性、一致性、隔离性、持久性”写入的事务能力,但是将表定义为事务表后,无法通过设置“transactional=false”将其退化为非事务表。 transactional='true'或 '0'在执行过程中不会进行类型转换,所以这种写法会抛出异常: Cannot convert ['true'] to boolean Cannot convert ['0'] to boolean 默认不允许向托管表(表属性external = true)插入数据,如需使用该功能,可参考注意事项,添加hive自定义属性:hive.non-managed-table-writes-enabled=true。 Mppdb有一个限制,数据库的标识符的最大长度为63,如果把标识符命名超过了最大长度,那么会被自动截取掉超出的部分,只留下最大长度的标识符。 跨域场景不支持建表。
  • 现象描述 MRS 3.3.1及之后版本,HBase默认适配开启HDFS多路读功能(HDFS多路读相关介绍请参见配置HDFS多路读)以降低读取延迟及适应网络变化,相关参数如表1所示。 表1 HBase适配HDFS多路读相关参数 参数名称 参数描述 默认值 取值范围 dfs.client.hedged.read.threshold.millis HDFS客户端决定是否启动多路读取之前等待第一个数据块的第一个字节的时间,单位:毫秒。 250 大于等于1 dfs.client.hedged.read.threadpool.size 多路读取线程池的大小,设置参数值大于0时启用多路读功能。 200 大于等于0 由于HDFS多路读功能在磁盘IO负载高的情况下可能导致性能劣化,在此场景下,HBase侧需要参考操作步骤关闭HDFS多路读功能。
  • 原因分析 HDFS进入安全模式后HBase服务异常,导致meta表下线;HDFS退出安全模式后,下线的meta表未上线,查看RegionServer日志存在“No namenode available to invoke create /hbase/WALs/xxxx.meta”报错。 由于meta表在HDFS故障恢复后的上线过程中无法记录上线状态,导致meta表无法正常上线,且Manager实例健康检查自动恢复重试存在重试次数限制,最终导致meta表上线失败。因此,HDFS退出安全模式后,需要手动介入进行恢复。
  • 示例 创建一个名为web的schema: CREATE SCHEMA web; 在指定路径创建schema,兼容写法示例: CREATE SCHEMA test_schema_5 LOCATION '/user/hive'; 在名为Hive的CATA LOG 下创建一个名为sales的schema: CREATE SCHEMA hive.sales; 如果当前catalogs下名为traffic的schema不存在时,则创建一个名为traffic的schema: CREATE SCHEMA IF NOT EXISTS traffic; 创建一个带属性的schema: CREATE DATABASE createtestwithlocation COMMENT 'Holds all values' LOCATION '/user/hive/warehouse/create_new' WITH dbproperties('name'='akku', 'id' ='9'); --通过describe schema|database 语句来查看刚创建的schemadescribe schema createtestwithlocation;
  • 使用方法 在创建的JDBC Connector Sink流表中添加“filter.record.enabled”和“filter.row-kinds”参数。 “filter.record.enabled”默认值为“false”。 “filter.row-kinds”默认值为“UPDATE_BEFORE, DELETE”。 SQL示例如下: CREATE TABLE user_score ( idx varchar(20), user_id varchar(20), score bigint) WITH ( 'connector' = 'kafka', 'topic' = 'topic-qk', 'properties.bootstrap.servers' = 'xxxx:21005', 'properties.group.id' = 'test_qk', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv');CREATE TABLE mysql_output ( idx varchar(20), user_id varchar(20), all_score bigint, PRIMARY KEY(idx, user_id) NOT ENFORCED) WITH( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQL的服务器IP:MySQL的服务器端口/mysql', 'table-name' = 'customer_t1', 'username' = 'username', --连接MySQL数据库的用户名 'password' = 'password',--连接MySQL数据库的密码 'filter.record.enabled' = 'true', 'filter.row-kinds' = 'UPDATE_BEFORE');insert into mysql_outputselect idx, user_id, sum(score) as all_scorefrom user_scoregroup by idx, user_id;
  • 基本语法 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name [ (columnTypeList)] USING iceberg [PARTITIONED BY (partition-expressions)] [LOCATION '(fully-qualified-uri)' ] [COMMENT 'table documentation' ] [TBLPROPERTIES ('key'='value', ...)]; 其中: PARTITIONED:用于指定分区字段。 LOCATION:用于指定表存储位置。 COMMENT:用于指定表描述。 TBLPROPERTIES:用于指定表属性。 Iceberg不支持创建CTAS或RTAS表。
  • 操作步骤 使用root登录客户端安装节点,执行如下命令: cd 客户端安装目录 执行以下命令环境变量并认证用户: source bigdata_env kinit 创建的业务用户(未开启Kerberos认证的用户请跳过该操作) 将以下参数添加到“Spark/spark/conf/spark-defaults.conf”文件中并保存: spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.local.type=hadoopspark.sql.catalog.local.warehouse=/tmp/ice-warehousespark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalogspark.sql.catalog.spark_catalog.type=hive 执行以下命令登录spark-sql客户端: Hadoop Catalog方式 spark-sql --master yarn \ --conf spark.sql.defaultCatalog=local \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.storeAssignmentPolicy=ANSI Hadoop Catalog方式不支持在创建数据库和创建表时指定Location。 如果使用Hadoop Catalog,需要设置操作用户具有3中配置的“spark.sql.catalog.local.warehouse”路径的写和执行权限。 Hive Catalog方式 spark-sql --master yarn \ --conf spark.sql.defaultCatalog=spark_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.storeAssignmentPolicy=ANSI 执行以下命令创建数据库: create database if not exists iceberg_db; 执行以下命令创建Iceberg表: use iceberg_db; CREATE TABLE if not exists sample_table (id bigint, data string) USING iceberg TBLPROPERTIES ('write.format.default'='parquet'); 执行以下命令向Iceberg表中插入数据: INSERT INTO sample_table VALUES (4, 'a'), (5, 'b'), (6, 'c'); 执行以下命令更新表数据: UPDATE sample_table set data = 'd' where id = 4; 执行以下命令删除数据: DELETE FROM sample_table where id = 5; 执行以下命令查询Iceberg表数据: select * from sample_table;
  • 基本语法 ALTER TABLE oldTableName RENAME TO newTableName; ALTER TABLE TableName SET TBLPROPERTIES; ALTER TABLE TableName ADD COLUMN; ALTER TABLE TableName RENAME COLUMN; ALTER TABLE TableName ALTER COLUMN; ALTER TABLE TableName DROP COLUMN; ALTER TABLE TableName ADD PARTITION FIELD; ALTER TABLE TableName DROP PARTITION FIELD; ALTER TABLE TableName REPLACE PARTITION FIELDALTER TABLE TableName WRITE ORDERED BY; 可空类型字段不能修改为非空类型字段, struct类型不支持修改字段。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全