华为云用户手册

  • 功能介绍 本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式: 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。 目前表名长度最长为128,字段名长度最长为128,字段注解长度最长为4000,WITH SERDEPROPERTIES 中key长度最长为256,value长度最长为4000。以上的长度均表示字节长度。
  • 代码样例 以下为代码片段: hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限hbase.log.file=hbase-client.log //日志文件名hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效hbase.log.maxbackupindex=20 //最多保存的日志文件数目# Security audit appenderhbase.security.log.file=hbase-client-audit.log //审计日志文件命令
  • 未安装客户端时编译并运行程序 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/optclient”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下,例如“/opt/client”(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample:时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 已安装客户端时编译并运行程序 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至Linux客户端运行环境的任意目录下,例如“/opt/client”。 配置环境变量: cd /opt/client source bigdata_env 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • Hive简介 Hive是一个开源的,建立在Hadoop上的 数据仓库 框架,提供类似SQL的HQL语言操作结构化数据,其基本原理是将HQL语言自动转换成MapReduce任务或Spark任务,从而完成对Hadoop集群中存储的海量数据进行查询和分析。 Hive主要特点如下: 通过HQL语言非常容易的完成数据提取、转换和加载(ETL)。 通过HQL完成海量结构化数据分析。 灵活的数据存储格式,支持JSON、 CS V、TEXTFILE、RCFILE、ORCFILE、SEQUENCEFILE等存储格式,并支持自定义扩展。 多种客户端连接方式,支持JDBC接口。 Hive的主要应用于海量数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 为保证Hive服务的高可用性、用户数据的安全及访问服务的可控制,在开源社区的Hive-3.1.0版本基础上,Hive新增如下特性: 数据文件加密机制:开源社区的Hive特性,请参见https://cwiki.apache.org/confluence/display/hive/designdocs。
  • 前提条件 已安装客户端时: 已安装HDFS客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装客户端时: Linux环境已安装JDK,版本号需要和IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在Linux环境所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要设置运行用户,此用户需绑定supergroup用户组。 在运行Colocation工程时,HDFS的配置项fs.defaultFS不能配置为viewfs://ClusterX。 初始化 使用Colocation前需要设置运行用户。 private static void init() throws IOException { // 设置用户,若用户没有设置HADOOP_USER_NAME,则使用USER if (System.getenv("HADOOP_USER_NAME") == null && System.getProperty("HADOOP_USER_NAME") == null) { System.setProperty("HADOOP_USER_NAME", USER); }} 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); 创建group 样例:创建一个gid01组,组中包含3个locator。 /** * 创建group * * @throws java.io.IOException */private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" }));} 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 /** * 创建并写入文件 * * @throws java.io.IOException */private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入HDFS的数据 byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close();} 删除文件 样例:删除testfile.txt文件。 /** * 删除文件 * * @throws java.io.IOException */@SuppressWarnings("deprecation")private static void delete() throws IOException { dfs.delete(new Path(TESTFILE_TXT));} 删除group 样例:删除gid01。 /** * 删除group * * @throws java.io.IOException */private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01);}
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 在Linux客户端运行应用和在Windows环境下运行应用的初始化代码相同,代码样例如下所示。 //初始化confLoad(); // 创建一个用例 HdfsExample hdfs_examples = new HdfsExample("/user/hdfs-examples", "test.txt"); /** * * 如果程序运行在Linux上,则需要core-site.xml、hdfs-site.xml的路径修改 * 为在Linux下客户端文件的绝对路径 * * */ private static void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); // conf.addResource(new Path(PATH_TO_SMALL_SITE_XML)); } /** *创建用例 */ public HdfsExample(String path, String fileName) throws IOException { this.DEST_PATH = path; this.FILE_NAME = fileName; instanceBuild(); } private void instanceBuild() throws IOException { fSystem = FileSystem.get(conf); } (可选)运行此样例代码需要设置运行用户,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码。 添加环境变量HADOOP_USER_NAME:参考调测HDFS应用章节。 修改代码:在没有设置HADOOP_USER_NAME的场景下,直接修改代码中的USER。如下所示。 System.setProperty("HADOOP_USER_NAME", USER);
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: • Colocation分配节点原理 • 扩容与Colocation分配 • Colocation与数据节点容量 Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略一来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locator所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个……)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 无 HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 Colocation提供了文件同分布的功能,执行集群balancer或mover操作时,会移动数据块,使Colocation功能失效。因此,使用Colocation功能时,建议将HDFS配置项dfs.datanode.block-pinning.enabled设置为true,此时执行集群Balancer或Mover操作时,使用Colocation写入的文件将不会被移动,从而保证了文件同分布。
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG .error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH);}/** * create file path * * @param filePath * @return * @throws java.io.IOException */private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true;}
  • HDFS应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 HDFS应用程序开发流程 表1 HDFS应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HDFS的基本概念。 常用概念 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 HDFS的运行环境即HDFS客户端,请根据指导完成客户端的安装和配置。 准备HDFS应用开发和运行环境 准备工程 HDFS提供了不同场景下的样例程序,可以导入样例工程进行程序学习。 导入并配置HDFS样例工程 根据场景开发工程 提供样例工程,帮助用户快速了解HDFS各部件的编程接口。 开发HDFS应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HDFS应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HDFS应用 父主题: HDFS开发指南(普通模式)
  • 常用概念 Colocation 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性是,将那些需进行关联操作的文件存放在相同的数据节点上,在进行关联操作计算时,避免了到别的数据节点上获取数据的动作,大大降低了网络带宽的占用。 Client HDFS Client主要包括五种方式:JAVA API、C API、Shell、HTTP REST API、WEB UI五种方式,可参考HDFS常用API介绍、HDFS Shell命令介绍。 JAVA API 提供HDFS文件系统的应用接口,本开发指南主要介绍如何使用Java API进行HDFS文件系统的应用开发。 C API 提供HDFS文件系统的应用接口,使用C语言开发的用户可参考C接口的描述进行应用开发。 Shell 提供shell命令完成HDFS文件系统的基本操作。 HTTP REST API 提供除Shell、Java API和C API以外的其他接口,可通过此接口监控HDFS状态等信息。 WEB UI 提供Web可视化组件管理界面。
  • 代码样例 以下代码片段在“hbase-zk-example\src\main\java\com\huawei\hadoop\hbase\example”包的“TestZKSample”类中,用户主要需要关注“login”和“connectApacheZK”这两个方法。 private static void login(String keytabFile, String principal) throws IOException { conf = HBaseConfiguration.create(); //In Windows environment String confDirPath = TestZKSample.class.getClassLoader().getResource("").getPath() + File.separator;[1] //In Linux environment //String confDirPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; // Set zoo.cfg for hbase to connect to fi zookeeper. conf.set("hbase.client.zookeeper.config.path", confDirPath + "zoo.cfg"); if (User.isHBaseSecurityEnabled(conf)) { // jaas.conf file, it is included in the client pakcage file System.setProperty("java.security.auth.login.config", confDirPath + "jaas.conf"); // set the kerberos server info,point to the kerberosclient System.setProperty("java.security.krb5.conf", confDirPath + "krb5.conf"); // set the keytab file name conf.set("username.client.keytab.file", confDirPath + keytabFile); // set the user's principal try { conf.set("username.client.kerberos.principal", principal); User.login(conf, "username.client.keytab.file", "username.client.kerberos.principal", InetAddress.getLocalHost().getCanonicalHostName()); } catch (IOException e) { throw new IOException("Login failed.", e); } } } private void connectApacheZK() throws IOException, org.apache.zookeeper.KeeperException { try { // Create apache zookeeper connection. ZooKeeper digestZk = new ZooKeeper("127.0.0.1:2181", 60000, null); LOG.info("digest directory:{}", digestZk.getChildren("/", null)); LOG.info("Successfully connect to apache zookeeper."); } catch (InterruptedException e) { LOG.error("Found error when connect apache zookeeper ", e); } }
  • 回答 首先查看ZooKeeper中/flink_base的目录权限是否为:'world,'anyone: cdrwa;如果不是,请修改/flink_base的目录权限为:'world,'anyone: cdrwa,然后继续根据步骤二排查;如果是,请根据步骤二排查。 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink_base/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink_base/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • 部署运行及结果查看 导出本地jar包,请参见打包Storm样例工程应用。 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm业务。 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml 如果设置业务以本地模式启动,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml 如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。 如果使用了properties文件,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties 拓扑提交成功后请自行登录storm UI查看。
  • 操作步骤 安全模式下,请先进行安全认证。 初始化客户端环境变量。 进入客户端安装目录“/opt/Storm_client”执行以下命令,导入环境变量信息。 source bigdata_env 使用在“准备开发用户”章节创建的开发用户进行安全登录。 执行kinit命令进行“人机”用户的安全登录。 kinit用户名 例如: kinit developuser 然后按照提示输入密码,无异常提示返回,则完成了用户的kerberos认证。 提交拓扑(以wordcount为例,其它拓扑请参照相关开发指引),进入Storm客户端“Storm/storm-1.2.1/bin”目录,将刚打出的source.jar提交(如果在Windows上进行的打包,则需要将Windows上的source.jar上传到Linux服务器,假定上传到“/opt/jartarget”目录),执行命令:storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology。 执行storm list命令,查看已经提交的应用程序,如果发现名称为word-count的应用程序,则说明任务提交成功。 如果业务设置为本地模式,且使用命令行方式提交时,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
  • Storm应用开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 Storm应用开发常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备Storm应用开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Storm样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发Storm应用 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包Storm样例工程应用 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包Storm业务 提交拓扑 指导用户将开发好的程序提交运行。 提交Storm拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看Storm应用调测结果 父主题: Storm应用开发概述
  • 操作步骤 将从IntelliJ IDEA打包出来的jar包放入指定文件夹(例如“D:\source”)。 在样例代码目录“src/storm-examples/storm-examples”下创建“lib”目录,将IntelliJ IDEA中导出的jar包复制到“lib”目录下,并解压。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在IntelliJ IDEA样例工程的“tools”目录下找到打包工具:“storm-jartool.cmd”。 双击打包工具,输入要打包的jar包所在目录(“D:\source”)并回车,再输入打出包存放的目录(“D:\target”),在“D:\target”中,会生成“source.jar”文件。
  • 回答 用户尝试收集大量数据到Driver端,如果Driver端的内存不足以存放这些数据,那么就会抛出OOM(OutOfMemory)的异常,然后Driver端一直在进行GC,尝试回收垃圾来存放返回的数据,导致应用长时间挂起。 解决措施: 如果用户需要在OOM场景下强制将应用退出,那么可以在启动Spark Core应用时,在客户端配置文件“$SPARK_HOME/conf/spark-defaults.conf”中的配置项“spark.driver.extraJavaOptions”中添加如下内容: -XX:OnOutOfMemoryError='kill -9 %p'
  • 操作步骤 将从IntelliJ IDEA中导出的jar包复制到Linux客户端指定目录(例如“/opt/jarsource”)。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在Storm客户端安装目录“Storm/storm-1.2.1/bin”下执行打包命令,将上述jar包打成一个完整的业务jar包放入指定目录/opt/jartarget(可为任意空目录)。执行sh storm-jartool.sh /opt/jarsource/ /opt/jartarget命令后,会在“/opt/jartarget”下生成source.jar。
  • 问题 执行Spark Core应用,尝试收集大量数据到Driver端,当Driver端内存不足时,应用挂起不退出,日志内容如下。 16/04/19 15:56:22 ERROR Utils: Uncaught exception in thread task-result-getter-2java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)
  • 样例代码路径说明 表1 样例代码路径说明 样例代码项目 样例名称 样例语言 SparkJavaExample Spark Core程序 Java SparkScalaExample Spark Core程序 Scala SparkPythonExample Spark Core程序 Python SparkSQLJavaExample Spark SQL程序 Java SparkSQLScalaExample Spark SQL程序 Scala SparkSQLPythonExample Spark SQL程序 Python SparkThriftServerJavaExample 通过JDBC访问Spark SQL的程序 Java SparkThriftServerScalaExample 通过JDBC访问Spark SQL的程序 Scala SparkOnHbaseJavaExample-AvroSource Spark on HBase 程序-操作Avro格式数据 Java SparkOnHbaseScalaExample-AvroSource Spark on HBase 程序-操作Avro格式数据 Scala SparkOnHbasePythonExample-AvroSource Spark on HBase 程序-操作Avro格式数据 Python SparkOnHbaseJavaExample-HbaseSource Spark on HBase 程序-操作HBase数据源 Java SparkOnHbaseScalaExample-HbaseSource Spark on HBase 程序-操作HBase数据源 Scala SparkOnHbasePythonExample-HbaseSource Spark on HBase 程序-操作HBase数据源 Python SparkOnHbaseJavaExample-JavaHBaseBulkPutExample Spark on HBase 程序-BulkPut接口使用 Java SparkOnHbaseScalaExample-HBaseBulkPutExample Spark on HBase 程序-BulkPut接口使用 Scala SparkOnHbasePythonExample-HBaseBulkPutExample Spark on HBase 程序-BulkPut接口使用 Python SparkOnHbaseJavaExample-JavaHBaseBulkGetExample Spark on HBase 程序-BulkGet接口使用 Java SparkOnHbaseScalaExample-HBaseBulkGetExample Spark on HBase 程序-BulkGet接口使用 Scala SparkOnHbasePythonExample-HBaseBulkGetExample Spark on HBase 程序-BulkGet接口使用 Python SparkOnHbaseJavaExample-JavaHBaseBulkDeleteExample Spark on HBase 程序-BulkDelete接口使用 Java SparkOnHbaseScalaExample-HBaseBulkDeleteExample Spark on HBase 程序-BulkDelete接口使用 Scala SparkOnHbasePythonExample-HBaseBulkDeleteExample Spark on HBase 程序-BulkDelete接口使用 Python SparkOnHbaseJavaExample-JavaHBaseBulkLoadExample Spark on HBase 程序-BulkLoad接口使用 Java SparkOnHbaseScalaExample-HBaseBulkLoadExample Spark on HBase 程序-BulkLoad接口使用 Scala SparkOnHbasePythonExample-HBaseBulkLoadExample Spark on HBase 程序-BulkLoad接口使用 Python SparkOnHbaseJavaExample-JavaHBaseForEachPartitionExample Spark on HBase 程序-foreachPartition接口使用 Java SparkOnHbaseScalaExample-HBaseForEachPartitionExample Spark on HBase 程序-foreachPartition接口使用 Scala SparkOnHbasePythonExample-HBaseForEachPartitionExample Spark on HBase 程序-foreachPartition接口使用 Python SparkOnHbaseJavaExample-JavaHBaseDistributedScanExample Spark on HBase 程序-分布式Scan HBase表 Java SparkOnHbaseScalaExample-HBaseDistributedScanExample Spark on HBase 程序-分布式Scan HBase表 Scala SparkOnHbasePythonExample-HBaseDistributedScanExample Spark on HBase 程序-分布式Scan HBase表 Python SparkOnHbaseJavaExample-JavaHBaseMapPartitionExample Spark on HBase 程序-mapPartitions接口使用 Java SparkOnHbaseScalaExample-HBaseMapPartitionExample Spark on HBase 程序-mapPartitions接口使用 Scala SparkOnHbasePythonExample-HBaseMapPartitionExample Spark on HBase 程序-mapPartitions接口使用 Python SparkOnHbaseJavaExample-JavaHBaseStreamingBulkPutExample Spark on HBase 程序-SparkStreaming批量写入HBase表 Java SparkOnHbaseScalaExample-HBaseStreamingBulkPutExample Spark on HBase 程序-SparkStreaming批量写入HBase表 Scala SparkOnHbasePythonExample-HBaseStreamingBulkPutExample Spark on HBase 程序-SparkStreaming批量写入HBase表 Python SparkHbasetoHbaseJavaExample 从HBase读取数据再写入HBase Java SparkHbasetoHbaseScalaExample 从HBase读取数据再写入HBase Scala SparkHbasetoHbasePythonExample 从HBase读取数据再写入HBase Python SparkHivetoHbaseJavaExample 从Hive读取数据再写入HBase Java SparkHivetoHbaseScalaExample 从Hive读取数据再写入HBase Scala SparkHivetoHbasePythonExample 从Hive读取数据再写入HBase Python SparkStreamingKafka010JavaExample Spark Streaming对接Kafka0-10程序 Java SparkStreamingKafka010ScalaExample Spark Streaming对接Kafka0-10程序 Scala SparkStructuredStreamingJavaExample Structured Streaming程序 Java SparkStructuredStreamingScalaExample Structured Streaming程序 Scala SparkStructuredStreamingPythonExample Structured Streaming程序 Python StructuredStreamingADScalaExample Structured Streaming流流Join Scala StructuredStreamingStateScalaExample Structured Streaming 状态操作 Scala SparkOnMultiHbaseScalaExample Spark同时访问两个HBase Scala SparkOnHudiJavaExample 使用Spark执行Hudi基本操作 Java SparkOnHudiPythonExample 使用Spark执行Hudi基本操作 Python SparkOnHudiScalaExample 使用Spark执行Hudi基本操作 Scala
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 运行Python样例代码无需通过Maven打包,只需要上传user.keytab、krb5.conf 文件到客户端所在服务器上。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • 部署运行及结果查看 导出本地jar包,请参见打包Storm样例工程应用。 将1中导出的本地Jar包,5中获取的配置文件和6中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm业务。 执行命令提交拓扑。 keytab方式下,若用户修改了keytab文件名,如修改为“huawei.keytab”,则需要在命令中增加第二个参数进行说明,提交命令示例(拓扑名为hbase-test): storm jar /opt/jartarget/source.jar com.huawei.storm.example.hbase.SimpleHBaseTopology hbase-test huawei.keytab 安全模式下在提交source.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 因为示例中的HBaseBolt并没有建表功能,在提交之前确保hbase中存在相应的表,若不存在需要手动建表,hbase shell建表语句如下create 'WordCount', 'cf'。 安全模式下hbase需要用户有相应表甚至列族和列的访问权限,因此首先需要在hbase所在集群上使用hbase管理员用户登录,之后在hbase shell中使用grant命令给提交用户申请相应表的权限,如示例中的WordCount,成功之后再使用提交用户登录并提交拓扑。 拓扑提交成功后请自行登录HBase集群查看。 如果使用票据登录,则需要使用命令行定期上传票据,具体周期由票据刷新截止时间而定,步骤如下: 在安装好的storm客户端目录的“Storm/storm-1.2.1/conf/storm.yaml”文件尾部新起一行添加如下内容: topology.auto-credentials: - org.apache.storm.security.auth.kerberos.AutoTGT 执行命令./storm upload-credentials hbase-test。
  • 参数解释 FS Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name FS活动的名称 delete 删除指定的文件和目录的标签 move 将文件从源目录移动到目标目录的标签 chmod 修改文件或目录权限的标签 path 当前文件路径 source 源文件路径 target 目标文件路径 permissions 权限字符串 “${变量名}”表示:该值来自job.properties所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见配置Oozie作业运行参数)
  • 场景说明 在安全集群环境下,各个组件需要在通信之前进行相互认证,以确保通信的安全性。 用户在开发Oozie应用程序时,某些场景下需要Oozie与Hadoop、Hive等之间进行通信。那么Oozie应用程序中需要写入安全认证代码,确保Oozie程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交Oozie应用程序运行前,在Oozie客户端执行如下命令获得认证。 kinit 组件业务用户 代码认证(Kerberos安全认证): 通过获取客户端的principal和keytab文件在应用程序中进行认证,用于Kerberos安全认证的keytab文件和principal文件您可以联系管理员创建并获取,具体使用方法在样例代码中会有详细说明。 目前样例代码统一调用LoginUtil类进行安全认证,支持Oracle JAVA平台和IBM JAVA平台。 代码示例中请根据实际情况,修改“USERNAME”为实际用户名,例如“developuser”。 private static void login(String keytabFilePath, String krb5FilePath, String user) throws IOException { Configuration conf = new Configuration(); conf.set(KERBEROS_PRINCIPAL, user); conf.set(KEYTAB_FILE, keytabFilePath); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); conf.set(HADOOP_SECURITY_AUTHORIZATION, "true"); /* * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, user, keytabFilePath); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(user, keytabFilePath, krb5FilePath, conf); }
  • 操作场景 本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。 本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。 本章节只适用于 MRS 产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 应用开发操作步骤 确认Storm和Kafka组件已经安装,并正常运行。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于登录 FusionInsight Manager平台并通过认证,并且获取到该用户的keytab文件。 获取的用户需要同时属于storm组和kafka组。 默认情况下,用户的密码有效期是90天,所以获取的keytab文件的有效期是90天。如果需要延长该用户keytab的有效期,修改用户的密码策略并重新获取keytab。 下载并安装Kafka客户端程序。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表4 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 从TCP源主机:端口创建一个输入流。 start():Unit 启动Spark Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Spark Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表5 Spark Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
共100000条