华为云用户手册

  • KrbServer及LdapServer简介 为了管理集群中数据与资源的访问控制权限,推荐安装安全模式集群。在安全模式下,客户端应用程序在访问集群中的任意资源之前均需要通过身份认证,建立安全会话链接。 MRS 通过KrbServer为所有组件提供Kerberos认证功能,实现了可靠的认证机制。 LdapServer支持轻量目录访问协议(Lightweight Directory Access Protocol,简称为LDAP),为Kerberos认证提供用户和用户组数据保存能力。
  • KrbServer及LdapServer结构 用户登录时安全认证功能主要依赖于Kerberos和LDAP。 图1 安全认证场景架构 图1可分为三类场景: 登录Manager WebUI 认证架构包含步骤1、2、3、4 登录组件Web UI 认证架构包含步骤5、6、7、8 组件间访问 认证架构为步骤9 表1 关键模块解释 名称 含义 Manager 集群Manager Manager WS WebBrowser Kerberos1 部署在Manager中的KrbServer(管理平面)服务,即 OMS Kerberos Kerberos2 部署在集群中的KrbServer(业务平面)服务 LDAP1 部署在Manager中的LdapServer(管理平面)服务,即OMS LDAP LDAP2 部署在集群中的LdapServer(业务平面)服务 Kerberos1访问LDAP数据:以负载均衡方式访问主备LDAP1两个实例和双备LDAP2两个实例。只能在主LDAP1主实例上进行数据的写操作,可以在LDAP1或者LDAP2上进行数据的读操作。 Kerberos2访问LDAP数据:读操作可以访问LDAP1和LDAP2,数据的写操作只能在主LDAP1实例进行。
  • KrbServer及LdapServer原理 Kerberos认证 图2 认证流程图 LDAP数据读写 图3 数据修改过程 LDAP数据同步 安装集群前OMS LDAP数据同步 图4 OMS LDAP数据同步 安装集群前数据同步方向:主OMS LDAP同步到备OMS LDAP。 安装集群后LDAP数据同步 图5 LDAP数据同步 安装集群后数据同步方向:主OMS LDAP同步到备OMS LDAP、备组件LDAP和备组件LDAP。
  • ZooKeeper开源增强特性:ZooKeeper SSL通信(Netty连接) ZooKeeper设计最初含有Nio包,且不能较好的支持3.5版本后的SSL。为了解决这个问题,Netty被加入到ZooKeeper中。所以如果用户需要使用SSL,启用Netty并设置Server端和Client端的以下参数。 开源的服务端只支持简单的文本密码,这可能导致相关安全问题。为此在服务端将不再使用此类文本密码。 Client端 将“zkCli.sh/zkEnv.sh”文件中的参数“-Dzookeeper.client.secure”设置为“true”以在Client端使用安全通信。之后客户端可以连接服务端的secureClientPort。 通过设置“zkCli.sh/zkEnv.sh”文件中的以下参数配置客户端环境。 参数 描述 -Dzookeeper.clientCnxnSocket 用于客户端的Netty通信。 默认值:"org.apache.zookeeper.ClientCnxnSocketNetty" -Dzookeeper.ssl.keyStore.location keystore文件路径。 -Dzookeeper.ssl.keyStore.password 加密密码。 -Dzookeeper.ssl.trustStore.location truststore文件路径。 -Dzookeeper.ssl.trustStore.password 加密密码。 -Dzookeeper.config.crypt.class 用于加密密码的解密。 -Dzookeeper.ssl.password.encrypted 默认值:false 当keystore和truststore的密码为加密密码时设置为true。 -Dzookeeper.ssl.enabled.protocols 通过配置此参数定义SSL协议以适用于SSL上下文。 -Dzookeeper.ssl.exclude.cipher.ext 通过配置此参数定义SSL上下文中应排除的密码列表,之间以逗号间隔。 以上参数须在“zkCli.sh/zk.Env.sh”文件内设置。 Server端 在文件“zoo.cfg”中将SSL端口参数“secureClientPort”设置为“3381”。 在server端将文件“zoo.cfg”中的参数“zookeeper.serverCnxnFactory”设置为“org.apache.zookeeper.server.NettyServerCnxnFactory”。 设置文件zoo.cfg(路径:“zookeeper/conf/zoo.cfg”)中的以下参数来配置服务端环境。 参数 描述 ssl.keyStore.location keystore.jks文件路径。 ssl.keyStore.password 加密密码。 ssl.trustStore.location truststore文件路径。 ssl.trustStore.password 加密密码。 config.crypt.class 用于加密密码的解密。 ssl.keyStore.password.encrypted 默认值:false 设置为true时可使用加密密码。 ssl.trustStore.password.encrypted 默认值:false 设置为true时可使用加密密码。 ssl.enabled.protocols 通过配置此参数定义SSL协议以适用于SSL上下文。 ssl.exclude.cipher.ext 通过配置此参数定义SSL上下文中应排除的密码列表,之间以逗号间隔。 启动ZKserver,然后将安全客户端连接到安全端口。 凭证 ZooKeeper上Client和Server之间的凭证由X509AuthenticationProvider执行。根据以下参数指定服务端证书及信任客户端证书,并通过这些证书初始化X509AuthenticationProvider。 zookeeper.ssl.keyStore.location zookeeper.ssl.keyStore.password zookeeper.ssl.trustStore.location zookeeper.ssl.trustStore.password 若用户不想使用ZooKeeper的默认机制,可根据所需配置不同的ZooKeeper信任机制。
  • MapReduce开源增强特性:特定场景优化MapReduce的Merge/Sort流程提升MapReduce性能 下图展示了MapReduce任务的工作流程。 图2 MapReduce 作业 图3 MapReduce作业执行流程 Reduce过程分为三个不同步骤:Copy、Sort(实际应当称为Merge)及Reduce。在Copy过程中,Reducer尝试从NodeManagers获取Maps的输出并存储在内存或硬盘中。紧接着进行Shuffle过程(包含Sort及Reduce),这个过程将获取到的Maps输出进行存储并有序地合并然后提供给Reducer。当Job有大量的Maps输出需要处理的时候,Shuffle过程将变得非常耗时。对于一些特定的任务(例如hash join或hash aggregation类型的SQL任务),Shuffle过程中的排序并非必须的。但是Shuffle却默认必须进行排序,所以需要对此处进行改进。 此特性通过对MapReduce API进行增强,能自动针对此类型任务关闭Sort过程。当Sort被关闭,获取Maps输出数据以后,直接合并后输出给Reduce,避免了由于排序而浪费大量时间。这种方式极大程度地提升了大部分SQL任务的效率。
  • MapReduce开源增强特性:History Server优化解决日志小文件问题 运行在Yarn上的作业在执行完成后,NodeManager会通过LogAggregationService把产生的日志收集到HDFS上,并从本地文件系统中删除。日志收集到HDFS上以后由HistoryServer来进行统一的日志管理。LogAggregationService在收集日志时会把container产生的本地日志合并成一个日志文件上传到HDFS,在一定程度上可以减少日志文件的数量。但在规模较大且任务繁忙的集群上,经过长时间的运行,HDFS依然会面临存储的日志文件过多的问题。 以一个20节点的计算场景为例,默认清理周期(15日)内将产生约1800万日志文件,占用NameNode近18G内存空间,同时拖慢HDFS的系统响应速度。 由于收集到HDFS上的日志文件只有读取和删除的需求,因此可以利用Hadoop Archives功能对收集的日志文件目录进行定期归档。 日志归档 在HistoryServer中新增AggregatedLogArchiveService模块,定期检查日志目录中的文件数。在文件数达到设定阈值时,启动归档任务进行日志归档,并在归档完成后删除原日志文件,以减少HDFS上的文件数量。 归档日志清理 由于Hadoop Archives不支持在归档文件中进行删除操作,因此日志清理时需要删除整个归档文件包。通过修改AggregatedLogDeletionService模块,获取归档日志中最新的日志生成时间,若所有日志文件均满足清理条件,则清理该归档日志包。 归档日志浏览 Hadoop Archives支持URI直接访问归档包中的文件内容,因此浏览过程中,当History Server发现原日志文件不存在时,直接将URI重定向到归档文件包中即可访问到已归档的日志文件。 本功能通过调用HDFS的Hadoop Archives功能进行日志归档。由于Hadoop Archives归档任务实际上是执行一个MR应用程序,所以在每次执行日志归档任务后,会新增一条MR执行记录。 本功能归档的日志来源于日志收集功能,因此只有在日志收集功能开启状态下本功能才会生效。
  • MapReduce开源增强特性:JobHistoryServer HA特性 JobHistoryServer(JHS)是用于查看MapReduce历史任务信息的服务器,当前开源JHS只支持单实例服务。JobHistoryServer HA能够解决JHS单点故障时,应用访问MapReduce接口无效,导致整体应用执行失败的场景,从而大大提升 MapReduce服务 的高可用性。 图1 JobHistoryServer HA主备倒换的状态转移过程 JobHistoryServer高可用性 采用ZooKeeper实现主备选举和倒换。 JobHistoryServer使用浮动IP对外提供服务。 兼容JHS单实例,也支持HA双实例。 同一时刻,只有一个节点启动JHS进程,防止多个JHS操作同一文件冲突。 支持扩容减容、实例迁移、升级、健康检查等。
  • YARN结构 YARN分层结构的本质是ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager将各个资源部分(计算、内存、带宽等)精心安排给基础NodeManager(YARN的每个节点代理)。ResourceManager还与Application Master一起分配资源,与NodeManager一起启动和监视它们的基础应用程序。在此上下文中,Application Master承担了以前的TaskTracker的一些角色,ResourceManager承担了JobTracker的角色。 Application Master管理一个在YARN内运行的应用程序的每个实例。Application Master负责协调来自ResourceManager的资源,并通过NodeManager监视容器的执行和资源使用(CPU、内存等的资源分配)。 NodeManager管理一个YARN集群中的每个节点。NodeManager提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1通过插槽管理Map和Reduce任务的执行,而NodeManager管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。 图1 YARN结构 图1中各部分的功能如表1所示。 表1 结构图说明 名称 描述 Client YARN Application客户端,用户可以通过客户端向ResourceManager提交任务,查询Application运行状态等。 ResourceManager(RM) 负责集群中所有资源的统一管理和分配。接收来自各个节点(NodeManager)的资源汇报信息,并根据收集的资源按照一定的策略分配给各个应用程序。 NodeManager(NM) NodeManager(NM)是YARN中每个节点上的代理,管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。 ApplicationMaster(AM) 即图中的App Mstr,负责一个Application生命周期内的所有工作。包括:与RM调度器协商以获取资源;将得到的资源进一步分配给内部任务(资源的二次分配);与NM通信以启动/停止任务;监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 Container Container是YARN中的资源抽象,封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等(目前仅封装内存和CPU),当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。 在YARN中,资源调度器是以层级队列方式组织资源的,这种组织方式有利于资源在不同队列间分配和共享,进而提高集群资源利用率。如下图所示,Superior Scheduler和Capacity Scheduler的核心资源分配模型相同。 调度器会维护队列的信息。用户可以向一个或者多个队列提交应用。每次NM心跳的时候,调度器会根据一定规则选择一个队列,再选择队列上的一个应用,并尝试在这个应用上分配资源。若因参数限制导致分配失败,将选择下一个应用。选择一个应用后,调度器会处理此应用的资源申请。其优先级从高到低依次为:本地资源的申请、同机架的申请,任意机器的申请。 图2 资源分配模型
  • YARN原理 新的Hadoop MapReduce框架被命名为MRv2或YARN。YARN主要包括ResourceManager、ApplicationMaster与NodeManager三个部分。 ResourceManager:RM是一个全局的资源管理器,负责整个系统的资源管理和分配。主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager)。 调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念Container表示。Container是一个动态资源分配单位,将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。 应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动等。 NodeManager:NM是每个节点上的资源和任务管理器,一方面,会定时向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,接收并处理来自AM的Container启动/停止等请求。 ApplicationMaster:AM负责一个Application生命周期内的所有工作。包括: 与RM调度器协商以获取资源。 将得到的资源进一步分配给内部的任务(资源的二次分配)。 与NM通信以启动/停止任务。 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
  • 开源容量调度器Capacity Scheduler原理 Capacity Scheduler是一种多用户调度器,它以队列为单位划分资源,为每个队列设定了资源最低保证和使用上限。同时,也为每个用户设定了资源使用上限以防止资源滥用。而当一个队列的资源有剩余时,可暂时将剩余资源共享给其他队列。 Capacity Scheduler支持多个队列,为每个队列配置一定的资源量,并采用FIFO调度策略。为防止同一用户的应用独占队列资源,Capacity Scheduler会对同一用户提交的作业所占资源量进行限定。调度时,首先计算每个队列使用的资源,选择使用资源最少的队列;然后按照作业优先级和提交时间顺序选择,同时考虑用户资源量的限制和内存限制。Capacity Scheduler主要有如下特性: 容量保证。MRS集群管理员可为每个队列设置资源最低保证和资源使用上限,而所有提交到队列的应用程序共享这些资源。 灵活性。如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应用程序提交,则占用资源的队列将资源释放给该队列。这种资源灵活分配的方式可明显提高资源利用率。 多重租赁。支持多用户共享集群和多应用程序同时运行。为防止单个应用程序、用户或者队列独占集群中的资源,MRS集群管理员可为之增加多重约束(比如单个应用程序同时运行的任务数等)。 安全保证。每个队列有严格的ACL列表规定它的访问用户,每个用户可指定哪些用户允许查看自己应用程序的运行状态或者控制应用程序。此外,MRS集群管理员可指定队列管理员和集群系统管理员。 动态更新配置文件。MRS集群管理员可根据需要动态修改配置参数以实现在线集群管理。 Capacity Scheduler中每个队列可以限制资源使用量。队列间的资源分配以使用量作为排列依据,使得容量小的队列有竞争优势。集群整体吞吐较大,延迟调度机制使得应用可以有机会放弃跨机器或者跨机架的调度,争取本地调度。
  • HDFS HA方案背景 在Hadoop 2.0.0之前,HDFS集群中存在单点故障问题。由于每个集群只有一个NameNode,如果NameNode所在机器发生故障,将导致HDFS集群无法使用,除非NameNode重启或者在另一台机器上启动。这在两个方面影响了HDFS的整体可用性: 当异常情况发生时,如机器崩溃,集群将不可用,除非重新启动NameNode。 计划性的维护工作,如软硬件升级等,将导致集群停止工作。 针对以上问题,HDFS高可用性方案通过自动或手动(可配置)的方式,在一个集群中为NameNode启动一个热替换的NameNode备份。当一台机器故障时,可以迅速地自动进行NameNode主备切换。或者当主NameNode节点需要进行维护时,通过MRS集群管理员控制,可以手动进行NameNode主备切换,从而保证集群在维护期间的可用性。 有关HDFS自动故障转移功能,请参阅: MRS 3.2.0之前版本:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html#Automatic_Failover MRS 3.2.0及之后版本:https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html#Automatic_Failover
  • HDFS HA实现方案 图1 典型的HA部署方式 在一个典型的HA集群中(如图1),需要把两个NameNodes配置在两台独立的机器上。在任何一个时间点,只有一个NameNode处于Active状态,另一个处于Standby状态。Active节点负责处理所有客户端操作,Standby节点时刻保持与Active节点同步的状态以便在必要时进行快速主备切换。 为保持Active和Standby节点的数据一致性,两个节点都要与一组称为JournalNode的节点通信。当Active对文件系统元数据进行修改时,会将其修改日志保存到大多数的JournalNode节点中,例如有3个JournalNode,则日志会保存在至少2个节点中。Standby节点监控JournalNodes的变化,并同步来自Active节点的修改。根据修改日志,Standby节点将变动应用到本地文件系统元数据中。一旦发生故障转移,Standby节点能够确保与Active节点的状态是一致的。这保证了文件系统元数据在故障转移时在Active和Standby之间是完全同步的。 为保证故障转移快速进行,Standby需要时刻保持最新的块信息,为此DataNodes同时向两个NameNodes发送块信息和心跳。 对一个HA集群,保证任何时刻只有一个NameNode是Active状态至关重要。否则,命名空间会分为两部分,有数据丢失和产生其他错误的风险。为保证这个属性,防止“split-brain”问题的产生,JournalNodes在任何时刻都只允许一个NameNode写入。在故障转移时,将变为Active状态的NameNode获得写入JournalNodes的权限,这会有效防止其他NameNode的Active状态,使得切换安全进行。 关于HDFS高可用性方案的更多信息,可参考如下链接: MRS 3.2.0之前版本:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html MRS 3.2.0及之后版本:https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
  • Impala Catalog Service 负责Impala的元数据管理,进程名为catalogd,将元数据的变化发送到所有的Impalad进程。当创建表、加载数据或者其他的一些从Hive发起的操作后,Impala查询之前需要在Impalad上执行REFRESH或者INVALIDATE METADATA刷新Catalog上缓存的元数据信息。如果元数据变化是通过Impala执行的,则不需要执行刷新。
  • Impala与其他组件的关系 Impala与HDFS间的关系 Impala默认利用HDFS作为其文件存储系统。Impala通过解析和计算处理结构化的数据,Hadoop HDFS则为Impala提供了高可靠性的底层存储支持。使用Impala将无需移动HDFS中的数据并且提供更快的访问。 Impala与Hive间的关系 Impala使用Hive的元数据、ODBC驱动程序和SQL语法。与Hive不同,Impala不基于MapReduce算法,它实现了一个基于守护进程的分布式架构,它负责在同一台机器上运行的查询执行的所有方面。因此,它减少了使用MapReduce的延迟,这使Impala比Hive快。 Impala与Kudu间的关系 Kudu与Impala紧密集成,替代Impala+HDFS+Parquet组合。允许使用Impala的SQL语法从Kudu tablets插入、查询、更新和删除数据。此外,还可以用JDBC或ODBC,Impala作为代理连接Kudu进行数据操作。 Impala与HBase间的关系 Impala表默认使用存储在HDFS上的数据文件,便于全表扫描的批量加载和查询。但是,HBase可以提供对OLTP样式组织的数据的便捷高效查询。
  • Impala Daemon Impala daemon的进程名为Impalad,是Impala的核心进程。 Impalad关键功能如下: 运行在所有的数据节点上。 读写数据文件。 接收来自于Impala-shell命令、Hue、JDBC或者ODBC等客户端的查询请求。 可以并行执行来自集群中其他节点的查询请求,将中间结果返回给调度节点。 可以调用节点将结果返回给客户端。 Impalad进程通过持续的和StateStore通信来确认自己所在的节点是否健康和是否可以接受新的任务请求。
  • Impala Impala直接对存储在HDFS、HBase或 对象存储服务 (OBS)中的Hadoop数据提供快速、交互式SQL查询。除了使用相同的统一存储平台之外,Impala还使用与Apache Hive相同的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue中的Impala查询UI)。这为实时或面向批处理的查询提供了一个熟悉且统一的平台。作为查询大数据的工具的补充,Impala不会替代基于MapReduce构建的批处理框架,例如Hive。基于MapReduce构建的Hive和其他框架最适合长时间运行的批处理作业。 Impala主要特点如下: 支持Hive查询语言(HQL)中大多数的SQL-92功能,包括SELECT,JOIN和聚合函数。 HDFS,HBase和对象存储服务(OBS)存储,包括: HDFS文件格式:基于分隔符的Text file,Parquet,Avro,SequenceFile和RCFile。 压缩编解码器:Snappy,GZIP,Deflate,BZIP。 常见的数据访问接口包括: JDBC驱动程序。 ODBC驱动程序。 Hue beeswax和Impala查询UI。 Impala-shell命令行接口。 支持Kerberos身份认证。 Impala主要应用于实时查询数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景。 有关Impala的详细信息,请参见https://impala.apache.org/impala-docs.html。 Impala由Impalad、StateStore、Catalog 3个角色组成。
  • KafkaManager KafkaManager是Apache Kafka的管理工具,提供Kafka集群界面化的Metric监控和集群管理。 通过KafkaManager进行以下操作: 支持管理多个Kafka集群 支持界面检查集群状态(主题,消费者,偏移量,分区,副本,节点) 支持界面执行副本的leader选举 使用选择生成分区分配以选择要使用的分区方案 支持界面执行分区重新分配(基于生成的分区方案) 支持界面选择配置创建主题(支持多种Kafka版本集群) 支持界面删除主题(仅0.8.2版本并设置参数“delete.topic.enable = true”的集群支持) 支持批量生成多个主题的分区分配,并可选择要使用的分区方案 支持批量运行重新分配多个主题的分区 支持为已有主题增加分区 支持更新现有主题的配置 可以为分区级别和主题级别度量标准启用JMX查询 可以过滤掉zookeeper中没有ids / owner /&offsets /目录的使用者。 父主题: 组件介绍
  • Structured Streaming原理 Structured Streaming是构建在Spark SQL引擎上的流式数据处理引擎,用户可以使用Scala、Java、Python或R中的Dataset/DataFrame API进行流数据聚合运算、按事件时间窗口计算、流流Join等操作。当流数据连续不断的产生时,Spark SQL将会增量的、持续不断的处理这些数据并将结果更新到结果集中。同时,系统通过checkpoint和Write Ahead Logs确保端到端的完全一次性容错保证。 Structured Streaming的核心是将流式的数据看成一张不断增加的数据库表,这种流式的数据处理模型类似于数据块处理模型,可以把静态数据库表的一些查询操作应用在流式计算中,Spark执行标准的SQL查询,从不断增加的无边界表中获取数据。 图8 Structured Streaming无边界表 每一条查询的操作都会产生一个结果集Result Table。每一个触发间隔,当新的数据新增到表中,都会最终更新Result Table。无论何时结果集发生了更新,都能将变化的结果写入一个外部的存储系统。 图9 Structured Streaming数据处理模型 Structured Streaming在OutPut阶段可以定义不同的存储方式,有如下3种: Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。 Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合适用此种方式。 Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。
  • Spark应用运行原理 Spark的应用运行架构如图 Spark应用运行架构所示,运行流程如下所示: 应用程序(Application)是作为一个进程的集合运行在集群上的,由Driver进行协调。 在运行一个应用时,Driver会去连接集群管理器(Standalone、Mesos、YARN)申请运行Executor资源,并启动ExecutorBackend。然后由集群管理器在不同的应用之间调度资源。Driver同时会启动应用程序DAG调度、Stage划分、Task生成。 然后Spark会把应用的代码(传递给SparkContext的JAR或者Python定义的代码)发送到Executor上。 所有的Task执行完成后,用户的应用程序运行结束。 图2 Spark应用运行架构 Spark采用Master和Worker的模式,如图 Spark的Master和Worker所示。用户在Spark客户端提交应用程序,调度器将Job分解为多个Task发送到各个Worker中执行,各个Worker将计算的结果上报给Driver(即Master),Driver聚合结果返回给客户端。 图3 Spark的Master和Worker 在此结构中,有几个说明点: 应用之间是独立的。 每个应用有自己的executor进程,Executor启动多个线程,并行地执行任务。无论是在调度方面,或者是executor方面。各个Driver独立调度自己的任务;不同的应用任务运行在不同的JVM上,即不同的Executor。 不同Spark应用之间是不共享数据的,除非把数据存储在外部的存储系统上(比如HDFS)。 因为Driver程序在集群上调度任务,所以Driver程序建议和worker节点比较近,比如在一个相同的局部网络内。 Spark on YARN有两种部署模式: yarn-cluster模式下,Spark的Driver会运行在YARN集群内的ApplicationMaster进程中,ApplicationMaster已经启动之后,提交任务的客户端退出也不会影响任务的运行。 yarn-client模式下,Driver启动在客户端进程内,ApplicationMaster进程只用来向YARN集群申请资源。
  • Spark结构 Spark的结构如图1所示,各模块的说明如表 基本概念说明所示。 图1 Spark结构 表1 基本概念说明 模块 说明 Cluster Manager 集群管理器,管理集群中的资源。Spark支持多种集群管理器,Spark自带的Standalone集群管理器、Mesos或YARN,系统默认采用YARN模式。 Application Spark应用,由一个Driver Program和多个Executor组成。 Deploy Mode 部署模式,分为cluster和client模式。cluster模式下,Driver会在集群内的节点运行;而在client模式下,Driver在客户端运行(集群外)。 Driver Program 是Spark应用程序的主进程,运行Application的main()函数并创建SparkContext。负责应用程序的解析、生成Stage并调度Task到Executor上。通常SparkContext代表Driver Program。 Executor 在Work Node上启动的进程,用来执行Task,管理并处理应用中使用到的数据。一个Spark应用一般包含多个Executor,每个Executor接收Driver的命令,并执行一到多个Task。 Worker Node 集群中负责启动并管理Executor以及资源的节点。 Job 一个Action算子(比如collect算子)对应一个Job,由并行计算的多个Task组成。 Stage 每个Job由多个Stage组成,每个Stage是一个Task集合,由DAG分割而成。 Task 承载业务逻辑的运算单元,是Spark平台上可执行的最小工作单元。一个应用根据执行计划以及计算量分为多个Task。
  • Spark Streaming原理 Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式数据的能力。当前Spark支持两种数据处理方式: Direct Streaming Direct Streaming方式主要通过采用Direct API对数据进行处理。以Kafka Direct接口为例,与启动一个Receiver来连续不断地从Kafka中接收数据并写入到WAL中相比,Direct API简单地给出每个batch区间需要读取的偏移量位置。然后,每个batch的Job被运行,而对应偏移量的数据在Kafka中已准备好。这些偏移量信息也被可靠地存储在checkpoint文件中,应用失败重启时可以直接读取偏移量信息。 图4 Direct Kafka接口数据传输 需要注意的是,Spark Streaming可以在失败后重新从Kafka中读取并处理数据段。然而,由于语义仅被处理一次,重新处理的结果和没有失败处理的结果是一致的。 因此,Direct API消除了需要使用WAL和Receivers的情况,且确保每个Kafka记录仅被接收一次,这种接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性、高效性及易用性,因此推荐使用Direct Streaming方式处理数据。 Receiver 在一个Spark Streaming应用开始时(也就是Driver开始时),相关的StreamingContext(所有流功能的基础)使用SparkContext启动Receiver成为长驻运行任务。这些Receiver接收并保存流数据到Spark内存中以供处理。用户传送数据的生命周期如图5所示: 图5 数据传输生命周期 接收数据(蓝色箭头) Receiver将数据流分成一系列小块,存储到Executor内存中。另外,在启用预写日志(Write-ahead Log,简称WAL)以后,数据同时还写入到容错文件系统的预写日志中。 通知Driver(绿色箭头) 接收块中的元数据被发送到Driver的StreamingContext。这个元数据包括: 定位其在Executor内存中数据位置的块Reference ID。 若启用了WAL,还包括块数据在日志中的偏移信息。 处理数据(红色箭头) 对每个批次的数据,StreamingContext使用Block信息产生RDD及其Job。StreamingContext通过运行任务处理Executor内存中的Block来执行Job。 周期性的设置检查点(橙色箭头) 为了容错的需要,StreamingContext会周期性的设置检查点,并保存到外部文件系统中。 容错性 Spark及其RDD允许无缝的处理集群中任何Worker节点的故障。鉴于Spark Streaming建立于Spark之上,因此其Worker节点也具备了同样的容错能力。然而,由于Spark Streaming的长正常运行需求,其应用程序必须也具备从Driver进程(协调各个Worker的主要应用进程)故障中恢复的能力。使Spark Driver能够容错是件很棘手的事情,因为可能是任意计算模式实现的任意用户程序。不过Spark Streaming应用程序在计算上有一个内在的结构:在每批次数据周期性地执行同样的Spark计算。这种结构允许把应用的状态(也叫做Checkpoint)周期性地保存到可靠的存储空间中,并在Driver重新启动时恢复该状态。 对于文件这样的源数据,这个Driver恢复机制足以做到零数据丢失,因为所有的数据都保存在了像HDFS这样的容错文件系统中。但对于像Kafka和Flume等其他数据源,有些接收到的数据还只缓存在内存中,尚未被处理,就有可能会丢失。这是由于Spark应用的分布操作方式引起的。当Driver进程失败时,所有在Cluster Manager中运行的Executor,连同在内存中的所有数据,也同时被终止。为了避免这种数据损失,Spark Streaming引进了WAL功能。 WAL通常被用于数据库和文件系统中,用来保证任何数据操作的持久性,即先将操作记入一个持久的日志,再对数据施加这个操作。若施加操作的过程中执行失败了,则通过读取日志并重新施加前面指定的操作,系统就得到了恢复。下面介绍了如何利用这样的概念保证接收到的数据的持久性。 Kafka数据源使用Receiver来接收数据,是Executor中的长运行任务,负责从数据源接收数据,并且在数据源支持时还负责确认收到数据的结果(收到的数据被保存在Executor的内存中,然后Driver在Executor中运行来处理任务)。 当启用了预写日志以后,所有收到的数据同时还保存到了容错文件系统的日志文件中。此时即使Spark Streaming失败,这些接收到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后Receiver才会确认,已经缓存但还没有保存的数据可以在Driver重新启动之后由数据源再发送一次。这两个机制确保了零数据丢失,即所有的数据或者从日志中恢复,或者由数据源重发。 如果需要启用预写日志功能,可以通过如下动作实现: 通过“streamingContext.checkpoint”设置checkpoint的目录,这个目录是一个HDFS的文件路径,既用作保存流的checkpoint,又用作保存预写日志。 设置SparkConf的属性“spark.streaming.receiver.writeAheadLog.enable”为“true”(默认值是“false”)。 在WAL被启用以后,所有Receiver都获得了能够从可靠收到的数据中恢复的优势。建议缓存RDD时不采取多备份选项,因为用于预写日志的容错文件系统很可能也复制了数据。 在启用了预写日志以后,数据接收吞吐率会有降低。由于所有数据都被写入容错文件系统,文件系统的写入吞吐率和用于数据复制的网络带宽,可能就是潜在的瓶颈了。在此情况下,建议创建更多的Recevier增加数据接收的并行度,或使用更好的硬件以增加容错文件系统的吞吐率。 恢复流程 当一个失败的Driver重启时,按如下流程启动: 图6 计算恢复流程 恢复计算(橙色箭头) 使用checkpoint信息重启Driver,重新构造SparkContext并重启Receiver。 恢复元数据块(绿色箭头) 为了保证能够继续下去所必备的全部元数据块都被恢复。 未完成作业的重新形成(红色箭头) 由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生RDD和对应的作业。 读取保存在日志中的块数据(蓝色箭头) 在这些作业执行时,块数据直接从预写日志中读出。这将恢复在日志中可靠地保存的所有必要数据。 重发尚未确认的数据(紫色箭头) 失败时没有保存到日志中的缓存数据将由数据源再次发送。因为Receiver尚未对其确认。 因此通过预写日志和可靠的Receiver,Spark Streaming就可以保证没有输入数据会由于Driver的失败而丢失。
  • SparkSession原理 SparkSession是Spark编程的统一API,也可看作是读取数据的统一入口。SparkSession提供了一个统一的入口点来执行以前分散在多个类中的许多操作,并且还为那些较旧的类提供了访问器方法,以实现最大的兼容性。 使用构建器模式创建SparkSession。如果存在SparkSession,构建器将自动重用现有的SparkSession;如果不存在则会创建一个SparkSession。 在I/O期间,在构建器中设置的配置项将自动同步到Spark和Hadoop。 import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option", "config-value") .getOrCreate() SparkSession可以用于对数据执行SQL查询,将结果返回为DataFrame。 sparkSession.sql("select * from person").show SparkSession可以用于设置运行时的配置项,这些配置项可以在SQL中使用变量替换。 sparkSession.conf.set("spark.some.config", "abcd") sparkSession.conf.get("spark.some.config") sparkSession.sql("select ${spark.some.config}") SparkSession包括一个“catalog”方法,其中包含使用Metastore(即数据目录)的方法。方法返回值为数据集,可以使用相同的Dataset API来运行。 val tables = sparkSession.catalog.listTables() val columns = sparkSession.catalog.listColumns("myTable") 底层SparkContext可以通过SparkSession的SparkContext API访问。 val sparkContext = sparkSession.sparkContext
  • Spark简介 Spark是一个开源的,并行数据处理框架,能够帮助用户简单、快速的开发,统一的大数据应用,对数据进行离线处理,流式处理,交互式分析等等。 Spark提供了一个快速的计算、写入及交互式查询的框架。相比于Hadoop,Spark拥有明显的性能优势。Spark使用in-memory的计算方式,通过这种方式来避免一个MapReduce工作流中的多个任务对同一个数据集进行计算时的IO瓶颈。Spark利用Scala语言实现,Scala能够使得处理分布式数据集时,能够像处理本地化数据一样。除了交互式的数据分析,Spark还能够支持交互式的数据挖掘,由于Spark是基于内存的计算,很方便处理迭代计算,而数据挖掘的问题通常都是对同一份数据进行迭代计算。除此之外,Spark能够运行于安装Hadoop 2.0 Yarn的集群。之所以Spark能够在保留MapReduce容错性,数据本地化,可扩展性等特性的同时,能够保证性能的高效,并且避免繁忙的磁盘IO,主要原因是因为Spark创建了一种叫做RDD(Resilient Distributed Dataset)的内存抽象结构。 原有的分布式内存抽象,例如key-value store以及数据库,支持对于可变状态的细粒度更新,这一点要求集群需要对数据或者日志的更新进行备份来保障容错性。这样就会给数据密集型的工作流带来大量的IO开销。而对于RDD来说,它只有一套受限制的接口,仅支持粗粒度的更新,例如map,join等等。通过这种方式,Spark只需要简单的记录建立数据的转换操作的日志,而不是完整的数据集,就能够提供容错性。这种数据的转换链记录就是数据集的溯源。由于并行程序,通常是对一个大数据集应用相同的计算过程,因此之前提到的粗粒度的更新限制并没有想象中的大。事实上,Spark论文中阐述了RDD完全可以作为多种不同计算框架,例如MapReduce,Pregel等的编程模型。并且,Spark同时提供了操作允许用户显式地将数据转换过程持久化到硬盘。对于数据本地化,是通过允许用户能够基于每条记录的键值,控制数据分区实现的。(采用这种方式的一个明显好处是,能够保证两份需要进行关联的数据将会被同样的方式进行哈希)。如果内存的使用超过了物理限制,Spark将会把这些比较大的分区写入到硬盘,由此来保证可扩展性。 Spark具有如下特点: 快速:数据处理能力,比MapReduce快10-100倍。 易用:可以通过Java,Scala,Python,简单快速的编写并行的应用处理大数据量,Spark提供了超过80种的操作符来帮助用户组件并行程序。 普遍性:Spark提供了众多的工具,例如Spark SQL和Spark Streaming。可以在一个应用中,方便的将这些工具进行组合。 与Hadoop集成:Spark能够直接运行于Hadoop的集群,并且能够直接读取现存的Hadoop数据。 MRS服务的Spark组件具有以下优势: MRS服务的Spark Streaming组件支持数据实时处理能力而非定时触发。 MRS服务的Spark组件支持Structured Streaming,支持DataSet API来构建流式应用,提供了exactly-once的语义支持,流和流的join操作支持内连接和外连接。 MRS服务的Spark组件支持pandas_udf,可以利用pandas_udf替代pyspark中原来的udf对数据进行处理,可以减少60%-90%的处理时长(受具体操作影响)。 MRS服务的Spark组件支持Graph功能,支持图计算作业使用图进行建模。 MRS服务的SparkSQL兼容部分Hive语法(以Hive-Test-benchmark测试集上的64个SQL语句为准)和标准SQL语法(以tpc-ds测试集上的99个SQL语句为准)。 Spark的架构和详细原理介绍,请参见:https://spark.apache.org/docs/3.1.1/quick-start.html。
  • Hue与Hadoop集群的关系 Hue与Hadoop集群的交互关系如图1所示。 图1 Hue与Hadoop集群 表1 Hue与其它组件的关系 名称 描述 HDFS HDFS提供REST接口与Hue交互,用于查询、操作HDFS文件。 在Hue把用户请求从用户界面组装成接口数据,通过调用REST接口调用HDFS,通过浏览器返回结果呈现给用户。 Hive Hive提供THRIFT接口与Hue交互,用于执行Hive SQL、查询表元数据。 在Hue界面编辑HQL语句,通THRIFT接口提交HQL语句到HiveServer执行,同时把执行通过浏览器呈现给用户。 Yarn/MapReduce MapReduce提供REST与Hue交互,用于查询Yarn作业信息。 进入Hue页面,输入筛选条件参数,UI将参数发送到后台,Hue通过调用MapReduce(MR1/MR2-YARN)提供的REST接口,获取任务运行的状态,起始结束时间、运行日志等信息。 Oozie Oozie提供REST接口与Hue交互,用于创建工作流、Coordinator、Bundle,以及它们的任务管理和监控。 在Hue前端提供图形化工作流、Coordinator、Bundle编辑器,Hue调用Oozie REST接口对工作流、Coordinator、Bundle进行创建、修改、删除、提交、监控。 ZooKeeper ZooKeeper提供REST接口与Hue交互,用于查询ZooKeeper节点信息。 在Hue前端显示ZooKeeper节点信息,Hue调用ZooKeeper REST接口获取这些节点信息。 Impala Impala提供Hue beeswax接口与Hue交互,用于执行Hive SQL、查询表元数据。 在Hue界面编辑HQL语句,通Hue beeswax接口提交HQL语句到HiveServer执行,同时把执行结果通过浏览器呈现给用户。
  • Hive CBO原理介绍 CBO,全称是Cost Based Optimization,即基于代价的优化器。 其优化目标是: 在编译阶段,根据查询语句中涉及到的表和查询条件,计算出产生中间结果少的高效join顺序,从而减少查询时间和资源消耗。 Hive中实现CBO的总体过程如下: Hive使用开源组件Apache Calcite实现CBO。首先SQL语句转化成Hive的AST,然后转成Calcite可以识别的RelNodes。Calcite将RelNode中的Join顺序调整后,再由Hive将RelNode转成AST,继续Hive的逻辑优化和物理优化过程。流程图如图1所示: 图1 实现流程图 Calcite调整Join顺序的具体过程如下: 针对所有参与Join的表,依次选取一个表作为第一张表。 依据选取的第一张表,根据代价选择第二张表,第三张表。由此可以得到多个不同的执行计划。 计算出代价最小的一个计划,作为最终的顺序优化结果。 代价的具体计算方法: 当前版本,代价的衡量基于Join出来的数据条数:Join出来的条数越少,代价越小。Join条数的多少,取决于参与Join的表的选择率。表的数据条数,取自表级别的统计信息。 过滤条件过滤后的条数,由列级别的统计信息,max,min,以及NDV(Number of Distinct Values)来估算出来。 例如存在一张表table_a,其统计信息如下:数据总条数1000000,NDV 50,查询条件如下: Select * from table_a where colum_a='value1'; 则估算查询的最终条数为1000000 * 1/50 = 20000条,选择率为2%。 以下以TPC-DS Q3为例来介绍CBO是如何调整Join顺序的。 select dt.d_year, item.i_brand_id brand_id, item.i_brand brand, sum(ss_ext_sales_price) sum_agg from date_dim dt, store_sales, item where dt.d_date_sk = store_sales.ss_sold_date_sk and store_sales.ss_item_sk = item.i_item_sk and item.i_manufact_id = 436 and dt.d_moy = 12 group by dt.d_year , item.i_brand , item.i_brand_id order by dt.d_year , sum_agg desc , brand_id limit 10; 语句解释:这个语句由三张表来做Inner join,其中store_sales是事实表,有约2900000000条数据,date_dim是维度表,有约73000条数据,item是维度表,有约18000条数据。每一个表上都有过滤条件,其Join关系如所图2示: 图2 Join关系 CBO应该先选择能起到更好过滤效果的表来Join。 通过分析min,max,NDV,以及数据条数。CBO估算出不同维度表的选择率,详情如表1所示。 表1 数据过滤 表名 原始数据条数 过滤后数据条数 选择率 date_dim 73000 6200 8.5% item 18000 19 0.1% 上述表格获取到原始表的数据条数,估算出过滤后的数据条数后,计算出选择率=过滤后条数/原始条数。 从上表可以看出,item表具有较好的过滤效果,因此CBO将item表的Join顺序提前。 CBO未开启时的Join示意图如图3所示: 图3 未开启CBO CBO开启后的Join示意图如图4所示: 图4 开启CBO 可以看出,优化后中间结果由495000000条减少到了2900000条,执行时间也大幅减少。
  • 新建Wiki 通过知识库主页进入个人Wiki。 单击左侧导航个人Wiki,进入个人Wiki页面。 单击右上角,进入新建Wiki窗口。 设置名称和公开性。 公开性请根据实际情况设置。默认属性为“个人可见”。 单击“确定”。创建好的Wiki显示在列表中。 在Wiki列表中,可以根据需要修改Wiki名称、并进行分享、收藏或删除操作。 进入Wiki中,可以根据需要修改Wiki的设置,查看导出内容,并进行分享、收藏、删除Wiki操作。
  • 新建子工作项 新建好的子工作项显示在“子工作项”页签中。 Task和Bug是最末层次,Task和Bug没有子工作项。 增加子工作项有两种方法: 操作方法一 进入工作项详情,切换到“子工作项”页签。 单击“快速新建子工作项”可以快速完成子工作项创建。适合于快速场景的创建。 单击“新建子工作项”,可以设置更多参数信息,适合于完整创建子工作项。 操作方法二 在工作项列表中,分为快速创建和完整创建: 单击工作项所在行,只需输入关键信息快速完成创建。 单击工作项所在行,创建完整信息的子工作项。
  • 开通 KooMessage 服务 云消息 服务(KooMessage)需要先开通才能订购和使用,当前仅支持华为云主账号开通。 您可以自助开通免审核。 进入KooMessage官网首页。 登录KooMessage控制台。 KooMessage服务只向通过认证的企业提供开通,如还未完成企业用户认证,请单击“去认证企业用户”进行认证。 勾选“我已阅读并同意《KooMessage服务使用声明》”,单击“立即开通”。 图1 开通KooMessage服务 开通成功后进入控制台首页,且后续直接登录使用无需再开通。
  • 测试命令 生产命令: sh producer.sh -n "${连接地址}" -t ${Topic名称} -s ${消息大小} -w ${生产者线程数} 连接地址:购买RocketMQ实例后,获取实例的连接地址。 Topic名称:创建Topic时设置的Topic名称。 消息大小:1KB 生产者线程数:256 消费命令: sh consumer.sh -n "${连接地址}" -t ${Topic名称} -g ${消费组名称} 连接地址:购买RocketMQ实例后,获取实例的连接地址。 Topic名称:创建Topic时设置的Topic名称。 消费组名称:创建消费组时设置的消费组名称。
  • 测试结果 表4 测试结果 性能指标 rocketmq.b2.large.4 rocketmq.b2.large.8 rocketmq.b2.large.12 实例生产速率 1008.500 Count/s 2019.710 Count/s 3011.640 Count/s 实例消费速率 1008.520 Count/s 2019.710 Count/s 3010.590 Count/s 平均生产时延 26.336 ms 8.788 ms 5.876 ms
共100000条