云服务器内容精选

  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略一来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locator所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个......)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 - HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 如果需要对colocation上传的文件做balance操作,为避免colocation失效,可以通过MRS Manager界面中的oi.dfs.colocation.file.pattern参数进行设置,设置该参数值为对应数据文件块的路径,多个路径之间以逗号分开。例如/test1,/test2。
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要将HDFS用户绑定supergroup用户组。 初始化 使用Colocation前需要进行kerberos安全认证。 private static void init() throws IOException { LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 public static void main(String[] args) throws IOException { init(); dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); createGroup(); put(); delete(); deleteGroup(); dfs.close(); dfsAdmin.close(); } 创建group 样例:创建一个gid01组,组中包含3个locator。 private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path("/testfile.txt"), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入到HDFS的数据. byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 public static void delete() throws IOException { dfs.delete(new Path("/testfile.txt")); } 删除group 样例:删除gid01。 private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • Client Alluxio Client主要包括三种方式:Java API、Shell、HTTP REST API。 Java API 提供Alluxio文件系统的应用接口,本开发指南主要介绍如何使用Java API进行Alluxio客户端的开发。 Shell 提供shell命令完成Alluxio文件系统的基本操作。 HTTP REST API 提供除Shell、Java API以外的其他接口,可通过此接口查询信息,具体请参考Alluxio API接口介绍。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的addIndicesExample方法中: addIndices(): 将索引添加到没有数据的表中 public void addIndicesExample() { LOG.info("Entering Adding a Hindex."); // Create index instance TableIndices tableIndices = new TableIndices(); HIndexSpecification spec = new HIndexSpecification(indexNameToAdd); spec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.STRING); tableIndices.addIndex(spec); Admin admin = null; HIndexAdmin iAdmin = null; try { admin = conn.getAdmin(); iAdmin = HIndexClient.newHIndexAdmin(admin); // add index to the table iAdmin.addIndices(tableName, tableIndices); // Alternately, add the specified indices with data // iAdmin.addIndicesWithData(tableName, tableIndices); LOG.info("Successfully added indices to the table " + tableName); } catch (IOException e) { LOG.error("Add Indices failed for table " + tableName + "." + e); } finally { if (iAdmin != null) { try { // Close the HIndexAdmin object. iAdmin.close(); } catch (IOException e) { LOG.error("Failed to close HIndexAdmin ", e); } } if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting Adding a Hindex."); } 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的addIndicesExampleWithData方法中: addIndicesWithData():将索引添加到具有大量预先存在数据的表中 public void addIndicesExampleWithData() { LOG.info("Entering Adding a Hindex With Data."); // Create index instance TableIndices tableIndices = new TableIndices(); HIndexSpecification spec = new HIndexSpecification(indexNameToAdd); spec.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.STRING); tableIndices.addIndex(spec); Admin admin = null; HIndexAdmin iAdmin = null; try { admin = conn.getAdmin(); iAdmin = HIndexClient.newHIndexAdmin(admin); // add index to the table iAdmin.addIndicesWithData(tableName, tableIndices); // Alternately, add the specified indices with data // iAdmin.addIndicesWithData(tableName, tableIndices); LOG.info("Successfully added indices to the table " + tableName); } catch (IOException e) { LOG.error("Add Indices failed for table " + tableName + "." + e); } finally { if (iAdmin != null) { try { // Close the HIndexAdmin object. iAdmin.close(); } catch (IOException e) { LOG.error("Failed to close HIndexAdmin ", e); } } if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting Adding a Hindex With Data."); }
  • Kafka应用开发常用概念 Topic Kafka维护的同一类的消息称为一个Topic。 Partition 每一个Topic可以被分为多个Partition,每个Partition对应一个可持续追加的、有序不可变的log文件。 Producer 将消息发往Kafka topic中的角色称为Producer。 Consumer 从Kafka topic中获取消息的角色称为Consumer。 Broker Kafka集群中的每一个节点服务器称为Broker。 父主题: Kafka应用开发概述
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Impala表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Impala数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Impala数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见分析Impala数据。
  • 准备认证机制代码 在开启Kerberos认证的环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。Kafka应用开发需要进行Kafka、ZooKeeper、Kerberos的安全认证,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。提供了LoginUtil相关接口来完成这些配置,如下样例代码中只需要配置用户自己申请的账号名称和对应的keytab文件名称即可,由于人机账号的keytab会随用户密码过期而失效,故建议使用机机账号进行配置。 认证样例代码: 设置keytab认证文件模块 /** * 用户自己申请的账号keytab文件名称 */ private static final String USER_KEYTAB_FILE = "用户自己申请的账号keytab文件名称"; /** * 用户自己申请的账号名称 */ private static final String USER_PRINCIPAL = "用户自己申请的账号名称"; MRS服务Kerberos认证模块,如果服务没有开启kerberos认证,这块逻辑不执行 public static void securityPrepare() throws IOException { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; String krbFile = filePath + "krb5.conf"; String userKeyTableFile = filePath + USER_KEYTAB_FILE; //windows路径下分隔符替换 userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\"); LoginUtil.setKrb5Config(krbFile); LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); } 如果修改了集群kerberos域名,需要在代码中增加kerberos.domain.name的配置,并按照hadoop.expr=toLowerCase(%{default_realm}%{KerberosServer})规则配置正确的域名信息。例如:修改域名为HUAWEI.COM,则配置为hadoop.huawei.com。
  • 常用CLI Flink常用的CLI如下所示: yarn-session.sh 可以使用yarn-session.sh启动一个常驻的Flink集群,接受来自客户端提交的任务。启动一个有3个TaskManager实例的Flink集群示例如下: bin/yarn-session.sh -n 3 yarn-session.sh的其他参数可以通过以下命令获取: bin/yarn-session.sh -help Flink 使用flink命令可以提交Flink作业,作业既可以被提交到一个常驻的Flink集群上,也可以使用单机模式运行。 提交到常驻Flink集群上的一个示例如下: bin/flink run examples/streaming/WindowJoin.jar 用户在用该命令提交任务前需要先用yarn-session启动Flink集群。 以单机模式运行作业的一个示例如下: bin/flink run -m yarn-cluster -yn 2 examples/streaming/WindowJoin.jar 通过参数-m yarn-cluster使作业以单机模式运行,-yn表示TaskManager的数量。 flink脚本的其他参数可以通过以下命令获取: bin/flink --help
  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -n 3 -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar 如果yarn-session.sh不使用-z配置特定的zookeeper的namespace,则在使用flink run时不要使用-yz指定特定的zookeeper的namespace。 举例: bin/yarn-session.sh -n 3 bin/flink run examples/streaming/WindowJoin.jar 如果使用flink run -m yarn-cluster时启动集群则可以使用-yz指定一个zookeeper的namespace。 不能同时启动两个或两个以上的集群来共享一个namespace。 用户在启动集群或提交作业时如果使用了-z配置项,则在删除、停止及查询作业、触发savepoint时也要使用-z配置项指明namespace。
  • 问题 使用运行的Spark Streaming任务回写kafka时,kafka上接收不到回写的数据,且kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从kafka中读取数据,执行对应处理之后,然后将结果数据回写至kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将kafka的阈值调大,建议在MRS Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 功能分解 根据上述场景进行功能分解,如表1所示。 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考创建Storm Spout,创建Storm Bolt,创建Storm Topology,完整代码请参考Storm-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • Presto JDBC使用样例 下面的代码片段在PrestoJDBCExample类中,用于实现JDBC连接Presto TPCDS Catalog。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private static Connection connection; private static Statement statement; /** * Only when Kerberos authentication enabled, configurations in presto-examples/conf/presto.properties * should be set. More details please refer to https://prestodb.io/docs/0.215/installation/jdbc.html. */ private static void initConnection(String url, boolean krbsEnabled) throws SQLException { if (krbsEnabled) { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; File proFile = new File(filePath + "presto.properties");if (proFile.exists()) { Properties props = new Properties(); try { props.load(new FileInputStream(proFile)); } catch (IOException e) { e.printStackTrace(); } connection = DriverManager.getConnection(url, props); } } else { connection = DriverManager.getConnection(url, "presto", null); } statement = connection.createStatement(); } private static void releaseConnection() throws SQLException { statement.close(); connection.close(); } public static void main(String[] args) throws SQLException { try { /** * Replace example_ip with your cluster presto server ip. * By default, Kerberos authentication disabled cluster presto service port is 7520, Kerberos * authentication enabled cluster presto service port is 7521 * The postfix /tpcds/sf1 means to use tpcds catalog and sf1 schema, you can use hive catalog as well * If Kerberos authentication enabled, set the second param to true. * see PrestoJDBCExample#initConnection(java.lang.String, boolean). */ initConnection("jdbc:presto://example_ip:7520/tpcds/sf1", false); //initConnection("jdbc:presto://example_ip:7521/tpcds/sf1", true); ResultSet resultSet = statement.executeQuery("select * from call_center"); while (resultSet.next()) { System.out.println(resultSet.getString("cc_name") + " : " + resultSet.getString("cc_employees")); } } catch (SQLException e) { e.printStackTrace(); } finally { releaseConnection(); } }
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';