华为云用户手册

  • 开发思路 根据前述场景说明进行功能分解,以上传一个新员工的信息为例,对该员工的信息进行查询、追加、删除等,可分为以下七部分: 通过kerberos认证。 调用fileSystem中的mkdir接口创建目录。 调用HdfsWriter的dowrite接口写入信息。 调用fileSystem中的open接口读取文件。 调用HdfsWriter的doAppend接口追加信息。 调用fileSystem中的deleteOnExit接口删除文件。 调用fileSystem中的delete接口删除文件夹。
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 本地新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS上建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下。 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir /tmp/input 在Linux系统HDFS客户端使用命令hdfs dfs -put local_filepath /tmp/input
  • 场景说明 通过典型场景,可以快速学习和掌握HDFS的开发过程,并对关键的接口函数有所了解。 HDFS的业务操作对象是文件,代码样例中所涉及的文件操作主要包括创建文件夹、写文件、追加文件内容、读文件和删除文件/文件夹;HDFS还有其他的业务处理,例如设置文件权限等,其他操作可以在掌握本代码样例之后,再扩展学习。 本代码样例讲解顺序为: HDFS初始化 初始化HDFS 写文件 写HDFS文件 追加文件内容 追加HDFS文件内容 读文件 读HDFS文件 删除文件 删除HDFS文件 Colocation HDFS Colocation 设置存储策略 设置HDFS存储策略 访问OBS HDFS访问OBS
  • 操作步骤 执行mvn package生成jar包,在工程目录target目录下获取,比如:hdfs-examples-1.0.jar。 将导出的Jar包拷贝上传至Linux客户端运行环境的任意目录下,例如“/opt/client”,然后在该目录下创建“conf”目录,将“user.keytab” 和 "krb5.conf"拷贝至“conf”目录。可参考6 。 配置环境变量。 source /opt/client/bigdata_env 执行如下命令,运行Jar包。 hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMain 运行命令时,需保持客户端“Yarn/config/hdfs-site.xml”中的kerberos相关信息和“HDFS/hadoop/etc/hadoop/hdfs-site.xml”中的kerberos相关信息一致。“hdfs-site.xml”中kerberos的配置mapred改为hdfs,需要修改的地方如图1所示。 图1 hdfs-site.xml
  • 操作步骤 查看运行结果获取应用运行情况 HdfsMain Linux样例程序安全集群运行结果如下所示: [root@node-master1dekG client]# hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMainWARNING: Use "yarn jar" to launch YARN applications.20/03/25 16:29:45 INFO security.UserGroupInformation: Login successful for user hdfsuser using keytab file user.keytab20/03/25 16:29:45 INFO security.LoginUtil: Login success!!!!!!!!!!!!!!success to create path /user/hdfs-examplessuccess to write.result is : hi, I am bigdata. It is successful if you can see me.success to read.success to delete the file /user/hdfs-examples/test.txtsuccess to delete path /user/hdfs-examplessuccess to create path /user/hdfs-examplesStoragePolicy:FROZENStoragePolicy:COLDStoragePolicy:WA RMS toragePolicy:HOTStoragePolicy:ONE_SSDStoragePolicy:ALL_SSDStoragePolicy:LAZY_PERSISTsucceed to set Storage Policy path /user/hdfs-examplessuccess to delete path /user/hdfs-examples HdfsMain Linux样例程序普通集群运行结果如下所示: [root@node-master2VknR client]# hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMainWARNING: Use "yarn jar" to launch YARN applications.success to create path /user/hdfs-examplessuccess to write.result is : hi, I am bigdata. It is successful if you can see me.success to read.success to delete the file /user/hdfs-examples/test.txtsuccess to delete path /user/hdfs-examplessuccess to create path /user/hdfs-examplesStoragePolicy:FROZENStoragePolicy:COLDStoragePolicy:WARMStoragePolicy:HOTStoragePolicy:ONE_SSDStoragePolicy:ALL_SSDStoragePolicy:LAZY_PERSISTsucceed to set Storage Policy path /user/hdfs-examplessuccess to delete path /user/hdfs-examples 查看HDFS日志获取应用运行情况 您可以查看HDFS的namenode日志了解应用运行情况,并根据日志信息调整应用程序。
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“Scala Module”,然后单击“Next”。如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本,并勾选“Config later”(待工程创建完毕后引入scala的编译库文件),然后单击“Finish”完成工程创建。 图3 填写工程信息
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略一来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locator所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个......)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 - HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 如果需要对colocation上传的文件做balance操作,为避免colocation失效,可以通过 MRS Manager界面中的oi.dfs.colocation.file.pattern参数进行设置,设置该参数值为对应数据文件块的路径,多个路径之间以逗号分开。例如/test1,/test2。
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要将HDFS用户绑定supergroup用户组。 初始化 使用Colocation前需要进行kerberos安全认证。 private static void init() throws IOException { LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 public static void main(String[] args) throws IOException { init(); dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); createGroup(); put(); delete(); deleteGroup(); dfs.close(); dfsAdmin.close(); } 创建group 样例:创建一个gid01组,组中包含3个locator。 private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path("/testfile.txt"), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入到HDFS的数据. byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 public static void delete() throws IOException { dfs.delete(new Path("/testfile.txt")); } 删除group 样例:删除gid01。 private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • OpenTSDB应用开发环境简介 在进行二次开发时,要准备的开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行正常。 表1 开发环境 准备 说明 操作系统 Windows系统,推荐Windows 7及以上版本。 安装JDK 开发环境的基本配置。版本要求:1.8及以上。 安装和配置Eclipse 用于开发OpenTSDB应用程序的工具。 网络 确保客户端与OpenTSDB服务主机在网络上互通。 父主题: 准备OpenTSDB应用开发环境
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092。 需要修改程序SecurityKafkaWordCount类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下: KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab = "./user.keytab"principal="leoB@HADOOP.COM"useTicketCache=falsestoreKey=truedebug=true;}; 在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群 域名
  • 安全认证代码(Scala版) 目前样例代码统一调用LoginUtil类进行安全认证。 在Spark样例工程代码中,不同的样例工程,使用的认证代码不同,基本安全认证或带ZooKeeper认证。样例工程中使用的示例认证参数如表3所示,请根据实际情况修改对应参数值。 表3 参数描述 参数 示例参数值 描述 userPrincipal sparkuser 用户用于认证的账号Principal,您可以联系管理员获取此账号。 userKeytabPath /opt/FIclient/user.keytab 用户用于认证的Keytab文件,您可以联系管理员获取文件。 krb5ConfPath /opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf krb5.conf文件路径和文件名称。 ZKServerPrincipal zookeeper/hadoop.hadoop.com ZooKeeper服务端principal。请联系管理员获取对应账号。 基本安全认证: Spark Core和Spark SQL程序不需要访问HBase或ZooKeeper,所以使用基本的安全认证代码即可。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: val userPrincipal = "sparkuser"val userKeytabPath = "/opt/FIclient/user.keytab"val krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"val hadoopConf: Configuration = new Configuration()LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); 带ZooKeeper认证: 由于“Spark Streaming”、“通过JDBC访问Spark SQL”和“Spark on HBase”样例程序,不仅需要基础安全认证,还需要添加ZooKeeper服务端Principal才能完成安全认证。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: val userPrincipal = "sparkuser"val userKeytabPath = "/opt/FIclient/user.keytab"val krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"val ZKServerPrincipal = "zookeeper/hadoop.hadoop.com"val ZOOKEEPER_DEFAULT_ LOG IN_CONTEXT_NAME: String = "Client"val ZOOKEEPER_SERVER_PRINCIPAL_KEY: String = "zookeeper.server.principal"val hadoopConf: Configuration = new Configuration();LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeytabPath)LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal)LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
  • 操作步骤 安全模式下,请先进行安全认证,参见准备Linux客户端环境。 提交拓扑。以wordcount为例,其它拓扑请参照相关开发指引。进入Storm客户端目录“storm-0.10.0/bin”,执行命令:storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.wordcount.WordCountTopology。 执行storm list命令,查看已经提交的应用程序,如果发现名称为word-count的应用程序,则说明任务提交成功。 如果业务设置为本地模式,且使用命令行方式提交时,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
  • 安全认证代码(Java版) 目前样例代码统一调用LoginUtil类进行安全认证。 在Spark样例工程代码中,不同的样例工程,使用的认证代码不同,基本安全认证或带ZooKeeper认证。样例工程中使用的示例认证参数如表2所示,请根据实际情况修改对应参数值。 表2 参数描述 参数 示例参数值 描述 userPrincipal sparkuser 用户用于认证的账号Principal,您可以联系管理员获取此账号。 userKeytabPath /opt/FIclient/user.keytab 用户用于认证的Keytab文件,您可以联系管理员获取文件。 krb5ConfPath /opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf krb5.conf文件路径和文件名称。 ZKServerPrincipal zookeeper/hadoop.hadoop.com ZooKeeper服务端principal。请联系管理员获取对应账号。 基本安全认证: Spark Core和Spark SQL程序不需要访问HBase或ZooKeeper,所以使用基本的安全认证代码即可。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: String userPrincipal = "sparkuser";String userKeytabPath = "/opt/FIclient/user.keytab";String krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf";Configuration hadoopConf = new Configuration();LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); 带ZooKeeper认证: 由于“Spark Streaming”、“通过JDBC访问Spark SQL”和“Spark on HBase”样例程序,不仅需要基础安全认证,还需要添加ZooKeeper服务端Principal才能完成安全认证。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: String userPrincipal = "sparkuser";String userKeytabPath = "/opt/FIclient/user.keytab";String krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf";String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";Configuration hadoopConf = new Configuration();LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeytabPath);LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
  • 运行结果观察方式 样例程序工程jar包运行结果可以在logs目录下的client.log观察,默认状态下的log4j.properties没有将运行状态输出,若需要观察程序运行的信息,需将log4j.properties按如下方式配置: # Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.kafka.logs.dir=logslog4j.rootLogger=INFO, stdout, kafkaAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%nlog4j.logger.kafka=INFO, kafkaAppenderlog4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HHlog4j.appender.kafkaAppender.File=${kafka.logs.dir}/client.loglog4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayoutlog4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n# Turn on all our debugging info#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender#log4j.logger.kafka.perf=DEBUG, kafkaAppender#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG 将kafkaAppender添加到rootLogger,并将日志级别调整到需要观察的级别。
  • 示例:Maven工程打包到Linux下运行样例 执行mvn package生成jar包,在工程目录target目录下获取,比如:kafka-examples-1.6.0.jar。 执行mvn dependency:copy-dependencies -DoutputDirectory=kafka-examples-lib -DincludeScope=compile,导出kafka样例工程依赖的jar包,比如放到kafka-examples-lib目录。 在第一步指定的目录下生成一个Jar包和一个存放lib的文件夹。 将刚才生成的依赖库文件夹(此处为“kafka-examples-lib”)拷贝到MRS服务的某个Linux环境上任意目录下,例如:“/opt/example”,然后将刚才生成的jar包拷贝到“/opt/example/kafka-examples-lib”目录下。 将样例工程的conf目录拷贝到与依赖库文件夹同级目录下,即“/opt/example”目录下,并创建logs目录,用于记录jar包运行日志。 切换到root用户,将拷贝进去的conf,kafka-examples-lib,logs目录修改为omm:wheel用户组所有,执行以下命令切换用户。 sudo su - root chown -R omm:wheel /opt/example/* 切换为omm用户,进入拷贝目录下“/opt/example”,首先确保conf目录下和依赖库文件目录下的所有文件,对当前用户均具有可读权限;同时保证已安装jdk并已设置java相关环境变量,然后执行命令,如java -cp .:/opt/example/conf:/opt/example/kafka-examples-lib/* com.huawei.bigdata.kafka.example.Producer,运行样例工程。 su - omm chmod 750 /opt/example cd /opt/example java -cp .:/opt/example/conf:/opt/example/kafka-examples-lib/* com.huawei.bigdata.kafka.example.Producer
  • Presto JDBC使用样例 以下示例为Presto JDBC使用样例。 以下代码片段用于实现JDBC连接Presto TPCDS Catalog。 详情请参考PrestoJDBCExample类。 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930313233343536373839404142434445464748495051 private static Connection connection;private static Statement statement;/** * Only when Kerberos authentication enabled, configurations in presto-examples/conf/presto.properties * should be set. More details please refer to https://prestodb.io/docs/0.215/installation/jdbc.html.*/private static void initConnection(String url, boolean krbsEnabled) throws SQLException { if (krbsEnabled) { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; File proFile = new File(filePath + "presto.properties");if (proFile.exists()) { Properties props = new Properties(); try { props.load(new FileInputStream(proFile)); } catch (IOException e) { e.printStackTrace(); } connection = DriverManager.getConnection(url, props); } } else { connection = DriverManager.getConnection(url, "presto", null); } statement = connection.createStatement();}private static void releaseConnection() throws SQLException { statement.close(); connection.close();}public static void main(String[] args) throws SQLException { try { /** * Replace example_ip with your cluster presto server ip. * By default, Kerberos authentication disabled cluster presto service port is 7520, Kerberos * authentication enabled cluster presto service port is 7521 * The postfix /tpcds/sf1 means to use tpcds catalog and sf1 schema, you can use hive catalog as well * If Kerberos authentication enabled, set the second param to true. * see PrestoJDBCExample#initConnection(java.lang.String, boolean). */ initConnection("jdbc:presto://example_ip:7520/tpcds/sf1", false); //initConnection("jdbc:presto://example_ip:7521/tpcds/sf1", true); ResultSet resultSet = statement.executeQuery("select * from call_center"); while (resultSet.next()) { System.out.println(resultSet.getString("cc_name") + " : " + resultSet.getString("cc_employees")); } } catch (SQLException e) { e.printStackTrace(); } finally { releaseConnection(); }}
  • 代码样例 如下是代码片段,详细代码请参考ExampleClient类。 /*** load configurations from alluxio-site.properties* @throws IOException*/private void loadConf() throws IOException {InputStream fileInputStream = null;alluxioConf = new Properties();File propertiesFile = new File(PATH_TO_ALLUXIO_SITE_PROPERTIES);try {fileInputStream = new FileInputStream(propertiesFile);alluxioConf.load(fileInputStream);}catch (FileNotFoundException e) {System.out.println(PATH_TO_ALLUXIO_SITE_PROPERTIES + "does not exist. Exception: " + e);}catch (IOException e) {System.out.println("Failed to load configuration file. Exception: " + e);}finally{close(fileInputStream);}}/*** build Alluxio instance*/private void instanceBuild() throws IOException {// get filesystemInstancedConfiguration conf = new InstancedConfiguration(ConfigurationUtils.defaults());conf.set(PropertyKey.MASTER_RPC_ADDRESSES, alluxioConf.get("alluxio.master.rpc.addresses"));FileSystemContext fsContext = FileSystemContext.create(conf);fSystem = FileSystem.Factory.create(fsContext);}
  • 操作步骤 登录linux环境,创建运行OpenTSDB样例的工作目录,比如“/opt/opentsdb-example”,配置文件存放目录,比如“/opt/opentsdb-example/conf”,并编辑配置文件“/opt/opentsdb-example/conf/opentsdb.properties”使其对应于实际环境中的信息。 mkdir -p /opt/opentsdb-example/conf[root@node-master1rLqO ~]# cat /opt/opentsdb-example/conf/opentsdb.propertiestsd_hostname = node-ana-corejnWttsd_port = 4242tsd_protocol = https 执行mvn package生成jar包,在工程目录target目录下获取,比如:opentsdb-examples-mrs-xxx-jar-with-dependencies.jar,其中mrs-xxx表示MRS的版本号,将获取的包上传到“/opt/opentsdb-example”目录下。 执行Jar包。 加载环境变量。 source /opt/client/bigdata_env 认证集群用户(未启用kerberos的集群可跳过此步骤)。 人机用户:kinit kerberos用户 机机用户: kinit -kt 认证文件路径 kerberos用户 运行opentsdb样例程序。 java -cp /opt/opentsdb-example/conf:/opt/opentsdb-example/opentsdb-examples-mrs-xxx-jar-with-dependencies.jar com.huawei.bigdata.opentsdb.examples.OpentsdbExample
  • 操作步骤 查看Flink应用运行结果数据。 当用户查看执行结果时,需要在Flink的web页面上查看Task Manager的Stdout日志。 当执行结果输出到文件或者其他由Flink应用程序指定途径,您可以通过指定文件或其他途径获取到运行结果数据。以下用Checkpoint、Pipeline和配置表与流JOIN为例: 查看Checkpoint结果和文件 结果在flink的“taskmanager.out”文件中,用户可以通过Flink的WebUI查看“task manager”标签下的out按钮查看。 有两种方式查看Checkpoint文件。 若将checkpoint的快照信息保存到HDFS,则通过执行hdfs dfs -ls hdfs://hacluster/flink-checkpoint/命令查看。 若将checkpoint的快照信息保存到本地文件,则可直接登录到各个节点查看。 查看Stream SQL Join结果 结果在flink的“taskmanager.out”文件中,用户可以通过Flink的WebUI查看“task manager”标签下的out按钮查看。 使用Flink Web页面查看Flink应用程序运行情况 Flink Web页面主要包括了Overview、Running Jobs、Completed Jobs、Task Managers、Job Manager和Logout等部分。 在YARN的Web UI界面,查找到对应的Flink应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入Flink Web页面。 查看程序执行的打印结果:找到对应的Task Manager,查看对应的Stdout标签日志信息。 查看Flink日志获取应用运行情况 有两种方式获取Flink日志,分别为通过Flink Web页面或者Yarn的日志 Flink Web页面可以查看Task Managers、Job Manager部分的日志。 Yarn页面主要包括了Job Manager日志以及GC日志等。 页面入口:在YARN的Web UI界面,查找到对应的Flink应用程序。单击应用信息的第一列ID,然后选择Logs列单击进去即可打开。
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“Scala Module”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本和Scala SDK,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Impala表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Impala数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Impala数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见分析Impala数据。
  • 常用CLI Flink常用的CLI如下所示: yarn-session.sh 可以使用yarn-session.sh启动一个常驻的Flink集群,接受来自客户端提交的任务。启动一个有3个TaskManager实例的Flink集群示例如下: bin/yarn-session.sh -n 3 yarn-session.sh的其他参数可以通过以下命令获取: bin/yarn-session.sh -help Flink 使用flink命令可以提交Flink作业,作业既可以被提交到一个常驻的Flink集群上,也可以使用单机模式运行。 提交到常驻Flink集群上的一个示例如下: bin/flink run examples/streaming/WindowJoin.jar 用户在用该命令提交任务前需要先用yarn-session启动Flink集群。 以单机模式运行作业的一个示例如下: bin/flink run -m yarn-cluster -yn 2 examples/streaming/WindowJoin.jar 通过参数-m yarn-cluster使作业以单机模式运行,-yn表示TaskManager的数量。 flink脚本的其他参数可以通过以下命令获取: bin/flink --help
  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -n 3 -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar 如果yarn-session.sh不使用-z配置特定的zookeeper的namespace,则在使用flink run时不要使用-yz指定特定的zookeeper的namespace。 举例: bin/yarn-session.sh -n 3bin/flink run examples/streaming/WindowJoin.jar 如果使用flink run -m yarn-cluster时启动集群则可以使用-yz指定一个zookeeper的namespace。 不能同时启动两个或两个以上的集群来共享一个namespace。 用户在启动集群或提交作业时如果使用了-z配置项,则在删除、停止及查询作业、触发savepoint时也要使用-z配置项指明namespace。
  • 问题 在omm用户(非root用户)下,通过spark-submit提交yarn-client模式的任务,会出现FileNotFoundException异常,任务还能继续执行,但无法查看Driver端日志。例如:执行命令 spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client /opt/client/Spark/spark/examples/jars/spark-examples_2.11-2.2.1-mrs-1.7.0.jar ,结果如下图所示。
  • 回答 原因分析: 在yarn-client模式下执行任务时,Spark的Driver程序在本地执行;其中通过-Dlog4j.configuration=./log4j-executor.properties配置了Driver的日志文件,log4j-executor.properties配置文件设置日志输出到${spark.yarn.app.container.log.dir}/stdout文件中,由于Spark Driver在本地执行时${spark.yarn.app.container.log.dir}没有设置即为空,则输出目录为/stdout,而非root用户下,在根目录下面没有创建和修改stdout的权限,就会报FileNotFoundException异常。而在yarn-cluster模式下执行任务时,Spark的Driver程序在Application Master下执行,而在Application Master启动时就会通过-D${spark.yarn.app.container.log.dir}设置了输出目录,因此在yarn-cluster模式执行任务不会报FileNotFoundException异常。 处理方法: 注:下面所说的$SPAKR_HOME默认是/opt/client/Spark/spark 解决方案1:手动切换日志配置文件。修改文件$SPARK_HOME/conf/spark-defaults.conf中spark.driver.extraJavaOptions的配置项-Dlog4j.configuration=./log4j-executor.properties(默认情况下为./log4j-executor.properties),在yarn-client模式下,修改为-Dlog4j.configuration=./log4j.properties,在yarn-cluster模式下修改为-Dlog4j.configuration=./log4j-executor.properties。 解决方案2:修改启动脚本$SPARK_HOME/bin/spark-class。在spark-class脚本#!/usr/bin/env bash下面添加。 # Judge mode: client and cluster; Default: clientargv=`echo $@ | tr [A-Z] [a-z]`if [[ "$argv" =~ "--master" ]];then mode=`echo $argv | sed -e 's/.*--master //'` master=`echo $mode | awk '{print $1}'` case $master in "yarn") deploy=`echo $mode | awk '{print $3}'` if [[ "$mode" =~ "--deploy-mode" ]];then deploy=$deploy else deploy="client" fi ;; "yarn-client"|"local") deploy="client" ;; "yarn-cluster") deploy="cluster" ;; esacelse deploy="client"fi# modify the spark-defaults.confnumber=`sed -n -e '/spark.driver.extraJavaOptions/=' $SPARK_HOME/conf/spark-defaults.conf`if [ "$deploy"x = "client"x ];then `sed -i "${number}s/-Dlog4j.configuration=.*properties /-Dlog4j.configuration=.\/log4j.properties /g" $SPARK_HOME/conf/spark-defaults.conf`else `sed -i "${number}s/-Dlog4j.configuration=.*properties /-Dlog4j.configuration=.\/log4j-executor.properties /g" $SPARK_HOME/conf/spark-defaults.conf`fi 这些脚本行的功能和解决方案1类似,通过判断yarn的模式来修改文件$SPARK_HOME/conf/spark-defaults.conf中spark.driver.extraJavaOptions的配置项-Dlog4j.configuration=./log4j-executor.properties。
  • Impala应用开发常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Impala的相关操作。本文中的Impala客户端特指Impala client的安装目录,里面包含通过Java API访问Impala的样例代码。 HiveQL语言 Hive Query Language,类SQL语句,与Hive类似。 Statestore Statestore管理Impala集群中所有的Impalad实例的健康状态,并将实例健康信息广播到所有实例上。当某一个Impalad实例发生故障,比如节点异常、网络异常等,Statestore将通知其他Impalad实例,后续的查询请求等将不会向该实例分发。 Catalog Catalog实例服务将每个Impalad实例上发生的元数据变动同步到集群内其他Impalad实例,从而避免在一个Impalad实例中更改元数据,其他各个实例需要执行REFRESH操作来更新。但是,在Hive中建表,修改表等,则需要执行REFRESH或者INVALIDATE METADATA操作。 父主题: Impala应用开发概述
  • JDBC客户端运行及结果查看 执行mvn clean compile assembly:single生成jar包,在工程目录target目录下获取,比如:presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar。 在运行调测环境上创建一个目录作为运行目录,如或“/opt/presto_examples”(Linux环境),并在该目录下创建子目录“conf”。 将1导出的presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar拷贝到“/opt/presto_examples”下。 开启Kerberos认证集群需要将4获取的user.keytab和krb5.conf拷贝到的/opt/presto_examples/conf下,并修改样例代码中conf目录下的presto.preperties。未开启Kerberos认证集群无须执行此步骤。 表1 presto.preperties参数说明 参数 说明 user 用于Kerberos认证的用户名,即准备Presto应用开发用户中创建的开发用户的用户名。 KerberosPrincipal 用于认证的名字,即认证准备Presto应用开发用户中创建的开发用户的用户名。 KerberosConfigPath krb5.conf的路径。 KerberosKeytabPath user.keytab的路径。 presto.preperties样例 user = prestouserSSL = trueKerberosRemoteServiceName = HTTPKerberosPrincipal = prestouserKerberosConfigPath = /opt/presto_examples/conf/krb5.confKerberosKeytabPath = /opt/presto_examples/conf/user.keytab 在Linux环境下执行运行样例程序。 chmod +x /opt/presto_examples -R cd /opt/presto_examplesjava -jar presto-examples-1.0-SNAPSHOT-jar-with-dependencies.jar 在命令行终端查看样例代码所查询出的结果。 Linux环境运行成功结果会有如下信息: NY Metro : 2Mid Atlantic : 6Mid Atlantic : 6North Midwest : 1North Midwest : 3North Midwest : 7
  • Flink应用开发常用概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过socket、Kafka和文件等形式导入,在Flink系统处理后,通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 Topology 一个Topology代表用户的一个执行任务。一个Topology由输入(如kafka soruce)、输出(如kafka sink)和多个Data Transformation组成。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。 父主题: Flink应用开发概述
  • HTTP REST API Master REST API:https://docs.alluxio.io/os/restdoc/2.0/master/index.html Worker REST API:https://docs.alluxio.io/os/restdoc/2.0/worker/index.html Proxy REST API:https://docs.alluxio.io/os/restdoc/2.0/proxy/index.html Job REST API:https://docs.alluxio.io/os/restdoc/2.0/job/index.html
  • Alluxio应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Alluxio应用程序开发流程 表1 Alluxio应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Alluxio的基本概念。 Alluxio常用概念 准备开发和运行环境 Alluxio的客户端程序当前推荐使用java语言进行开发,并使用Maven工具构建工程。样例程序运行环境即MRS服务所VPC集群的节点。 Alluxio开发环境简介 根据场景开发工程 提供了Java语言的样例工程和数据查询的样例工程。 Alluxio样例程序开发思路 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 调测Alluxio应用 父主题: Alluxio应用开发概述
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全