华为云用户手册

  • 基于HBase本地二级索引查询数据 在具有索引的用户表中,可以使用Filter来查询数据。对于创建单索引和组合索引的用户表,使用过滤器查询的结果与没有使用索引的表相同,但数据查询性能高于没有使用索引的表。 索引的使用规则如下: 对于为一个或多个列创建单个索引的情况: 当将此列用于AND或OR查询筛选时,使用索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2)。 当在查询中使用“索引列和非索引列”进行过滤时,此索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1)。 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)。 对于为多个列创建组合索引的情况: 当用于查询的列是组合索引的全部或部分列并且与组合索引具有相同的顺序时,使用索引会提高查询性能。 例如,为C1,C2和C3创建组合索引。 该索引在以下情况下生效: Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2) FILTER_CONDITION(IndexCol1) 该索引在下列情况下不生效: Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol3) FILTER_CONDITION(IndexCol2) FILTER_CONDITION(IndexCol3) 当在查询中使用“索引列和非索引列”进行过滤时,使用索引可提高查询性能。 例如: Filter_Condition(IndexCol1)AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1) 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如: Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2))OR(Filter_Condition(NonIndexCol1)) 当多个列用于查询时,只能为组合索引中的最后一列指定值范围,而其他列只能设置为指定值。 例如,为C1,C2和C3创建组合索引。在范围查询中,只能为C3设置数值范围,过滤条件为“C1 = XXX,C2 = XXX,C3 = 数值范围”。
  • 快速配置常用参数 其他参数在安装集群时已进行了适配,以下参数需要根据使用场景进行调整。以下参数除特别指出外,一般在Spark2x客户端的“spark-defaults.conf”文件中配置。 表1 快速配置常用参数 配置项 说明 默认值 spark.sql.parquet.compression.codec 对于非分区parquet表,设置其存储文件的压缩格式。 在JD BCS erver服务端的“spark-defaults.conf”配置文件中进行设置。 snappy spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。目前仅在YARN模式下有效。 JDB CS erver默认值为true,client默认值为false。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512m,2g)。 4G spark.sql.autoBroadcastJoinThreshold 当进行join操作时,配置广播的最大值。 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。 配置为-1时,将不进行广播。 10485760 spark.yarn.queue JDBCServer服务所在的Yarn队列。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 default spark.driver.memory 大集群下推荐配置32~64g驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512m, 2g)。 4G spark.yarn.security.credentials.hbase.enabled 是否打开获取HBase token的功能。如果需要Spark-on-HBase功能,并且配置了安全集群,参数值设置为“true”。否则设置为“false”。 false spark.serializer 用于串行化将通过网络发送或需要缓存的对象的类以序列化形式展现。 Java序列化的默认值适用于任何Serializable Java对象,但运行速度相当慢,所以建议使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。可以是org.apache.spark.serializer.Serializer的任何子类。 org.apache.spark.serializer.JavaSerializer spark.executor.cores 每个执行者使用的内核个数。 在独立模式和Mesos粗粒度模式下设置此参数。当有足够多的内核时,允许应用程序在同样的worker上执行多个执行程序;否则,在每个worker上,每个应用程序只能运行一个执行程序。 1 spark.shuffle.service.enabled NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。 false spark.sql.adaptive.enabled 是否开启自适应执行框架。 false spark.executor.memoryOverhead 每个执行器要分配的堆内存量(单位为兆字节)。 这是占用虚拟机开销的内存,类似于内部字符串,其他内置开销等等。会随着执行器大小(通常为6-10%)而增长。 1GB spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false
  • HDFS文件系统目录简介 更多关于HDFS组件操作指导,请参考使用HDFS。 HDFS文件系统中目录结构如下表所示。 表1 HDFS文件系统目录结构(适用于 MRS 3.x之前版本) 路径 类型 简略功能 是否可以删除 删除的后果 /tmp/spark/sparkhive-scratch 固定目录 存放Spark JDBCServer中metastore session临时文件 否 任务运行失败 /tmp/sparkhive-scratch 固定目录 存放Spark cli方式运行metastore session临时文件 否 任务运行失败 /tmp/carbon/ 固定目录 数据导入过程中,如果存在异常CarbonData数据,则将异常数据放在此目录下 是 错误数据丢失 /tmp/Loader-${作业名}_${MR作业id} 临时目录 存放Loader Hbase bulkload作业的region信息,作业完成后自动删除 否 Loader Hbase Bulkload作业失败 /tmp/logs 固定目录 MR任务日志在HDFS上的聚合路径 是 MR任务日志丢失 /tmp/archived 固定目录 MR任务日志在HDFS上的归档路径 是 MR任务日志丢失 /tmp/hadoop-yarn/staging 固定目录 保存AM运行作业运行日志、作业概要信息和作业配置属性 否 任务运行异常 /tmp/hadoop-yarn/staging/history/done_intermediate 固定目录 所有任务运行完成后,临时存放/tmp/hadoop-yarn/staging目录下文件 否 MR任务日志丢失 /tmp/hadoop-yarn/staging/history/done 固定目录 周期性扫描线程定期将done_intermediate的日志文件转移到done目录 否 MR任务日志丢失 /tmp/mr-history 固定目录 存储预加载历史记录文件的路径 否 MR历史任务日志数据丢失 /tmp/hive 固定目录 存放Hive的临时文件 否 导致Hive任务失败 /tmp/hive-scratch 固定目录 Hive运行时生成的临时数据,如会话信息等 否 当前执行的任务会失败 /user/{user}/.sparkStaging 固定目录 存储SparkJDBCServer应用临时文件 否 executor启动失败 /user/spark/jars 固定目录 存放Spark executor运行依赖包 否 executor启动失败 /user/loader 固定目录 存放loader的作业脏数据以及HBase作业数据的临时存储目录 否 HBase作业失败或者脏数据丢失 /user/loader/etl_dirty_data_dir /user/loader/etl_hbase_putlist_tmp /user/loader/etl_hbase_tmp /user/mapred 固定目录 存放Hadoop相关的文件 否 导致Yarn启动失败 /user/hive 固定目录 Hive相关数据存储的默认路径,包含依赖的spark lib包和用户默认表数据存储位置等 否 用户数据丢失 /user/omm-bulkload 临时目录 HBase批量导入工具临时目录 否 HBase批量导入任务失败 /user/hbase 临时目录 HBase批量导入工具临时目录 否 HBase批量导入任务失败 /sparkJobHistory 固定目录 Spark eventlog数据存储目录 否 HistoryServer服务不可用,任务运行失败 /flume 固定目录 Flume采集到HDFS文件系统中的数据存储目录 否 Flume工作异常 /mr-history/tmp 固定目录 MapReduce作业产生的日志存放位置 是 日志信息丢失 /mr-history/done 固定目录 MR JobHistory Server管理的日志的存放位置 是 日志信息丢失 /tenant 添加租户时创建 配置租户在HDFS中的存储目录,系统默认将自动在“/tenant”目录中以租户名称创建文件夹。例如租户“ta1”,默认HDFS存储目录为“tenant/ta1”。第一次创建租户时,系统自动在HDFS根目录创建“/tenant”目录。支持自定义存储路径。 否 租户不可用 /apps{1~5}/ 固定目录 WebHCat使用到Hive的包的路径 否 执行WebHCat任务会失败 /hbase 固定目录 HBase数据存储目录 否 HBase用户数据丢失 /hbaseFileStream 固定目录 HFS文件存储目录 否 HFS文件丢失,且无法恢复 /ats/active 固定目录 HDFS路径,用于存储活动的应用程序的timeline数据 否 删除后会导致tez任务运行失败 /ats/done 固定目录 HDFS路径,用于存储完成的应用程序的timeline数据 否 删除后会自动创建 /flink 固定目录 存放checkpoint任务数据 否 删除会导致运行任务失败 表2 HDFS文件系统目录结构(适用于MRS 3.x及之后版本) 路径 类型 简略功能 是否可以删除 删除的后果 /tmp/spark2x/sparkhive-scratch 固定目录 存放Spark2x JDBCServer中metastore session临时文件 否 任务运行失败 /tmp/sparkhive-scratch 固定目录 存放Spark2x cli方式运行metastore session临时文件 否 任务运行失败 /tmp/logs/ 固定目录 存放container日志文件 是 container日志不可查看 /tmp/carbon/ 固定目录 数据导入过程中,如果存在异常CarbonData数据,则将异常数据放在此目录下 是 错误数据丢失 /tmp/Loader-${作业名}_${MR作业id} 临时目录 存放Loader Hbase bulkload作业的region信息,作业完成后自动删除 否 Loader Hbase Bulkload作业失败 /tmp/hadoop-omm/yarn/system/rmstore 固定目录 ResourceManager运行状态信息 是 ResourceManager重启后状态信息丢失 /tmp/archived 固定目录 MR任务日志在HDFS上的归档路径 是 MR任务日志丢失 /tmp/hadoop-yarn/staging 固定目录 保存AM运行作业运行日志、作业概要信息和作业配置属性 否 任务运行异常 /tmp/hadoop-yarn/staging/history/done_intermediate 固定目录 所有任务运行完成后,临时存放/tmp/hadoop-yarn/staging目录下文件 否 MR任务日志丢失 /tmp/hadoop-yarn/staging/history/done 固定目录 周期性扫描线程定期将done_intermediate的日志文件转移到done目录 否 MR任务日志丢失 /tmp/mr-history 固定目录 存储预加载历史记录文件的路径 否 MR历史任务日志数据丢失 /tmp/hive-scratch 固定目录 Hive运行时生成的临时数据,如会话信息等 否 当前执行的任务会失败 /user/{user}/.sparkStaging 固定目录 存储SparkJDBCServer应用临时文件 否 executor启动失败 /user/spark2x/jars 固定目录 存放Spark2x executor运行依赖包 否 executor启动失败 /user/loader 固定目录 存放loader的作业脏数据以及HBase作业数据的临时存储目录 否 HBase作业失败或者脏数据丢失 /user/loader/etl_dirty_data_dir /user/loader/etl_hbase_putlist_tmp /user/loader/etl_hbase_tmp /user/oozie 固定目录 存放oozie运行时需要的依赖库,需用户手动上传 否 oozie调度失败 /user/mapred/hadoop-mapreduce-3.1.1.tar.gz 固定文件 MR分布式缓存功能使用的各jar包 否 MR分布式缓存功能无法使用 /user/hive 固定目录 Hive相关数据存储的默认路径,包含依赖的spark lib包和用户默认表数据存储位置等 否 用户数据丢失 /user/omm-bulkload 临时目录 HBase批量导入工具临时目录 否 HBase批量导入任务失败 /user/hbase 临时目录 HBase批量导入工具临时目录 否 HBase批量导入任务失败 /spark2xJobHistory2x 固定目录 Spark2x eventlog数据存储目录 否 HistoryServer服务不可用,任务运行失败 /flume 固定目录 Flume采集到HDFS文件系统中的数据存储目录 否 Flume工作异常 /mr-history/tmp 固定目录 MapReduce作业产生的日志存放位置 是 日志信息丢失 /mr-history/done 固定目录 MR JobHistory Server管理的日志的存放位置 是 日志信息丢失 /tenant 添加租户时创建 配置租户在HDFS中的存储目录,系统默认将自动在“/tenant”目录中以租户名称创建文件夹。例如租户“ta1”,默认HDFS存储目录为“tenant/ta1”。第一次创建租户时,系统自动在HDFS根目录创建“/tenant”目录。支持自定义存储路径。 否 租户不可用 /apps{1~5}/ 固定目录 WebHCat使用到Hive的包的路径 否 执行WebHCat任务会失败 /hbase 固定目录 HBase数据存储目录 否 HBase用户数据丢失 /hbaseFileStream 固定目录 HFS文件存储目录 否 HFS文件丢失,且无法恢复 父主题: 使用HDFS
  • 访问文件浏览器(File Browser) 访问Hue WebUI。 单击,进入“File Browser”。 默认进入当前登录用户的主目录。 文件浏览器将显示目录中的子目录或文件以下信息: 表1 HDFS文件属性介绍 属性名 描述 “Name” 表示目录或文件的名称。 “Size” 表示文件的大小。 “User” 表示目录或文件的属主。 “Group” 表示目录或文件的属组。 “Permissions” 表示目录或文件的权限设置。 “Date” 表示目录或文件创建时间。 在搜索框输入关键字,系统会在当前目录自动搜索目录或文件。 清空搜索框的内容,系统会重新显示所有目录和文件。
  • 执行动作 单击,选择一个或多个目录或文件。 单击“Actions”,在弹出菜单选择一个操作。 “Rename”:表示重新命名一个目录或文件。 “Move”:表示移动文件,在“移至”选择新的目录并单击“移动”完成移动。 “Copy”:表示复制选中的文件或目录。 “Change permissions”:表示修改选中目录或文件的访问权限。 可以为属主、属组和其他用户设置“Read”、“Write”和“Execute”权限。 “Sticky”表示禁止HDFS的管理员、目录属主或文件属主以外的用户在目录中移动文件。 “Recursive”表示递归设置权限到子目录。 “Storage policies”:表示设置目录或文件在HDFS中的存储策略。 “Summary”:表示查看选中的文件或目录的HDFS存储信息。
  • 相关文档 如果执行start-balancer.sh,“hadoop-root-balancer-主机名.out”日志显示“Access denied for user test1. Superuser privilege is required”,请参见执行balance常见问题定位方法。 如果在HDFS客户端启动一个Balance进程,该进程被异常停止后,再次执行Balance操作时失败,请参见HDFS执行Balance时被异常停止如何处理。 如果数据出现不均衡,某磁盘过满而其他磁盘未写满,请参见非HDFS数据残留导致数据分布不均衡。 如果单个节点内DataNode的各磁盘使用率不均匀,请参见节点内DataNode磁盘使用率不均衡。 如果需要平衡正在运行的单个DataNode上的磁盘数据,请参见配置HDFS DiskBalancer磁盘均衡。
  • 操作场景 HDFS集群可能出现DataNode节点间磁盘利用率不平衡的情况,比如集群中添加新数据节点的场景。如果HDFS出现数据不平衡的状况,可能导致多种问题,比如MapReduce应用程序无法很好地利用本地计算的优势、数据节点之间无法达到更好的网络带宽使用率或节点磁盘无法利用等等。所以MRS集群管理员需要定期检查并保持DataNode数据平衡。 HDFS提供了一个容量均衡程序Balancer。通过运行这个程序,可以使得HDFS集群达到一个平衡的状态,使各DataNode磁盘使用率与HDFS集群磁盘使用率的偏差不超过阈值。图1和图2分别是Balance前后DataNode的磁盘使用率变化。 图1 执行均衡操作前DataNode的磁盘使用率 图2 执行均衡操作后DataNode的磁盘使用率 均衡操作时间估算受两个因素影响: 需要迁移的总数据量: 每个DataNode节点的数据量应大于(平均使用率-阈值)*平均数据量,小于(平均使用率+阈值)*平均数据量。若实际数据量小于最小值或大于最大值即存在不平衡,系统选择所有DataNode节点中偏差最多的数据量作为迁移的总数据量。 Balancer的迁移是按迭代(iteration)方式串行顺序处理的,每个iteration迁移数据量不超过10GB,每个iteration重新计算使用率的情况。 因此针对集群情况,可以大概估算每个iteration耗费的时间(可以通过执行Balancer的日志观察到每次iteration的时间),并用总数据量除以10GB估算任务执行时间。 由于按iteration处理,Balancer可以随时启动或者停止。
  • 使用dynamic策略执行distcp命令时,报错“Too many chunks created with splitRatio” 使用dynamic策略执行distcp命令时,命令异常退出,报“Too many chunks created with splitRatio”的错误。 这个问题的原因是“distcp.dynamic.max.chunks.tolerable”的值(默认值为20000)小于“distcp.dynamic.split.ratio”的值(默认为2)乘以Map数。即一般出现在Map数超过10000的情况。可以通过-m参数降低Map数小于10000: hadoop distcp -strategy dynamic -m 9500 hdfs://cluster1/source hdfs://cluster2/target 或通过-D参数指定更大的“distcp.dynamic.max.chunks.tolerable”的值: hadoop distcp -Ddistcp.dynamic.max.chunks.tolerable=30000 -strategy dynamic hdfs://cluster1/source hdfs://cluster2/target
  • update和overwrite选项用法说明 -update:用于被拷贝的文件在目标位置中不存在,或者更新目标位置中被拷贝文件的内容。 在使用update选项的情况下,如果被拷贝文件在目标位置中已经存在,但文件内容不同,则目标位置的文件内容会被更新。 -overwrite:用于覆盖在目标位置中已经存在的文件。 在使用overwrite选项的情况下,如果被拷贝文件在目标位置中已经存在,目标位置的文件依然会被覆盖。 如果多个源位置有相同名称的文件,则distcp命令会失败。 在不使用update和overwrite选项的情况下,如果被拷贝文件在目标位置中已经存在,则该文件会跳过。 不加选项和加两个选项中任一个选项的区别,示例如下: 假设,源位置的文件结构如下: hdfs://cluster1/source/first/1 hdfs://cluster1/source/first/2 hdfs://cluster1/source/second/10 hdfs://cluster1/source/second/20 不加选项的命令: hadoop distcp hdfs://cluster1/source/first hdfs://cluster1/source/second hdfs://cluster2/target 上述命令默认会在目标位置创建文件夹first、second,所以拷贝结果如下: hdfs://cluster2/target/first/1 hdfs://cluster2/target/first/2 hdfs://cluster2/target/second/10 hdfs://cluster2/target/second/20 加两个选项中任一个选项的命令,例如加update选项: hadoop distcp -update hdfs://cluster1/source/first hdfs://cluster1/source/second hdfs://cluster2/target 上述命令只会将源位置的内容拷贝到目标位置,所以拷贝结果如下: hdfs://cluster2/target/1 hdfs://cluster2/target/2 hdfs://cluster2/target/10 hdfs://cluster2/target/20
  • 相关文档 如果使用distcp命令拷贝空文件夹报错,请参见使用distcp命令拷贝空文件夹报错。 如果使用distcp命令在安全集群上执行失败并发生异常,请参见执行distcp命令报错如何处理。 执行distcp跨集群拷贝文件时,出现部分文件拷贝失败并报错“ Source and target differ in block-size. Use -pb to preserve block-sizes during copy. ”,请参见执行distcp跨集群拷贝文件报错“Source and target differ in block-size”。 更多使用distcp命令时常见问题如下:
  • 当使用distcp命令时,如果某些被拷贝的文件内容较大时如何处理 当使用distcp命令时,如果某些被拷贝的文件内容较大时,建议修改执行拷贝任务的mapreduce的超时时间。可以通过在distcp命令中指定mapreduce.task.timeout选项实现。例如,修改超时时间为30分钟,则命令如下: hadoop distcp -Dmapreduce.task.timeout=1800000 hdfs://cluster1/source hdfs://cluster2/target 您也可以使用选项filters,不对这种大文件进行拷贝,命令示例如下: hadoop distcp -filters /opt/client/filterfile hdfs://cluster1/source hdfs://cluster2/target 其中filterfile是本地文件,它的内容是多条用于匹配不拷贝文件路径的正则表达式,它的内容示例如下: .*excludeFile1.* .*excludeFile2.*
  • 操作步骤 登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行distcp命令的用户所属的用户组必须为supergroup组,且执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 执行distcp命令。例如: hadoop distcp hdfs://hacluster/source hdfs://hacluster/target
  • 操作步骤 以root用户登录已安装Hive客户端的节点。 执行以下命令,进入客户端安装目录,例如“/opt/client”。 cd /opt/client 执行以下命令,配置客户端环境变量。 source bigdata_env 在客户端中执行如下命令,执行用户认证操作,未启用Kerberos认证的集群请跳过该操作。 kinit 用户名 执行以下命令登录Hive客户端。 beeline 指定静态分区或者动态分区。 静态分区: 静态分区是指手动输入分区名称,在创建表时使用关键字PARTITIONED BY指定分区列名及数据类型。应用开发时,使用ALTER TABLE ADD PARTITION语句增加分区,以及使用LOAD DATA INTO PARTITION语句将数据加载到分区时,只能加载到静态分区。 动态分区:通过查询命令,将结果插入到某个表的分区时,可以使用动态分区。 动态分区通过在客户端执行如下命令开启: set hive.exec.dynamic.partition=true; 动态分区默认模式是“strict”,也就是必须至少指定一列为静态分区,在静态分区下建立动态子分区,可以通过如下设置分区模式为“nonstrict”开启完全的动态分区: set hive.exec.dynamic.partition.mode=nonstrict; 需注意: 动态分区可能导致一个DML语句创建大量的分区,对应创建大量新文件夹,对系统性能可能带来影响。 在文件数量大的情况下,执行一个SQL语句启动时间较长,可以在执行SQL语句之前执行“set mapreduce.input.fileinputformat.list-status.num-threads = 100;”命令缩短启动时间。“mapreduce.input.fileinputformat.list-status.num-threads”参数需要先添加到Hive的白名单才可设置。
  • 配置场景 Spark Streaming对接Kafka时,当Spark Streaming应用重启后,应用根据上一次读取的topic offset作为起始位置和当前topic最新的offset作为结束位置从Kafka上读取数据的。 Kafka服务的topic的leader异常后,如果Kafka的leader和follower的offset相差太大,用户重启Kafka服务,Kafka的follower和leader相互切换,则Kafka服务重启后,topic的offset变小。 如果Spark Streaming应用一直在运行,由于Kafka上topic的offset变小,会导致读取Kafka数据的起始位置比结束位置大,这样将无法从Kafka读取数据,应用报错。 如果在重启Kafka服务前,先停止Spark Streaming应用,等Kafka重启后,再重启Spark Streaming应用使应用从checkpoint恢复。此时,Spark Streaming应用会记录终止前读取到的offset位置,以此为基准读取后面的数据,而Kafka offset变小(例如从10万变成1万),Spark Streaming会等待Kafka leader的offset增长至10万之后才会去消费,导致新发送的offset在1万至10万之间的数据丢失。 针对上述背景,提供配置Streaming对接Kafka更高级别的可靠性。对接Kafka可靠性功能开启后,上述场景处理方式如下。 如果Spark Streaming应用在运行应用时Kafka上topic的offset变小,则会将Kafka上topic最新的offset作为读取Kafka数据的起始位置,继续读取后续的数据。 对于已经生成但未调度处理的任务,如果读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务会运行失败。 如果任务失败过多,则会将executor加入黑名单,从而导致后续的任务无法部署运行。此时用户可以通过配置“spark.blacklist.enabled”参数关闭黑名单功能,黑名单功能默认为开启。 如果Kafka上topic的offset变小后,Spark Streaming应用进行重启恢复终止前未处理完的任务如果读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务直接丢弃,不进行处理。
  • 操作步骤 以Hive客户端安装用户登录安装客户端的节点。 执行以下命令,切换到客户端安装目录,配置环境变量并认证用户。 切换至客户端安装目录: cd 客户端安装目录 加载环境变量: source bigdata_env 认证用户: kinit Hive业务用户 执行以下命令登录Hive客户端。 beeline 配置Hive视图的访问控制权限示例: 不采用“current_user”函数,要实现不同的用户访问不同数据,需要创建不同的视图: 将视图v1授权给用户hiveuser1,hiveuser1用户可以访问表table1中“type='hiveuser1'”的数据: create view v1 as select * from table1 where type='hiveuser1'; 将视图v2授权给用户hiveuser2,hiveuser2用户可以访问表table1中“type='hiveuser2'”的数据: create view v2 as select * from table1 where type='hiveuser2'; 采用“current_user”函数,则只需要创建一个视图: 将视图v分别赋给用户hiveuser1、hiveuser2,当hiveuser1查询视图v时,“current_user()”被自动转化为hiveuser1,当hiveuser2查询视图v时,“current_user()”被自动转化为hiveuser2: create view v as select * from table1 where type=current_user();
  • 注意事项 Group By数据倾斜 Group By也同样存在数据倾斜的问题,设置“hive.groupby.skewindata”为“true”,生成的查询计划会有两个MapReduce Job,第一个Job的Map输出结果会随机的分布到Reduce中,每个Reduce做聚合操作,并输出结果,这样的处理会使相同的Group By Key可能被分发到不同的Reduce中,从而达到负载均衡,第二个Job再根据预处理的结果按照Group By Key分发到Reduce中完成最终的聚合操作。 Count Distinct聚合问题 当使用聚合函数count distinct完成去重计数时,处理值为空的情况会使Reduce产生很严重的数据倾斜,可以将空值单独处理,如果是计算count distinct,可以通过where子句将该值排除掉,并在最后的count distinct结果中加1。如果还有其他计算,可以先将值为空的记录单独处理,再和其他计算结果合并。
  • 操作场景 在Hive中执行多表Join时,Hive支持开启CBO(Cost Based Optimization),系统会自动根据表的统计信息,例如数据量、文件数等,选出合适计划提高多表Join的效率。Hive需要先收集表的统计信息后才能使CBO正确的优化。 CBO优化器会基于统计信息和查询条件,尽可能地使Join顺序达到更优。但是也可能存在特殊情况导致Join顺序调整不准确。例如数据存在倾斜,以及查询条件值在表中不存在等场景,可能调整出非优化的Join顺序。 开启列统计信息自动收集时,需要在Reduce侧做聚合统计。对于没有Reduce阶段的insert任务,将会多出Reduce阶段,用于收集统计信息。
  • 缓冲区超时设置 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下: env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  • 约束与限制 本章节仅适用于MRS 3.3.1-LTS及之后版本。 管理面提交作业方式不支持开启动态脱敏特性。 不支持Hudi表的脱敏。 不支持非SQL使用方法的脱敏。 不支持涉及直接读写HDFS的操作的脱敏。 不支持复杂类型array、map、struct的脱敏。 只支持spark-beeline(JDBC 连接)方式提交Spark作业。 脱敏策略传递时,若目标表已有脱敏策略且与源表有冲突,则目标表脱敏策略强制重置为Custom:“***”。 当前仅支持int、char、varchar、date、decimal、float、bigint、timestamp 、tinyint、smallint、double、string、binary数据类型的脱敏,其中int、date、decimal、float、bigint、timestamp 、tinyint、smallint、double类型配置脱敏策略后,spark-beeline查询结果存在与策略预期不一致的现象,但查询结果非原始值,如需要与策略结果保持一致,则推荐使用“Nullify”脱敏策略。 对于不支持的数据类型,如果配置了脱敏策略或输出列涉及脱敏传递,最终都按“Nullify”脱敏策略处理。
  • 设置Hive动态分区表精准拦截 针对涉及动态分区的自读自写场景,由于动态分区在编译阶段获取不到输出的分区信息,无法判断是否存在自读自写场景,为了实现拦截功能,提供了nonstrict和strict两种拦截模式供用户选择。其中: nonstrict模式是按照表进行拦截的,即对提交的SQL任务判断是否存在查询的表和写入的表是同一张表,如果存在就进行拦截,否则不拦截。nonstrict模式的拦截逻辑是在SQL编译阶段完成的,该模式优点是拦截效率高,缺点是查询和写入分区所对应的表相同,分区不同时也会被拦截。 strict模式是按照分区进行拦截的,即对提交的SQL任务判断是否存在查询的分区和写入的分区相同,如果存在就进行拦截,否则不拦截。strict模式的拦截逻辑是在SQL任务执行MoveTask阶段完成的,该模式优点是拦截精度高,缺点是拦截成本高,因为需要等到SQL任务大部分逻辑运行完且运行到MoveTask阶段才识别是否需要进行拦截。
  • 如何选择重启策略 如果用户在作业失败后,不希望重试,则推荐使用No restart策略。 如果用户在作业失败后,希望对作业进行重试,推荐使用failure-rate策略。因为fixed-delay策略可能会因为网络、内存等硬件故障导致用户作业失败次数达到最大重试次数,从而导致作业失败。 为了防止在failure-rate策略下的无限重启,推荐如下参数配置: restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 10 min restart-strategy.failure-rate.delay: 10 s
  • FlinkServer作业重启策略介绍 Flink支持不同的重启策略,以在发生故障时控制作业是否重启以及如何重启。如果不指定重启策略,集群会使用默认的重启策略。用户也可以在提交作业时指定一个重启策略,可参考如何创建FlinkServer作业在作业开发界面配置(MRS 3.1.0及以后版本)。 重启策略也可以通过Flink的配置文件“客户端安装目录/Flink/flink/conf/flink-conf.yaml”中的参数“restart-strategy”指定,为全局配置,还可以在应用代码中动态指定,会覆盖全局配置。 重启策略包括失败率(failure-rate)和两种默认策略,默认策略为如下: 无重启(No restart):如果没有启用CheckPoint,默认使用该策略。 固定间隔(fixed-delay):如果启用了CheckPoint,但没有配置重启策略,默认使用该策略。
  • fixed-delay策略 发生故障时会尝试重启作业固定次数,如果超过了最大的尝试次数,作业最终会失败。并且在两次连续重启尝试之间,重启策略会等待固定的时间。 以配置如果重启失败了3次则认为该Job失败,重试时间间隔为10s为例,参数配置为: restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
  • failure-rate策略 在作业失败后会直接重启,但超过设置的失败率后,作业会被认定为失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 以配置10分钟内如果重启失败了3次则认为该作业失败,重试时间间隔为10s为例,参数配置为: restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 10 min restart-strategy.failure-rate.delay: 10 s
  • 约束与限制 如果运行的SQL脚本依赖数据的失效性或SQL脚本中使用了一些内置时间函数,建议不启用断点重连机制,或者每次运行时使用新的batchid。因为重复执行时,可能由于某些SQL语句已经执行过了不再重新执行,导致获取到过期的数据。 一个SQL脚本里面会包含一个或多个子任务。如果SQL脚本中存在先创建再删除临时表的逻辑,建议将删除临时表的逻辑放到脚本的最后。假定删除临时表子任务的后续子任务执行失败,并且删除临时表的子任务之前的子任务用到了该临时表;当下一次以相同batchid执行该SQL脚本时,因为临时表在上一次执行时已被删除,则会导致删除临时表的子任务之前用到该临时表的子任务(不包括创建该临时表的子任务,因为上一次已经执行成功,本次不会再执行,仅可编译)编译失败。这种情况下,建议使用新的batchid执行脚本。
  • 操作步骤 以Hive客户端安装用户登录安装客户端的节点。 执行以下命令,切换到客户端安装目录,配置环境变量并认证用户。 切换至客户端安装目录: cd 客户端安装目录 加载环境变量: source bigdata_env 认证用户,如果集群未开启Kerberos认证请跳过该操作: kinit Hive业务用户 beeline启动断线重连功能。 beeline -e "${SQL}" --hivevar batchid=xxx 其中“xxxx”表示每一次通过beeline提交任务的批次号,通过该批次号,可以识别出先提交的任务。如果提交任务时不带批次号,该特性功能不会启用。“xxxx”的值是执行任务时指定的。 beeline kill正在运行的任务。 beeline -e "" --hivevar batchid=xxx --hivevar kill=true 登录beeline客户端,启动断线重连机制。 beeline 启动断线重连机制: set hivevar:batchid=xxx
  • 操作场景 在批处理任务运行过程中,beeline客户端由于网络异常等问题断线时,Hive能支持beeline在断线前已经提交的任务继续运行。当再次运行该批处理任务时,已经提交过的任务不再重新执行,直接从下一个任务开始执行。 在批处理任务运行过程中,HiveServer服务由于某些原因导致故障时,Hive能支持当再次运行该批处理任务时,已经成功执行完成的任务不再重新执行,直接从HiveServer2故障时正在运行的任务开始运行。
  • 参数说明 可在Hive参数配置界面,查看或根据实际需求修改Hive Beeline断线重连功能涉及的参数。 表1 Hive Beeline断线重连功能相关参数介绍 参数 参数说明 k.cleanup.finished.job.interval 启用Beeline断线重连功能后,执行清理任务的间隔时间,默认隔60s执行一次。 zk.cleanup.finished.job.outdated.threshold 启用Beeline断线重连功能后,节点的过期时间,每个批次的任务都会生成对应节点,从当前批次任务的结束时间开始算,如果超过60分钟,则表示已经过期了,那么就清除节点。 batch.job.max.retry.count 启用Beeline断线重连功能后,单批次任务的最大重试次数,当单批次的任务失败重试次数超过这个值,就会删除该任务记录,下次运行时将从头开始运行,默认是10次。 beeline.reconnect.zk.path 启用Beeline断线重连功能后,存储任务执行进度的根节点,Hive服务默认是“/beeline”。
  • 使用coalesce调整分片的数量 coalesce可以调整分片的数量。coalesce函数有两个参数: coalesce(numPartitions: Int, shuffle: Boolean = false) 当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。 遇到下列场景,可选择使用coalesce算子: 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。 当输入切片个数太大,导致程序无法正常运行时使用。 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
  • 操作步骤 推荐:使用“SNAPPY”压缩,适用于压缩比和读取效率要求均衡场景。 Create table xx (col_name data_type) stored as orc tblproperties ("orc.compress"="SNAPPY"); xx为具体使用的Hive表名。 可用:使用“ZLIB”压缩,适用于压缩比要求较高场景。 Create table xx (col_name data_type) stored as orc tblproperties ("orc.compress"="ZLIB");
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全