云服务器内容精选

  • 通过DataFrame API访问数据源 构造schema 1 2 3 4 5 6 7 8 9 10 val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType) val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType) val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) 根据schema的类型构造数据 1 2 val mutableRow: Seq[Any] = Seq("12345","abc","city1",false,null,3,23,2.3,2.34) val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1) 导入数据到HBase 1 sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") 读取HBase上的数据 1 2 3 4 5 6 7 8 val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1" map("RowKey") = "id:5,location:6,city:7" map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show() 返回结果:
  • 准备运行调测环境 在弹性云服务器管理控制台,申请一个新的弹性云服务器,用于应用开发、运行、调测。 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的网卡需要与MRS集群在同一个网段中。 申请弹性IP,绑定新申请的ECS的IP,并配置安全组出入规则。 下载客户端程序,请参考下载MRS客户端。 登录存放下载的客户端的节点,再安装客户端。 执行以下命令解压客户端包: cd /opt tar -xvf /opt/MRS_Services_Client.tar 执行以下命令校验安装文件包: sha256sum -c /opt/MRS_Services_ClientConfig.tar.sha256 MRS_Services_ClientConfig.tar:OK 执行以下命令解压安装文件包: tar -xvf /opt/MRS_Services_ClientConfig.tar 执行如下命令安装客户端到指定目录(绝对路径),例如“/opt/client”。目录会自动创建。 cd /opt/MRS_Services_ClientConfig sh install.sh /opt/client Components client installation is complete.
  • 场景说明 假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,如表1所示,A业务操作流程如下: 创建用户信息表。 在用户信息中新增用户的学历、职称等信息。 根据用户编号查询用户姓名和地址。 根据用户姓名进行查询。 查询年龄段在[20–29]之间的用户信息。 数据统计,统计用户信息表的人员数、年龄最大值、年龄最小值、平均年龄。 用户销户,删除用户信息表中该用户的数据。 A业务结束后,删除用户信息表。 表1 用户信息 编号 姓名 性别 年龄 地址 12005000201 Zhang San Male 19 Shenzhen City, Guangdong Province 12005000202 Li Wanting Female 23 Hangzhou City, Zhejiang Province 12005000203 Wang Ming Male 26 Ningbo City, Zhejiang Province 12005000204 Li Gang Male 18 Xiangyang City, Hubei Province 12005000205 Zhao Enru Female 21 Shangrao City, Jiangxi Province 12005000206 Chen Long Male 32 Zhuzhou City, Hunan Province 12005000207 Zhou Wei Female 29 Nanyang City, Henan Province 12005000208 Yang Yiwen Female 30 Wenzhou City, Zhejiang Province 12005000209 Xu Bing Male 26 Weinan City, Shaanxi Province 12005000210 Xiao Kai Male 25 Dalian City, Liaoning Province
  • 功能分解 根据上述的业务场景进行功能分解,需要开发的功能点如表2所示。 表2 在HBase中开发的功能 序号 步骤 代码实现 1 根据表1中的信息创建表。 请参见创建HBase表。 2 导入用户数据。 请参见插入HBase数据。 3 增加“教育信息”列族,在用户信息中新增用户的学历、职称等信息。 请参见修改HBase表。 4 根据用户编号查询用户姓名和地址。 请参见使用Get读取HBase数据。 5 根据用户姓名进行查询。 请参见使用HBase过滤器Filter。 6 用户销户,删除用户信息表中该用户的数据。 请参见删除HBase数据。 7 A业务结束后,删除用户信息表。 请参见删除HBase表。
  • 操作步骤 运行结果会有如下成功信息: ... 2020-01-09 10:43:49,338 INFO [main] examples.HBaseExample: Entering dropTable. 2020-01-09 10:43:49,341 INFO [main] client.HBaseAdmin: Started disable of hbase_sample_table 2020-01-09 10:43:50,080 INFO [main] client.HBaseAdmin: Operation: DISABLE, Table Name: default:hbase_sample_table, procId: 41 completed 2020-01-09 10:43:50,550 INFO [main] client.HBaseAdmin: Operation: DELETE, Table Name: default:hbase_sample_table, procId: 43 completed 2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Drop table successfully. 2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Exiting dropTable. 2020-01-09 10:43:50,550 INFO [main] client.ConnectionImplementation: Closing master protocol: MasterService 2020-01-09 10:43:50,556 INFO [main] examples.TestMain: -----------finish to test HBase API------------------- 在Windows环境运行样例代码时会出现下面的异常,但是不影响业务: java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 日志说明 日志级别默认为INFO,可以通过调整日志打印级别(DEBUG,INFO,WARN,ERROR,FATAL)来显示更详细的信息。可以通过修改log4j.properties文件来实现,如: hbase.root.logger=INFO,console log4j.logger.org.apache.zookeeper=INFO #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG log4j.logger.org.apache.hadoop.hbase=INFO # Make these two classes DEBUG-level. Make them DEBUG to see more zk debug. log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
  • 场景说明 假定某个业务Kafka每30秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。 现table1表有10条记录,表示有用户名分别为1-10的用户,用户的历史消费金额初始化都是0元。 基于某些业务要求,开发的Spark应用程序实现如下功能: 实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。
  • 数据规划 创建HBase表,并插入数据。 通过HBase创建名为table1的表,命令如下。 create 'table1', 'cf' 通过HBase执行如下命令,将数据插入table1表中。 put 'table1', '1', 'cf:cid', '0' put 'table1', '2', 'cf:cid', '0' put 'table1', '3', 'cf:cid', '0' put 'table1', '4', 'cf:cid', '0' put 'table1', '5', 'cf:cid', '0' put 'table1', '6', 'cf:cid', '0' put 'table1', '7', 'cf:cid', '0' put 'table1', '8', 'cf:cid', '0' put 'table1', '9', 'cf:cid', '0' put 'table1', '10', 'cf:cid', '0' Spark Streaming样例工程的数据存储在Kafka中。 确保集群安装完成,包括HDFS、Yarn、Spark。 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动样例代码的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.streaming.StreamingExampleProducer {BrokerList} {Topic} 如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true。 {zkQuorum}格式为zkIp:2181。 JAR_PATH为程序jar包所在路径。 brokerlist格式为brokerIp:9092。
  • HBase应用开发常用概念 过滤器 过滤器提供了非常强大的特性来帮助用户提高HBase处理表中数据的效率。用户不仅可以使用HBase中预定义好的过滤器,而且可以实现自定义的过滤器。 协处理器 允许用户执行region级的操作,并且可以使用与RDBMS中触发器类似的功能。 Client 客户端直接面向用户,可通过Java API或HBase Shell访问服务端,对HBase的表进行读写操作。本文中的HBase客户端特指从装有HBase服务的MRS Manager上下载的HBase client安装包,里面包含通过Java API访问HBase的样例代码。 父主题: HBase应用开发概述
  • 查看HBase应用调测结果 HBase应用程序运行完成后,可直接通过运行结果查看应用程序运行情况,也可以通过HBase日志获取应用运行情况。 运行结果会有如下成功信息: 2018-01-17 19:44:28,068 INFO [main] examples.HBaseExample: Entering dropTable. 2018-01-17 19:44:28,074 INFO [main] client.HBaseAdmin: Started disable of hbase_sample_table 2018-01-17 19:44:30,310 INFO [main] client.HBaseAdmin: Disabled hbase_sample_table 2018-01-17 19:44:31,727 INFO [main] client.HBaseAdmin: Deleted hbase_sample_table 2018-01-17 19:44:31,727 INFO [main] examples.HBaseExample: Drop table successfully. 2018-01-17 19:44:31,727 INFO [main] examples.HBaseExample: Exiting dropTable. 2018-01-17 19:44:31,727 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService 2018-01-17 19:44:31,733 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x13002d37b3933708 2018-01-17 19:44:31,736 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x13002d37b3933708 2018-01-17 19:44:31,737 INFO [main] zookeeper.ZooKeeper: Session: 0x13002d37b3933708 closed 2018-01-17 19:44:31,750 INFO [main] examples.TestMain: -----------finish HBase ------------------- 父主题: 在Linux中调测HBase应用
  • 调测HBase Python样例程序 仅MRS 1.9.x及之前版本支持HBase python样例代码调测。 HBase支持使用自带的ThriftServer2服务通过python来访问HBase服务。python样例仅支持在Linux环境中运行,调测HBase python样例程序需有与集群环境网络相通的ECS,详情请参见准备本地应用开发环境,并需要安装python环境,安装包下载详情请参见:https://www.python.org/。当前以在集群的master节点上运行样例为例。 搭建样例运行环境。 获取运行样例程序时python依赖,请从https://pypi.org/地址中搜索下载decorator、gssapi、kerberos、krbcontext、pure-sasl、thrift包(未开启Kerberos认证的普通集群仅需安装thrift包),并上传到master节点上,例如:新建目录“/opt/hbase-examples/python”,并上传到该目录下。 decorator-4.3.2.tar.gz gssapi-1.5.1.tar.gz kerberos-1.3.0.tar.gz krbcontext-0.8.tar.gz pure-sasl-0.6.1.tar.gz thrift-0.11.0.tar.gz 将样例工程中的“hbase-python-example”文件夹上传到集群Master节点的“/opt/hbase-examples”下,并上传从准备HBase应用开发用户中获取的认证文件至该目录下。 在“/opt/hbase-examples”新建hbasepython.properties文件,并修改配置内容如下。 clientHome=/opt/client exampleCodeDir=/opt/hbase-examples/hbase-python-example pythonLib=/opt/hbase-examples/python keyTabFile=/opt/hbase-examples/user.keytab userName=hbaseuser thriftIp=xxx.xxx.xx.xxx clientHome为集群客户端路径 exampleCodeDir为hbase-python-example文件路径 pythonLib为1中python依赖存放路径 keyTabFile为从准备HBase应用开发用户获取的用户认证凭据user.keytab userName为准备HBase应用开发用户中开发用户名 thriftIp为安装了thriftserver2的节点的IP地址 执行如下命令创建表名为example的HBase表。 source /opt/client/bigdata_env kinit 用户名 echo "create 'example','family1','family2'" | hbase shell 安装python环境并运行程序。 cd /opt/hbase-examples/hbase-python-example sh initEnvAndRunDemo.sh /opt/hbase-examples/hbasepython.properties 在运行程序之前需要格式化initEnvAndRunDemo.sh脚本。例如执行,dos2unix /opt/hbase-examples/hbasepython.properties 在执行脚本前,请确定“example”表包含列族'family1','family2'并已经存在于集群中。 再次运行程序时只需进入“/opt/hbase-examples/hbase-python-example”目录下,执行如下命令执行程序调测样例:python DemoClient.py HBase Python应用程序运行完成后,可直接通过运行结果查看应用程序运行情况。 图1 程序运行成功信息 父主题: 调测HBase应用
  • HBase简介 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase设计目标是用来解决关系型数据库在处理海量数据时的局限性。 HBase使用场景有如下几个特点: 处理海量数据(TB或PB级别以上)。 具有高吞吐量。 在海量数据中实现高效的随机读取。 具有很好的伸缩能力。 能够同时处理结构化和非结构化的数据。 不需要完全拥有传统关系型数据库所具备的ACID特性。ACID特性指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。 HBase中的表具有如下特点: 大:一个表可以有上亿行,上百万列。 面向列:面向列(族)的存储和权限控制,列(族)独立检索。 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
  • 接口类型简介 由于HBase本身是由java语言开发出来的,且java语言具有简洁通用易懂的特性,推荐用户使用java语言进行HBase应用程序开发。 HBase采用的接口与Apache HBase保持一致,请参见:http://hbase.apache.org/apidocs/index.html。 HBase通过接口调用,可提供的功能如表1所示。 表1 HBase接口提供的功能 功能 说明 CRUD数据读写功能 增查改删 高级特性 过滤器、协处理器 管理功能 表管理、集群管理
  • 使用场景 HBase文件存储模块(HBase FileStream,简称HFS)是HBase的独立模块,它作为对HBase与HDFS接口的封装,应用在MRS的上层应用,为上层应用提供文件的存储、读取、删除等功能。 在Hadoop生态系统中,无论是HDFS,还是HBase,在面对海量文件存储的时候,在某些场景下,都会存在一些很难解决的问题: 如果把海量小文件直接保存在HDFS中,会给NameNode带来极大的压力。 由于HBase接口以及内部机制的原因,一些较大的文件也不适合直接保存到HBase中。 HFS的出现,就是为了解决需要在Hadoop中存储海量小文件,同时也要存储一些大文件的混合场景。简单来说,就是在HBase表中,需要存放大量的小文件(10MB以下),同时又需要存放一些比较大的文件(10MB以上)。 HFS为以上场景提供了统一的操作接口,这些操作接口与HBase的函数接口类似。必须在HBase的配置参数“hbase.coprocessor.master.classes”中增加一个值:“org.apache.hadoop.hbase.filestream.coprocessor.FileStreamMasterObserver”。 如果只有小文件,确定不会有大文件的场景下,建议使用HBase的原始接口进行操作。 HFS接口需要同时对HBase和HDFS进行操作,所以客户端用户需要同时拥有这两个组件的操作权限。 直接存放在HDFS中的大文件,HFS在存储时会加入一些元数据信息,所以存储的文件不是直接等于原文件的。不能直接从HDFS中移动出来使用,而需要用HFS的接口进行读取。 使用HFS接口存储在HDFS中的数据,暂不支持备份与容灾。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的dropTable方法中 public void dropTable() { LOG.info("Entering dropTable."); Admin admin = null; try { admin = conn.getAdmin(); if (admin.tableExists(tableName)) { // Disable the table before deleting it. admin.disableTable(tableName);//注[1] // Delete table. admin.deleteTable(tableName); } LOG.info("Drop table successfully."); } catch (IOException e) { LOG.error("Drop table failed ", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed ", e); } } } LOG.info("Exiting dropTable."); }
  • 代码样例 hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件 hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件 hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限 hbase.log.file=hbase-client.log //日志文件名 hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效 hbase.log.maxbackupindex=20 //最多保存的日志文件数目 # Security audit appender hbase.security.log.file=hbase-client-audit.log //审计日志文件命令