华为云用户手册

  • 回答 目前出现上述问题时使用的是默认配置,如表1所示,HDFS客户端到NameNode的RPC连接存在keep alive机制,保持连接不会超时,尽力等待服务器的响应,因此导致已经连接的HDFS客户端的操作会长时间无响应。 对于已经长时间无响应的HDFS客户端,可以进行如下操作: 等待NameNode响应,一旦NameNode所在节点的CPU利用率回落,NameNode可以重新获得CPU资源时,HDFS客户端即可得到响应。 如果无法等待更长时间,需要重启HDFS客户端所在的应用程序进程,使得HDFS客户端重新连接空闲的NameNode。 解决措施: 为了避免该问题出现,可以在“客户端安装路径/HDFS/hadoop/etc/hadoop/core-site.xml”中做如下配置。 表1 参数说明 参数 描述 默认值 ipc.client.ping 当配置为true时,客户端会尽力等待服务端响应,定期发送ping消息,使得连接不会因为tcp timeout而断开。 当配置为false时,客户端会使用配置项“ipc.ping.interval”对应的值,作为timeout时间,在该时间内没有得到响应,即会超时。 在上述问题场景下,建议配置为false。 true ipc.ping.interval 当“ipc.client.ping”配置为true时,表示发送ping消息的周期。 当“ipc.client.ping”设置为false时,表示连接的超时时间。 在上述问题场景下,建议配置一个较大的超时时间,避免服务繁忙时的超时,建议配置为900000,单位为ms。 60000
  • 回答 “dfs.datanode.data.dir”配置项用于指定数据块在DataNode上的存储目录,在系统安装时需要指定根目录,并且可以指定多个根目录。 请谨慎修改该配置项,可以添加新的数据根目录。 禁止删除原有存储目录,否则会造成数据块丢失,导致文件无法正常读写。 禁止手动删除或修改存储目录下的数据块,否则可能会造成数据块丢失。 NameNode和JournalNode存在类似的配置项,也同样禁止删除原有存储目录,禁止手动删除或修改存储目录下的数据块。 dfs.namenode.edits.dir dfs.namenode.name.dir dfs.journalnode.edits.dir
  • 回答 由于在删除了大量文件之后,DataNode需要时间去删除对应的Block。当立刻重启NameNode时,NameNode会去检查所有DataNode上报的Block信息,发现已删除的Block时,会输出对应的INFO日志信息,如下所示: 2015-06-10 19:25:50,215 | INFO | IPC Server handler 36 on 25000 | BLOCK* processReport: blk_1075861877_2121067 on node 10.91.8.218:9866 size 10249 does not belong to any file | org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.processReport(BlockManager.java:1854) 每一个被删除的Block会产生一条日志信息,一个文件可能会存在一个或多个Block。当删除的文件数过多时,NameNode会花大量的时间打印日志,然后导致NameNode启动慢。 当出现这种现象时,您可以通过如下方式提升NameNode的启动速度。 删除大量文件时,不要立刻重启NameNode,待DataNode删除了对应的Block后重启NameNode,即不会存在这种情况。 您可以通过hdfs dfsadmin -report命令来查看磁盘空间,检查文件是否删除完毕。 如已大量出现以上日志,您可以将NameNode的日志级别修改为ERROR,NameNode不会再打印此日志信息。 等待NameNode启动完毕后,再将此日志级别修改为INFO。修改日志级别后无需重启服务。
  • 问题 MRS 3.x及之后版本,使用hbck工具检查Region状态,若日志中存在“ERROR: (regions region1 and region2) There is an overlap in the region chain.”或者“ERROR: (region region1) Multiple regions have the same startkey: xxx”信息,表示某些region存在overlap的问题,需要如何解决?
  • 配置步骤 MRS引入配置参数“dfs.pipeline.ecn”。当该配置启用时,DataNode会在写入通道超出负荷时从其中发出信号。客户端可以基于该阻塞信号进行退避,从而防止系统超出负荷。引入该配置参数的目的是为了使通道更加稳定,并减少不必要的取消或恢复操作。收到信号后,客户端会退避一定的时间(5000ms),然后根据相关过滤器调整退避时间(单次退避最长时间为50000ms)。 请参考修改集群服务配置参数,进入HDFS的“全部配置”页面,在搜索框中输入参数名称。 表1 NameNode ECN配置 参数 描述 缺省值 dfs.pipeline.ecn 进行该配置后,DataNode能够向客户端发送阻塞通知。 false
  • 回答 建议在使用Hudi时,schema应该以向后兼容的方式演进。此错误通常发生在使用向后不兼容的演进方式删除某些列如“col1”后,更新parquet文件中以旧的schema写入的列“col1”,在这种情况下,parquet尝试在传入记录中查找所有当前字段,当发现“col1”不存在时,发生上述异常。 解决这个问题的办法是使用所有schema演进版本来创建uber schema,并使用该schema作为target schema。用户可以从hive metastore中获取schema并将其与当前schema合并。
  • 回答 ZooKeeper IO瓶颈观测手段: 通过Manager的监控页面查看单个节点上ZooKeeper请求监控,判断是否严重超出规格限制。 通过观测ZooKeeper的日志以及HBase的日志,查看是否有大量的IO Exception Timeout或者SocketTimeout Exception异常。 调优建议: 将ZooKeeper实例个数调整为5个及以上,可以通过设置peerType=observer来增加observer的数目。 通过控制单个任务并发的map数或减少每个节点下运行task的内存,降低节点负载。 升级ZooKeeper数据磁盘,如SSD等。
  • Hudi Compaction操作说明 Compaction用于合并mor表Base和Log文件。 对于Merge-On-Read表,数据使用列式Parquet文件和行式Avro文件存储,更新被记录到增量文件,然后进行同步/异步compaction生成新版本的列式文件。Merge-On-Read表可减少数据摄入延迟,因而进行不阻塞摄入的异步Compaction很有意义。 异步Compaction会进行如下两个步骤: 调度Compaction:由入湖作业完成,在这一步,Hudi扫描分区并选出待进行compaction的FileSlice,最后CompactionPlan会写入Hudi的Timeline。 执行Compaction:一个单独的进程/线程将读取CompactionPlan并对FileSlice执行Compaction操作。 使用Compaction的方式分为同步和异步两种: 同步方式由参数hoodie.compact.inline控制,默认为true,自动生成compaction调度计划并执行compaction: 关闭同步compaction datasource写入时可以通过 .option("hoodie.compact.inline", "false") 来关闭自动compaction。 spark-sql写入时可以通过set hoodie.compact.inline=false;来关闭自动compaction。 仅同步生成compaction调度而不执行compaction ·datasource写入时可以通过以下option参数来实现: option("hoodie.compact.inline", "true"). option("hoodie.schedule.compact.only.inline", "true"). option("hoodie.run.compact.only.inline", "false"). ·spark-sql写入时可以通过set 以下参数来实现: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=true; set hoodie.run.compact.only.inline=false; 异步方式由spark-sql来实现。 如果需要在异步compaction时只执行已经产生的compaction调度计划而不创建新的调度计划,则需要通过set命令设置以下参数: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=false; set hoodie.run.compact.only.inline=true; 更多compaction参数请参考compaction&cleaning配置章节。 为了保证入湖的最高效率,推荐使用同步产生compaction调度计划,异步执行compaction调度计划的方式。 父主题: 数据管理维护
  • 写入操作配置 本章节介绍Hudi重要配置的详细信息,更多配置请参考hudi官网:http://hudi.apache.org/cn/docs/configurations.html。 表1 写入操作重要配置项 参数 描述 默认值 hoodie.datasource.write.table.name 指定写入的hudi表名。 无 hoodie.datasource.write.operation 写hudi表指定的操作类型,当前支持upsert、delete、insert、bulk_insert等方式。 upsert:更新插入混合操作 delete:删除操作 insert:插入操作 bulk_insert: 用于初始建表导入数据, 注意初始建表禁止使用upsert、insert方式 insert_overwrite:对静态分区执行insert overwrite insert_overwrite_table:动态分区执行insert overwrite,该操作并不会立刻删除全表做overwrite,会逻辑上重写hudi表的元数据,无用数据后续由hudi的clean机制清理。效率比bulk_insert + overwrite 高 upsert hoodie.datasource.write.table.type 指定hudi表类型,一旦这个表类型被指定,后续禁止修改该参数,可选值MERGE_ON_READ。 COPY_ON_WRITE hoodie.datasource.write.precombine.field 该值用于在写之前对具有相同的key的行进行合并去重。 指定为具体的表字段 hoodie.datasource.write.payload.class 在更新过程中,该类用于提供方法将要更新的记录和更新的记录做合并,该实现可插拔,如要实现自己的合并逻辑,可自行编写。 org.apache.hudi.common.model.DefaultHoodieRecordPayload hoodie.datasource.write.recordkey.field 用于指定hudi的主键,hudi表要求有唯一主键。 指定为具体的表字段 hoodie.datasource.write.partitionpath.field 用于指定分区键,该值配合hoodie.datasource.write.keygenerator.class使用可以满足不同的分区场景。 无 hoodie.datasource.write.hive_style_partitioning 用于指定分区方式是否和hive保持一致,建议该值设置为true。 true hoodie.datasource.write.keygenerator.class 配合hoodie.datasource.write.partitionpath.field,hoodie.datasource.write.recordkey.field产生主键和分区方式。 说明: 写入设置KeyGenerator与表保存的参数值不一致时将提示需要保持一致。 org.apache.hudi.keygen.ComplexKeyGenerator 父主题: Hudi常见配置参数
  • index相关配置 参数 描述 默认值 hoodie.index.class 用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type配置。 "" hoodie.index.type 使用的索引类型,默认为布隆过滤器。可能的选项是[BLOOM | HBASE | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE] 。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中。 BLOOM hoodie.index.bloom.num_entries 存储在布隆过滤器中的条目数。 假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。 注意: 将此值设置的太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置的非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。 60000 hoodie.index.bloom.fpp 根据条目数允许的错误率。 用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),在磁盘空间上进行权衡以降低误报率。 0.000000001 hoodie.bloom.index.parallelism 索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,根据输入的工作负载特征自动计算的。 0 hoodie.bloom.index.prune.by.ranges 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。 true hoodie.bloom.index.use.caching 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找。 true hoodie.bloom.index.use.treebased.filter 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度。 true hoodie.bloom.index.bucketized.checking 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差。 true hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。 10000000 hoodie.bloom.index.update.partition.path 仅在索引类型为GLOBAL_BLOOM时适用。 为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。 true hoodie.index.hbase.zkquorum 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum URL。 无 hoodie.index.hbase.zkport 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum端口。 无 hoodie.index.hbase.zknode.path 仅在索引类型为HBASE时适用,必填选项。这是根znode,它将包含HBase创建及使用的所有znode。 无 hoodie.index.hbase.table 仅在索引类型为HBASE时适用,必填选项。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。 无 父主题: Hudi常见配置参数
  • 前提条件 MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组,普通模式不需要)。 已安装Kafka客户端,客户端安装目录如“/opt/client”。 本示例需创建两个Topic,可参考7,分别命名为“test_2”和“test_3”,并创建“move-kafka-topic.json”文件,创建路径如“/opt/client/Kafka/kafka”,Topic格式内容如下: {"topics":[{"topic":"test_2"},{"topic":"test_3"}],"version":1}
  • 操作步骤 以客户端安装用户,登录安装Kafka客户端的节点。 切换到Kafka客户端安装目录。 cd /opt/client 执行以下命令,配置环境变量。 source bigdata_env 执行以下命令,进行用户认证。(普通模式跳过此步骤) kinit 组件业务用户 执行以下命令进入Kafka客户端的bin目录。 cd Kafka/kafka/bin 执行以下命令生成执行计划。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --topics-to-move-json-file ../move-kafka-topic.json --broker-list "1,2,3" --generate 172.16.0.119:ZooKeeper实例的业务IP。 --broker-list "1,2,3":参数中的“1,2,3”为扩容后的所有broker_id。 执行vim ../reassignment.json创建“reassignment.json”文件并保存,保存路径为“/opt/kafkaclient/Kafka/kafka”。 拷贝6中生成的“Proposed partition reassignment configuration”下的内容至“reassignment.json”文件,如下所示: {"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]} 执行以下命令进行分区重分布。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 50000000 --throttle 50000000:限制网络带宽为50MB。带宽可根据数据量大小及客户对均衡时间的要求进行调整,5TB数据量,使用50MB带宽,均衡时长约8小时。 执行以下命令查看迁移状态。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --verify
  • 购买ELB并配置对接ClickHouse 购买ELB并获取其私有IP地址 详细操作步骤请参考创建共享型负载均衡器。 登录“弹性负载均衡器”控制台,在“负载均衡器”界面单击“购买弹性负载均衡”。 在“购买弹性负载均衡”界面,“实例规格类型”选择“共享型”,“所属VPC”和“子网”参数需要和MRS集群保持一致,其他参数保持默认即可。 单击“立即购买”,确认配置信息,并单击“提交”。 创建完成后,在“负载均衡器”界面,选择对应的区域即可看到新建的负载均衡器。查看并获取该负载均衡器的私有IP地址。 添加ELB监听器 详细操作步骤请参考添加监听器。
  • 回答 场景一:(import场景)使用sqoop import命令抽取开源postgre到MRS hdfs或hive等。 问题现象: 使用sqoop命令查询postgre表可以,但是执行sqoop import命令倒数时报错: The authentication type 12 is not supported. Check that you have configured the pg_hba.conf file to include the client's IP address or subnet, and that it The authentication type 5 is not supported. Check that you have configured the pg_hba.conf file to include the client's IP address or subnet, and that it 问题根因: 报错中type为5时:在执行sqoop import命令时,会启动MapReduce任务,由于MRS Hadoop安装目录(/opt/Bigdata/ FusionInsight _HD_*/1_*_NodeManager/install/hadoop/share/hadoop/common/lib)下自带了postgre驱动包gsjdbc4-*.jar,与开源postgre服务不兼容导致报错。 报错中type为12时:数据库的pg_hba.conf文件配置有误。 解决方案: 报错中type为5时:在每台MRS NodeManager实例所在节点上移动驱动包gsjdbc4-*.jar到tmp目录下。 mv /opt/Bigdata/FusionInsight_HD_*/1_*_NodeManager/install/hadoop/share/hadoop/common/lib/gsjdbc4-*.jar /tmp 报错中type为12时:调整数据库的pg_hba.conf文件,将address改成sqoop所在节点的ip。 场景二:(export场景)使用sqoop export命令抽取开源postgre到MRS hdfs或hive等。 问题现象: 使用sqoop命令查询postgre表可以,但是执行sqoop export命令倒数时报错:The authentication type 5 is not supported. Check that you have configured the pg_hba.conf file to include the client's IP address or subnet, and that it 问题根因: 在执行sqoop export命令时,会启动MapReduce任务,由于MRS Hadoop安装目录(/opt/Bigdata/FusionInsight_HD_*/1_*_NodeManager/install/hadoop/share/hadoop/common/lib)下自带了postgre驱动包gsjdbc4-*.jar,与开源postgre服务不兼容导致报错。 解决方案: 在每台MRS NodeManager实例所在节点上移动驱动包gsjdbc4-*.jar到tmp目录下。 mv /opt/Bigdata/FusionInsight_HD_*/1_*_NodeManager/install/hadoop/share/hadoop/common/lib/gsjdbc4-*.jar /tmp 将/opt/Bigdata/client/Hive/Beeline/lib/gsjdbc4-*.jar删除 。
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
  • 操作场景 将datasource表的分区消息存储到Metastore中,并在Metastore中对分区消息进行处理。 优化datasource表,支持对表中分区执行增加、删除和修改等语法,从而增加与Hive的兼容性。 支持在查询语句中,把分区裁剪并下压到Metastore上,从而过滤掉不匹配的分区。 示例如下: select count(*) from table where partCol=1; //partCol列为分区列 此时,在物理计划中执行TableScan操作时,只处理分区(partCol=1)对应的数据。
  • 操作场景 对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素。当一个长期运行的服务(比如JD BCS erver),若分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却资源紧张,这就造成了很大的资源浪费和资源不合理的调度。 动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。
  • 操作步骤 要启动Datasource表优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数介绍 参数 描述 默认值 spark.sql.hive.manageFilesourcePartitions 是否启用Metastore分区管理(包括数据源表和转换的Hive表)。 true:启用Metastore分区管理,即数据源表存储分区在Hive中,并在查询语句中使用Metastore修剪分区。 false:不启用Metastore分区管理。 true spark.sql.hive.metastorePartitionPruning 是否支持将predicate下压到Hive Metastore中。 true:支持,目前仅支持Hive表的predicate下压。 false:不支持 true spark.sql.hive.filesourcePartitionFileCacheSize 启用内存中分区文件元数据的缓存大小。 所有表共享一个可以使用指定的num字节进行文件元数据的缓存。 只有当“spark.sql.hive.manageFilesourcePartitions”配置为“true”时,该配置项才会生效。 250 * 1024 * 1024 spark.sql.hive.convertMetastoreOrc 设置ORC表的处理方式: false:Spark SQL使用Hive SerDe处理ORC表。 true:Spark SQL使用Spark内置的机制处理ORC表。 true
  • 回答 在进行rollup和cube操作时,用户通常是基于维度进行分析,需要的是度量的结果,因此不会对维度进行聚合操作。 例如当前有表src(d1, d2, m),那么语句1“select d1, sum(m) from src group by d1, d2 with rollup”就是对维度d1和d2进行上卷操作计算度量m的结果,因此有实际业务意义,而其结果也跟预期是一致的。但语句2“select d1, sum(d1) from src group by d1, d2 with rollup”则从业务上无法解释。当前对于语句2所有聚合(sum/avg/max/min)结果均为0。 只有在rollup和cube操作中对出现在group by中的字段进行聚合结果才是0,非rollup和cube操作其结果跟预期一致。
  • 回答 当Yarn配置“yarn.log-aggregation-enable”为“true”时,就开启了container日志聚合功能。 日志聚合功能是指:当应用在Yarn上执行完成后,NodeManager将本节点中所有container的日志聚合到HDFS中,并删除本地日志。详情请参见配置Container日志聚合功能。 然而,开启container日志聚合功能之后,其日志聚合至HDFS目录中,只能通过获取HDFS文件来查看日志。 开源Spark和Yarn服务不支持通过WebUI查看聚合后的日志。
  • 回答 在YARN中,当一个APP的节点被AM(ApplicationMaster)加入黑名单的数量达到一定比例(默认值为节点总数的33%)时,该AM会自动释放黑名单,从而不会出现由于所有可用节点都被加入黑名单而任务无法获取节点资源的现象。 在资源池场景下,假设该集群上有8个节点,通过NodeLabel特性将集群划分为两个资源池,pool A和pool B,其中pool B包含两个节点。用户提交了一个任务App1到pool B,由于HDFS空间不足,App1运行失败,导致pool B的两个节点都被App1的AM加入了黑名单,根据上述原则,2个节点小于8个节点的33%,所以YARN不会释放黑名单,使得App1一直无法得到资源而保持运行状态,后续即使被加入黑名单的节点恢复,App1也无法得到资源。 由于上述原则不适用于资源池场景,所以目前可通过调整客户端参数(路径为“客户端安装路径/Yarn/config/yarn-site.xml”)“yarn.resourcemanager.am-scheduling.node-blacklisting-disable-threshold”为:(nodes number of pool / total nodes )* 33%解决该问题。
  • 回答 场景一: add jar语句只会将jar加载到当前连接的JDB CS erver的jarClassLoader,不同JDBCServer不会共用。JDBCServer重启后会创建新的jarClassLoader,所以需要重新add jar。 添加jar包有两种方式:可以在启动spark-sql的时候添加jar包,如spark-sql --jars /opt/test/two_udfs.jar;也可在spark-sql启动后再添加jar包,如add jar /opt/test/two_udfs.jar。add jar所指定的路径可以是本地路径也可以是HDFS上的路径。
  • 回答 Streaming Context启动时,若应用设置了checkpoint,则需要对应用中的DStream checkpoint对象进行序列化,序列化时会用到dstream.context。 dstream.context是Streaming Context启动时从output Streams反向查找所依赖的DStream,逐个设置context。若Spark Streaming应用创建1个输入流,但该输入流无输出逻辑时,则不会给它设置context。所以在序列化时报“NullPointerException”。 解决办法:应用中如果有无输出逻辑的输入流,则在代码中删除该输入流,或添加该输入流的相关输出逻辑。
  • 解答 该应用程序中使用了DStream中的print算子来显示结果,该算子会调用RDD中的take算子来实现底层的计算。 Take算子会以Partition为单位多次触发计算。 在该问题中,由于Shuffle操作,导致take算子默认有两个Partition,Spark首先计算第一个Partition,但由于没有数据输入,导致获取结果不足10个,从而触发第二次计算,因此会出现RDD的DAG结构打印两次的现象。 在代码中将print算子修改为foreach(collect),该问题则不会出现。
  • 问题 在Spark2x的spark-shell上执行如下语句失败: val acctId = List(("49562", "Amal", "Derry"), ("00000", "Fred", "Xanadu")) val rddLeft = sc.makeRDD(acctId) val dfLeft = rddLeft.toDF("Id", "Name", "City") //dfLeft.show val acctCustId = List(("Amal", "49562", "CO"), ("Dave", "99999", "ZZ")) val rddRight = sc.makeRDD(acctCustId) val dfRight = rddRight.toDF("Name", "CustId", "State") //dfRight.show val dfJoin = dfLeft.join(dfRight, dfLeft("Id") === dfRight("CustId"), "outer") dfJoin.show dfJoin.repartition(1).write.format("com.databricks.spark.csv").option("delimiter", "\t").option("header", "true").option("treatEmptyValuesAsNulls", "true").option("nullValue", "").save("/tmp/outputDir")
  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encounteredjava.lang.NullPointerExceptionat org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230)at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143)at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566)at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612)at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611)at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21)at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • 回答 问题分析 创建表。 CREATE TABLE TEST_TABLE(DATE varchar not null,NUM integer not null,SEQ_NUM integer not null,ACCOUNT1 varchar not null,ACCOUNTDES varchar,FLAG varchar,SALL double,CONSTRAINT PK PRIMARY KEY (DATE,NUM,SEQ_NUM,ACCOUNT1)); 创建全局索引 CREATE INDEX TEST_TABLE_INDEX ON TEST_TABLE(ACCOUNT1,DATE,NUM,ACCOUNTDES,SEQ_NUM); 插入数据 UPSERT INTO TEST_TABLE (DATE,NUM,SEQ_NUM,ACCOUNT1,ACCOUNTDES,FLAG,SALL) values ('20201001',30201001,13,'367392332','sffa1','',''); 执行BulkLoad任务更新数据 hbase org.apache.phoenix.mapreduce.CsvBulkLoadTool -t TEST_TABLE -i /tmp/test.csv,test.csv内容如下: 20201001 30201001 13 367392332 sffa888 1231243 23 问题现象:无法直接更新之前存在的索引数据,导致存在两条索引数据。 +------------+-----------+-----------+---------------+----------------+| :ACCOUNT1 | :DATE | :NUM | 0:ACCOUNTDES | :SEQ_NUM |+------------+-----------+-----------+---------------+----------------+| 367392332 | 20201001 | 30201001 | sffa1 | 13 || 367392332 | 20201001 | 30201001 | sffa888 | 13 |+------------+-----------+-----------+---------------+----------------+ 解决方法 删除旧的索引表。 DROP INDEX TEST_TABLE_INDEX ON TEST_TABLE; 异步方式创建新的索引表。 CREATE INDEX TEST_TABLE_INDEX ON TEST_TABLE(ACCOUNT1,DATE,NUM,ACCOUNTDES,SEQ_NUM) ASYNC; 索引重建。 hbase org.apache.phoenix.mapreduce.index.IndexTool --data-table TEST_TABLE --index-table TEST_TABLE_INDEX --output-path /user/test_table
  • 在集群外节点上安装客户端 创建一个满足要求的弹性云服务器,要求如下: 针对MRS 3.x之前版本的集群,需要先确认当前MRS集群节点的CPU架构。针对MRS 3.x之前版本的集群,该弹性云服务器的CPU架构请和MRS集群节点保持一致,MRS 3.x及之后版本MRS客户端兼容两种CPU架构。 已准备一个弹性云服务器,主机操作系统及版本请参见表1。 表1 参考列表 CPU架构 操作系统 支持的版本号 x86计算 Euler 可用:Euler OS 2.2 可用:Euler OS 2.3 可用:Euler OS 2.5 鲲鹏计算(ARM) Euler 可用:Euler OS 2.8 例如,用户可以选择操作系统为Euler的弹性云服务器准备操作。 同时为弹性云服务分配足够的磁盘空间,例如“40GB”。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。 如果不同,请修改弹性云服务器安全组或配置弹性云服务器安全组的出入规则允许MRS集群所有安全组的访问。 需要允许用户使用密码方式登录Linux弹性云服务器(SSH方式),请参见SSH密码方式登录。 MRS集群安全组入方向将所有端口对客户端节点放开,具体操作请参考添加安全组规则。 登录MRS Manager页面,具体请参见访问MRS Manager(MRS 3.x之前版本),然后选择“服务管理”。 单击“下载客户端”。 在“客户端类型”选择“完整客户端”。 在“下载路径”选择“远端主机”。 将“主机IP”设置为ECS的IP地址,设置“主机端口”为“22”,并将“存放路径”设置为“/tmp”。 如果使用SSH登录ECS的默认端口“22”被修改,请将“主机端口”设置为新端口。 “存放路径”最多可以包含256个字符。 “登录用户”设置为“root”。 如果使用其他用户,请确保该用户对保存目录拥有读取、写入和执行权限。 在“登录方式”选择“密码”或“SSH私钥”。 密码:输入创建集群时设置的root用户密码。 SSH私钥:选择并上传创建集群时使用的密钥文件。 单击“确定”开始生成客户端文件。 若界面显示以下提示信息表示客户端包已经成功保存。单击“关闭”。客户端文件请到下载客户端时设置的远端主机的“存放路径”中获取。 下载客户端文件到远端主机成功。 若界面显示以下提示信息,请检查用户名密码及远端主机的安全组配置,确保用户名密码正确,及远端主机的安全组已增加SSH(22)端口的入方向规则。然后从2执行重新下载客户端。 连接到服务器失败,请检查网络连接或参数设置。 生成客户端会占用大量的磁盘IO,不建议在集群处于安装中、启动中、打补丁中等非稳态场景下载客户端。 使用VNC方式,登录弹性云服务器。参见远程登录(VNC方式)。 所有镜像均支持Cloud-init特性。Cloud-init预配置的用户名“root”,密码为创建集群时设置的密码。首次登录建议修改。 执行ntp时间同步,使集群外节点的时间与MRS集群时间同步。 检查安装NTP服务有没有安装,未安装请执行yum install ntp -y命令自行安装。 执行vim /etc/ntp.conf命令编辑NTP客户端配置文件,并增加MRS集群中Master节点的IP并注释掉其他server的地址。 server master1_ip preferserver master2_ip 图1 增加Master节点的IP 执行service ntpd stop命令关闭NTP服务。 执行如下命令,手动同步一次时间: /usr/sbin/ntpdate 192.168.10.8 192.168.10.8为主Master节点的IP地址。 执行service ntpd start或systemctl restart ntpd命令启动NTP服务。 执行ntpstat命令查看时间同步结果。 在弹性云服务器,切换到root用户,并将6中“存放路径”中的安装包复制到目录“/opt”,例如“存放路径”设置为“/tmp”时命令如下。 sudo su - root cp /tmp/MRS_Services_Client.tar /opt 在“/opt”目录执行以下命令,解压压缩包获取校验文件与客户端配置包。 tar -xvf MRS_Services_Client.tar 执行以下命令,校验文件包。 sha256sum -c MRS_Services_ClientConfig.tar.sha256 界面显示如下: MRS_Services_ClientConfig.tar: OK 执行以下命令,解压“MRS_Services_ClientConfig.tar”。 tar -xvf MRS_Services_ClientConfig.tar 执行以下命令,安装客户端到新的目录,例如“/opt/Bigdata/client”。安装时自动生成目录。 sh /opt/MRS_Services_ClientConfig/install.sh /opt/Bigdata/client 查看安装输出信息,如有以下结果表示客户端安装成功: Components client installation is complete. 验证弹性云服务器节点是否与集群Master节点的IP是否连通? 例如,执行以下命令:ping Master节点IP地址 是,执行18。 否,检查VPC、安全组是否正确,是否与MRS集群在相同VPC和安全组,然后执行18。 执行以下命令配置环境变量: source /opt/Bigdata/client/bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS集群用户 例如, kinit admin 执行组件的客户端命令。 例如,执行以下命令查看HDFS目录: hdfs dfs -ls /
  • 使用MRS客户端 在已安装客户端的节点,执行sudo su - omm命令切换用户。执行以下命令切换到客户端目录: cd /opt/client 执行以下命令配置环境变量: source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit MRS集群用户 例如,kinit admin。 启用Kerberos认证的MRS集群默认创建“admin”用户账号,用于集群管理员维护集群。 直接执行组件的客户端命令。 例如:使用HDFS客户端命令查看HDFS根目录文件,执行hdfs dfs -ls /。
  • ClickHouse表数据操作 客户端登录ClickHouse节点。例如: clickhouse client --host node-master3QsRI --multiline --port 9440 --secure; node-master3QsRI 参数为查看ClickHouse服务cluster等环境参数信息中2对应的host_name参数的值。 参考创建本地复制表和分布式表创建表后,可以插入数据到本地表。 例如插入数据到本地表:test insert into test values(toDateTime(now()), rand()); 查询本地表信息。 例如查询2中的表test数据信息: select * from test; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.002 sec. 查询Distributed分布式表。 例如3中因为分布式表test_all基于test创建,所以test_all表也能查询到和test相同的数据。 select * from test_all; SELECT *FROM test_all┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.004 sec. 切换登录节点为相同shard_num的shard节点,并且查询当前表信息,能查询到相同的表数据。 例如,退出原有登录节点:exit; 切换到节点node-group-1tXED0003: clickhouse client --host node-group-1tXED0003 --multiline --port 9440 --secure; 通过2可以看到node-group-1tXED0003和node-master3QsRI的shard_num值相同。 show tables; SHOW TABLES┌─name─────┐│ test ││ test_all │└────────┘ 查询本地表数据。例如在节点node-group-1tXED0003查询test表数据。 select * from test; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:10:42 │ 1596238076 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.005 sec. 切换到不同shard_num的shard节点,并且查询之前创建的表数据信息。 例如退出之前的登录节点node-group-1tXED0003: exit; 切换到node-group-1tXED0001节点。通过2可以看到node-group-1tXED0001和node-master3QsRI的shard_num值不相同。 clickhouse client --host node-group-1tXED0001 --multiline --port 9440 --secure; 查询test本地表数据,因为test是本地表所以在不同分片节点上查询不到数据。 select * from test; SELECT *FROM testOk. 查询test_all分布式表数据,能正常查询到数据信息。 select * from test_all; SELECT *FROM test┌───────────EventDate─┬─────────id─┐│ 2020-11-05 21:12:19 │ 3686805070 │└──────────────── ┴───────────┘1 rows in set. Elapsed: 0.002 sec.
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全