华为云用户手册

  • 代码样例 下面代码片段在com.huawei.bigdata.hbase.examples包的“TestMain”类的init方法中。 private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator; //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; conf.addResource(new Path(userdir + "core-site.xml"), false); conf.addResource(new Path(userdir + "hdfs-site.xml"), false); conf.addResource(new Path(userdir + "hbase-site.xml"), false); }
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 场景说明 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: DataStream应用程序可以在Windows环境和Linux环境中运行。 实时统计总计网购时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 DataStream样例工程的数据存储在文本中。 将log1.txt和log2.txt放置在指定路径下,例如"/opt/log1.txt"和"/opt/log2.txt"。 数据文件若存放在本地文件系统,需在所有部署Yarn NodeManager的节点指定目录放置,并设置运行用户访问权限。 若将数据文件放置于HDFS,需指定程序中读取文件路径HDFS路径,例如"hdfs://hacluster/path/to/file"。
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单地互通,而需要在通信之前进行相互认证,以确保通信的安全性。用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。 当前Flink系统支持认证和加密传输,要使用认证和加密传输,用户需要安装Flink客户端并配置安全认证,本章节以“/opt/hadoopclient”为客户端安装目录为例,介绍安装客户端及配置安全认证。客户端安装目录请根据实际修改。
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据。 数据包含两个属性:分别是Int和String类型。 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如: nettyconnector.ssl.enabled: true nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 安全认证配置: Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink。 SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见配置Flink应用安全认证。 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。
  • Flink样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • 组件介绍 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • MRS组件应用开发流程说明 通常MRS组件应用开发流程如下所示,各组件应用的开发编译操作可参考组件开发指南对应章节。 图1 MRS组件应用开发流程 表1 MRS组件应用开发流程说明 阶段 说明 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用IntelliJ IDEA工具,同时本地需完成JDK、Maven等初始配置。 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通。 配置并导入样例工程 MRS提供了不同组件场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置安全认证 连接开启了Kerberos认证的MRS集群时,应用程序中需配置具有相关资源访问权限的用户进行安全认证。 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。
  • 样例工程运行依赖包参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程运行依赖包 样例工程 依赖包 依赖包获取地址 DataStream程序 异步Checkpoint机制程序 flink-dist_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 使用Flink Jar提交SQL作业程序 FlinkServer REST API程序 flink-dist_*.jar flink-table_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序 kafka-clients-*.jar flink-connector-kafka_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序 flink-connector-netty_*.jar flink-dist_*.jar flink-connector-netty_*.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Stream SQL Join程序 kafka-clients-*.jar flink-connector-kafka_*.jar flink-dist_*.jar flink-table_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Flink读写HBase程序 flink-connector-hbase*.jar flink-dist_*.jar flink-table_*.jar hbase-clients-*.jar flink-connector-hbase_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 hbase-clients-*.jar由HBase组件发布提供,可在HBase组件客户端或者服务端安装路径下的lib目录下获取。 Flink读写Hudi程序 hbase-unsafe-*.jar 可在二次开发样例代码编译后产生的lib文件夹下获取。
  • Kerberos认证说明 开启了Kerberos认证的安全模式集群,进行应用开发时需要进行安全认证。使用Kerberos的系统在设计上采用“客户端/服务器”结构与AES等加密技术,并且能够进行相互认证(即客户端和服务器端均可对对方进行身份认证)。可以用于防止窃听、防止replay攻击、保护数据完整性等场合,是一种应用对称密钥体制进行密钥管理的系统。 图1 Kerberos原理架构 表1 Kerberos模块说明 模块 说明 Application Client 应用客户端,通常是需要提交任务(或者作业)的应用程序。 Application Server 应用服务端,通常是应用客户端需要访问的应用程序。 Kerberos 提供安全认证的服务。 KerberosAdmin 提供认证用户管理的进程。 KerberosServer 提供认证票据分发的进程。 应用客户端(Application Client)可以是集群内某个服务,也可以是客户二次开发的一个应用程序,应用程序可以向应用服务提交任务或者作业。 应用程序在提交任务或者作业前,需要向Kerberos服务申请TGT(Ticket-Granting Ticket),用于建立和Kerberos服务器的安全会话。 Kerberos服务在收到TGT请求后,会解析其中的参数来生成对应的TGT,使用客户端指定的用户名的密钥进行加密响应消息。 应用客户端收到TGT响应消息后,解析获取TGT,此时,再由应用客户端(通常是rpc底层)向Kerberos服务获取应用服务端的ST(Server Ticket)。 Kerberos服务在收到ST请求后,校验其中的TGT合法后,生成对应的应用服务的ST,再使用应用服务密钥将响应消息进行加密处理。 应用客户端收到ST响应消息后,将ST打包到发给应用服务的消息里面传输给对应的应用服务端(Application Server)。 应用服务端收到请求后,使用本端应用服务对应的密钥解析其中的ST,并校验成功后,本次请求合法通过。
  • Kerberos认证代码示例 package com.huawei.bigdata.hdfs.examples;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.security.UserGroupInformation;public class KerberosTest { private static String PATH_TO_HDFS_SITE_XML = KerberosTest.class.getClassLoader().getResource("hdfs-site.xml") .getPath(); private static String PATH_TO_CORE_SITE_XML = KerberosTest.class.getClassLoader().getResource("core-site.xml") .getPath(); private static String PATH_TO_KEYTAB = KerberosTest.class.getClassLoader().getResource("user.keytab").getPath(); private static String PATH_TO_KRB5_CONF = KerberosTest.class.getClassLoader().getResource("krb5.conf").getPath(); private static String PRNCIPAL_NAME = "develop"; private FileSystem fs; private Configuration conf; /** * initialize Configuration */ private void initConf() { conf = new Configuration(); // add configuration files conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } /** * initialize FileSystem, and get ST from Kerberos * @throws IOException */ private void initFileSystem() throws IOException { fs = FileSystem.get(conf); } /** * An example to access the HDFS * @throws IOException */ private void doSth() throws IOException { Path path = new Path("/tmp"); FileStatus fStatus = fs.getFileStatus(path); System.out.println("Status of " + path + " is " + fStatus); //other thing } public static void main(String[] args) throws Exception { KerberosTest test = new KerberosTest(); test.initConf(); test.login(); test.initFileSystem(); test.doSth(); }} Kerberos认证时需要配置Kerberos认证所需要的文件参数,主要包含keytab文件路径、Kerberos认证的用户名称、Kerberos认证所需要的客户端配置“krb5.conf”文件。 login()方法为调用hadoop的接口执行Kerberos认证,生成TGT票据。 doSth()方法调用hadoop的接口访问文件系统,此时底层RPC会自动携带TGT去Kerberos认证,生成ST票据。 以上代码可在安全模式下的HDFS二次开发样例工程中创建KerberosTest.java,运行并查看调测结果,具体操作过程请参考HDFS开发指南(安全模式)。
  • 安全认证基本概念 本文以HDFS组件应用的安全认证为例介绍安全认证相关的常见基本概念,可以帮助用户减少学习Kerberos框架所花费的时间,有助于更好的理解Kerberos业务。 TGT 票据授权票据(Ticket-Granting Ticket),由Kerberos服务生成,提供给应用程序与Kerberos服务器建立认证安全会话,该票据的默认有效期为24小时,24小时后该票据自动过期。 TGT申请方式(以HDFS为例): 通过HDFS提供的接口获取。 /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } 通过MRS集群客户端以kinit方式获取。 登录MRS集群客户端所在节点,进入客户端安装目录。 cd {客户端安装目录} 执行以下命令配置环境变量。 source bigdata_env 执行以下命令进行用户认证。 kinit MRS集群业务用户 ST 服务票据(Server Ticket),由Kerberos服务生成,提供给应用程序与应用服务建立安全会话,该票据一次性有效。 ST的生成在MRS中,基于hadoop-rpc通信,由rpc底层自动向Kerberos服务端提交请求,由Kerberos服务端生成。
  • MRS 3.1.2-LTS.3 表2 MRS 3.1.2-LTS.3版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.12.0 1.12.0-hw-ei-310003 Hive 3.1.0 3.1.0-hw-ei-310003 Tez 0.9.2 0.9.1.0101-hw-ei-12 Spark 2.4.5 2.4.5-hw-ei-310003 CarbonData 2.0.1 - Hadoop 3.1.1 3.1.1-hw-ei-310003 HBase 2.2.3 2.2.3-hw-ei-310003 ZooKeeper 3.5.6 3.5.6-hw-ei-310003 Hue 4.7.0 - Oozie 5.1.0 5.1.0-hw-ei-310003 Flume 1.9.0 - Kafka 2.4.0 2.4.0-hw-ei-310003 Ranger 2.0.0 - ClickHouse 21.3.4.25 0.3.0 scala 2.12 -
  • MRS 3.2.0-LTS.1 表1 MRS 3.2.0-LTS.1版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.15.0 1.15.0-h0.cbu.mrs.320.r33 Hive 3.1.0 3.1.0-h0.cbu.mrs.320.r33 Tez 0.9.2 0.9.2-h0.cbu.mrs.320.r33 Spark2x 3.1.1 3.1.1-h0.cbu.mrs.320.r33 Hadoop 3.3.1 3.3.1-h0.cbu.mrs.320.r33 HBase 2.2.3 2.2.3-h0.cbu.mrs.320.r33 ZooKeeper 3.6.3 3.6.3-h0.cbu.mrs.320.r33 Hue 4.7.0 - IoTDB 0.14.0 0.14.0-h0.cbu.mrs.320.r33 Oozie 5.1.0 5.1.0-h0.cbu.mrs.320.r33 Flume 1.9.0 1.9.0-h0.cbu.mrs.320.r33 Kafka 2.11-2.4.0 2.4.0-h0.cbu.mrs.320.r33 Ranger 2.0.0 2.0.0-h0.cbu.mrs.320.r33 Phoenix 5.0.0 5.0.0-HBase-2.0-h0.cbu.mrs.320.r33 ClickHouse 22.3.2.2 0.3.1-h0.cbu.mrs.320.r33 Loader 1.99.3 1.99.3-h0.cbu.mrs.320.r33 DBService 2.7.0 - HetuEngine 1.2.0 1.2.0-h0.cbu.mrs.320.r33 CDL 1.0.0 1.0.0-h0.cbu.mrs.320.r33 Guardian 0.1.0 1.0.6-h0.cbu.mrs.321.r28
  • Flink应用程序开发流程 Flink开发流程参考如下步骤: 图1 Flink应用程序开发流程 表1 Flink应用开发的流程说明 阶段 说明 参考章节 了解基本概念 在开始开发应用前,需要了解Flink的基本概念。 Flink基本概念 准备开发和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 导入并配置Flink样例工程 准备安全认证 如果您使用的是安全集群,需要进行安全认证。 配置Flink应用安全认证 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 开发Flink应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并调测Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用调测结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 组件操作指南中的“Flink性能调优”
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 追加文件内容 * * @throws java.io.IOException */private void append() throws IOException { final String content = "I append this content."; FSDataOutputStream out = null; try { out = fSystem.append(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG .info("success to append."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); }}
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG.error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH);}/** * create file path * * @param filePath * @return * @throws java.io.IOException */private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true;}
  • 场景说明 在同一个客户端进程内同时访问 FusionInsight ZooKeeper和第三方的ZooKeeper时,为了避免访问连接ZooKeeper认证冲突,提供了样例代码使HBase客户端访问FusionInsight ZooKeeper和客户应用访问第三方ZooKeeper。 以下为“src/main/resources”目录下提供的与认证相关的配置文件。 zoo.cfg # The configuration in jaas.conf used to connect fi zookeeper.zookeeper.sasl.clientconfig=Client_new[1]# Principal of fi zookeeper server side.zookeeper.server.principal=zookeeper/hadoop.hadoop.com[2]# Set true if the fi cluster is security mode.# The other two parameters doesn't work if the value is false.zookeeper.sasl.client=true[3] [1] zookeeper.sasl.clientconfig:指定使用jaas.conf文件中的对应配置访问FusionInsight ZooKeeper; [2] zookeeper.server.principal:指定ZooKeeper服务端使用principal; [3] zookeeper.sasl.client:如果MRS集群是安全模式,该值设置为“true”,否则设置为“false”,设置为“false”的情况下,“zookeeper.sasl.clientconfig”和“zookeeper.server.principal”参数不生效。 jaas.conf Client_new { [4] com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="D:\\work\\sample_project\\src\\hbase-examples\\hbase-zk-example\\target\\classes\\conf\\user.keytab" [5] principal="hbaseuser1" useTicketCache=false storeKey=true debug=true;};Client { [6] org.apache.zookeeper.server.auth.DigestLoginModule required username="bob" password="xxxxxx"; [7]}; [4] Client_new:zoo.cfg中指定的读取配置,当该名称修改时,需要同步修改zoo.cfg中对应配置。 [5] keyTab :指明工程使用的“user.keytab”在运行样例的主机上的保存路径,使用绝对路径便于更好定位文件位置。在Windows环境和Linux环境下配置时需注意区分不同操作系统路径书写方式,即“\\”与“\”差异。 [6] Client:第三方ZooKeeper使用该配置进行访问连接,具体连接认证配置由第三方ZooKeeper版本决定。 [7] password:密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
  • 常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Hive的相关操作。 HQL语言 Hive Query Language,类SQL语句。 HCatalog HCatalog是建立在Hive元数据之上的一个表信息管理层,吸收了Hive的DDL命令。为MapReduce提供读写接口,提供Hive命令行接口来进行数据定义和元数据查询。基于MRS的HCatalog功能,Hive、MapReduce开发人员能够共享元数据信息,避免中间转换和调整,能够提升数据处理的效率。 WebHCat WebHCat运行用户通过Rest API来执行Hive DDL,提交MapReduce任务,查询MapReduce任务执行结果等操作。
  • 操作步骤 在Windows环境下的Intellij IDEA开发环境中,单击IDEA右侧Maven导入依赖。 图1 导入依赖 (可选)如果对接的集群开启了ZooKeeper的SSL认证通信功能,则需要添加JVM配置参数,如下所示: -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true 在“JDBCExampleZK.java”文件下单击右键,在弹出菜单单击“Run 'JDBCExampleZK.main()' ”。 图2 运行程序 在IDEA的console中可以看到输出结果。 图3 输出结果
  • 通过HSBroker的用户名密码认证实现查询HetuEngine SQL任务 本章节适用于MRS 3.3.0及以后版本。 通过HSBroker方式连接到HetuEngine,组装对应的SQL发送到HetuEngine执行,完成对Hive数据源的增删改查操作。 import jaydebeapi driver = "io.XXX.jdbc.XXXDriver" # need to change the value based on the cluster informationurl = "jdbc:XXX://192.168.37.61:29861,192.168.37.62:29861/hive/default?serviceDiscoveryMode=hsbroker"user = "YourUserName"tenant = "YourTenant"jdbc_location = "Your file path of the jdbc jar" sql = "show catalogs" if __name__ == '__main__': conn = jaydebeapi.connect(driver, url, {"user": user, "SSL": "false", "tenant": tenant}, [jdbc_location]) curs = conn.cursor() curs.execute(sql) result = curs.fetchall() print(result) curs.close() conn.close() 上述代码中各参数说明如下表所示: 表1 参数及参数说明 参数名称 参数说明 url jdbc:XXX://HSBroker1_IP:HSBroker1_Port,HSBroker2_IP:HSBroker2_Port,HSBroker3_IP:HSBroker3_Port/catalog/schema?serviceDiscoveryMode=hsbroker 说明: XXX:驱动名,请以实际样例代码中的内容为准。 catalog、schema分别是JDBC客户端要连接的catalog和schema名称。 HSBroker_IP:HSBroker_Port是HSBroker的URL,多个URL以逗号隔开。例如:“192.168.81.37:2181,192.168.195.232:2181,192.168.169.84:2181” user 访问HetuServer的用户名,即在集群中创建的“人机”用户的用户名。 tenant 指定访问HetuEngine计算实例的租户资源队列。 jdbc_location 导入并配置HetuEngine Python3样例工程中获取的hetu-jdbc-XXX.jar包的完整路径。 Windows系统路径示例:"D:\\hetu-examples-python3\\hetu-jdbc-XXX.jar" Linux系统路径示例:"/opt/hetu-examples-python3/hetu-jdbc-XXX.jar" 父主题: HetuEngine样例程序(Python3)
  • Hive介绍 Hive是一个开源的,建立在Hadoop上的 数据仓库 框架,提供类似SQL的HQL语言操作结构化数据,其基本原理是将HQL语言自动转换成MapReduce任务或Spark任务,从而完成对Hadoop集群中存储的海量数据进行查询和分析。 Hive主要特点如下: 通过HQL语言非常容易的完成数据提取、转换和加载(ETL)。 通过HQL完成海量结构化数据分析。 灵活的数据存储格式,支持JSON、 CS V、TEXTFILE、RCFILE、ORCFILE、SEQUENCEFILE等存储格式,并支持自定义扩展。 多种客户端连接方式,支持JDBC接口。 Hive主要应用于海量数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 为保证Hive服务的高可用性、用户数据的安全及访问服务的可控制,在开源社区的Hive-3.1.0版本基础上,Hive新增如下特性: 数据文件加密机制。 开源社区的Hive特性,请参见https://cwiki.apache.org/confluence/display/hive/designdocs。
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-Zip 16.04版本。
  • 样例代码 -- 从本地文件系统/opt/hive_examples_data/目录下将employee_info.txt加载进employees_info表中.---- 用新数据覆盖原有数据 LOAD DATA LOCAL INPATH '/opt/hive_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; ---- 保留原有数据,将新数据追加到表中LOAD DATA LOCAL INPATH '/opt/hive_examples_data/employee_info.txt' INTO TABLE employees_info; -- 从HDFS上/user/hive_examples_data/employee_info.txt加载进employees_info表中. ---- 用新数据覆盖原有数据LOAD DATA INPATH '/user/hive_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; ---- 保留原有数据,将新数据追加到表中LOAD DATA INPATH '/user/hive_examples_data/employee_info.txt' INTO TABLE employees_info; 加载数据的实质是将数据复制到HDFS上指定表的目录下。
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
  • 提供Join能力 表12 提供Join能力的相关接口 API 说明 def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] 通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] 通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全