华为云用户手册

  • 注意事项 当前二级索引不支持使用SubstringComparator类定义的对象作为Filter的比较器。 例如,如下示例中的用法当前不支持: Scan scan = new Scan();filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier),CompareOperator.EQUAL, new SubstringComparator(substring)));scan.setFilter(filterList);
  • 功能介绍 针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。 HIndex支持的Filter类型为“SingleColumnValueFilter”,“SingleColumnValueExcludeFilter”以及“SingleColumnValuePartitionFilter”。 HIndex支持的Comparator为“BinaryComparator”,“BitComparator”,“LongComparator”,“DecimalComparator”,“DoubleComparator”,“FloatComparator”,“IntComparator”,“NullComparator”。 二级索引的使用规则如下: 针对某一列或者多列创建了单索引的场景下: 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1) 针对多个列创建的联合索引场景下: 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1)) 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。
  • 回答 建议将"blob.storage.directory"配置选项设置成“/tmp”或者“/opt/huawei/Bigdata/tmp”。 当用户将"blob.storage.directory"配置选项设置成自定义目录时,需要手动赋予用户该目录的owner权限。以下以 FusionInsight 的admin用户为例。 修改Flink客户端配置文件conf/flink-conf.yaml,配置blob.storage.directory: /home/testdir/testdirdir/xxx。 创建目录/home/testdir(创建一层目录即可),设置该目录为admin用户所属。 图1 创建目录 /home/testdir/下的testdirdir/xxx目录在启动Flink集群时会在每个节点下自动创建。 进入客户端路径,执行命令./bin/yarn-session.sh -jm 2048 -tm 3072,可以看到yarn-session正常启动并且成功创建目录。 图2 执行命令
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testCreateTable方法中。 /** * Create Table */ public void testCreateTable() { LOG .info("Entering testCreateTable."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Create table String createTableSQL = "CREATE TABLE IF NOT EXISTS TEST (id integer not null primary key, name varchar, " + "account char(6), birth date)"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { // Execute Create SQL stat.executeUpdate(createTableSQL); LOG.info("Create table successfully."); } catch (Exception e) { LOG.error("Create table failed.", e); } LOG.info("Exiting testCreateTable."); } /** * Drop Table */ public void testDrop() { LOG.info("Entering testDrop."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Delete table String dropTableSQL = "DROP TABLE TEST"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { stat.executeUpdate(dropTableSQL); LOG.info("Drop successfully."); } catch (Exception e) { LOG.error("Drop failed.", e); } LOG.info("Exiting testDrop."); }
  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar
  • 准备本地应用开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的: X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发HBase应用程序的工具,版本要求:2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Junit插件 开发环境的基本配置。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-Zip 16.04版本。 父主题: 准备HBase应用开发环境
  • 注意事项 注[1]:创建联合索引 HBase支持在多个字段上创建二级索引,例如在列name和age上。 HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的dropTable方法中。 public void dropTable() { LOG.info("Entering dropTable."); Admin admin = null; try { admin = conn.getAdmin(); if (admin.tableExists(tableName)) { // Disable the table before deleting it. admin.disableTable(tableName); // Delete table. admin.deleteTable(tableName);//注[1] } LOG.info("Drop table successfully."); } catch (IOException e) { LOG.error("Drop table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting dropTable."); }
  • 问题 Flink任务配置State Backend为RocksDB时,运行报如下错误: Caused by: java.lang.UnsatisfiedLinkError: /srv/BigData/hadoop/data1/nm/usercache/***/appcache/application_****/rocksdb-lib-****/librocksdbjni-linux64.so: /lib64/libpthread.so.0: version `GLIBC_2.12` not found (required by /srv/BigData/hadoop/***/librocksdbjni-linux64.so)at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1965) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1890) at java.lang.Runtime.load0(Runtime.java:795) at java.lang.System.load(System.java:1062) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:734)... 11 more
  • 配置安全登录 请根据实际情况,在“com.huawei.bigdata.hbase.examples”包的“TestMain”类中修改“userName”为实际用户名,例如“developuser”。 private static void login() throws IOException { if (User.isHBaseSecurityEnabled(conf)) { userName = "developuser"; //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator; //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; /* * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName, userKeytabFile); LoginUtil.login(userName, userKeytabFile, krb5File, conf); }}
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。HBase应用开发需要进行ZooKeeper和Kerberos安全认证。用于ZooKeeper认证的文件为“jaas.conf”,用于Kerberos安全认证文件为keytab文件和krb5.conf文件。具体使用方法在样例代码的“README.md”中会有详细说明。 安全认证主要采用代码认证方式。支持Oracle JAVA平台和IBM JAVA平台。 以下代码在“com.huawei.bigdata.hbase.examples”包的“TestMain”类中。
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 提供设置eventtime属性的能力 表6 提供设置eventtime属性的能力的相关接口 API 说明 def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。 def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
  • 提供Join能力 表12 提供Join能力的相关接口 API 说明 def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] 通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] 通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testPut方法中。 /** * Put data */ public void testPut() { LOG.info("Entering testPut."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Insert String upsertSQL = "UPSERT INTO TEST VALUES(1,'John','100000', TO_DATE('1980-01-01','yyyy-MM-dd'))"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()){ // Execute Update SQL stat.executeUpdate(upsertSQL); conn.commit(); LOG.info("Put successfully."); } catch (Exception e) { LOG.error("Put failed.", e); } LOG.info("Exiting testPut."); }
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 问题 Flink内核升级到1.3.0之后,当Kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。 报错内容如下: org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
  • 代码样例 下面代码片段在com.huawei.bigdata.hbase.examples包的“TestMain”类的init方法中。 private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator; //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; conf.addResource(new Path(userdir + "core-site.xml"), false); conf.addResource(new Path(userdir + "hdfs-site.xml"), false); conf.addResource(new Path(userdir + "hbase-site.xml"), false); }
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式(强烈建议采用预分Region建表方式): 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 回答 首先查看ZooKeeper中/flink_base的目录权限是否为:'world,'anyone: cdrwa;如果不是,请修改/flink_base的目录权限为:'world,'anyone: cdrwa,然后继续根据步骤二排查;如果是,请根据步骤二排查。 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink_base/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink_base/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testCreateTable方法中。 public void testCreateTable() { LOG.info("Entering testCreateTable."); // Specify the table descriptor. TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(tableName);(1) // Set the column family name to info. ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"));(2) // Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SNAANPPY hcd.setCompressionType(Compression.Algorithm.SNAPPY);//注[1] htd.setColumnFamily(hcd.build()); (3) Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); (4) if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd.build());//注[2] (5) LOG.info(admin.getClusterMetrics().toString()); LOG.info(admin.listNamespaceDescriptors().toString()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin " ,e); } } } LOG.info("Exiting testCreateTable."); }
  • 注意事项 注[1] 可以设置列族的压缩方式,代码片段如下: //设置编码算法,HBase提供了DIFF,FAST_DIFF,PREFIX三种编码算法。 hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); //设置文件压缩方式,HBase默认提供了GZ和SNAPPY两种压缩算法 //其中GZ的压缩率高,但压缩和解压性能低,适用于冷数据 //SNAPPY压缩率低,但压缩解压性能高,适用于热数据 //建议默认开启SNAPPY压缩 hcd.setCompressionType(Compression.Algorithm.SNAPPY); 注[2] 可以通过指定起始和结束RowKey,或者通过RowKey数组预分Region两种方式建表,代码片段如下: // 创建一个预划分region的表 byte[][] splits = new byte[4][]; splits[0] = Bytes.toBytes("A"); splits[1] = Bytes.toBytes("H"); splits[2] = Bytes.toBytes("O"); splits[3] = Bytes.toBytes("U"); admin.createTable(htd, splits);
  • HBase简介 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase设计目标是解决关系型数据库在处理海量数据时的局限性。 HBase使用场景有如下几个特点: 处理海量数据(TB或PB级别以上)。 具有高吞吐量。 在海量数据中实现高效的随机读取。 具有很好的伸缩能力。 能够同时处理结构化和非结构化的数据。 不需要完全拥有传统关系型数据库所具备的ACID特性。ACID特性指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。 HBase中的表具有如下特点: 大:一个表可以有上亿行,上百万列。 面向列:面向列(族)的存储和权限控制,列(族)独立检索。 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
  • 代码样例 以下为代码片段: hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限hbase.log.file=hbase-client.log //日志文件名hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效hbase.log.maxbackupindex=20 //最多保存的日志文件数目# Security audit appenderhbase.security.log.file=hbase-client-audit.log //审计日志文件命令
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testScanData方法中。 public void testScanData() { LOG.info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the cache size. scan.setCaching(1000); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testScanData."); }
  • 回答 由于浏览器所在的计算机IP地址未加到Web访问白名单导致。用户可以通过修改客户端的配置文件“conf/flink-conf.yaml”来解决问题。 确认配置项“jobmanager.web.ssl.enabled”的值是否是“false”,若不是,请修改为“false”。 确认配置项“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”中是否已经添加浏览器所在的计算机IP地址。如果没有添加,可以通过这两项配置项进行添加。例如: jobmanager.web.access-control-allow-origin: 浏览器所在的计算机IP地址jobmanager.web.allow-access-address: 浏览器所在的计算机IP地址
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testDelete方法中。 public void testDelete() { LOG.info("Entering testDelete."); byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Instantiate an HTable object. table = conn.getTable(tableName); // Instantiate an Delete object. Delete delete = new Delete(rowKey); // Submit a delete request. table.delete(delete); LOG.info("Delete table successfully."); } catch (IOException e) { LOG.error("Delete table failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testDelete."); } 如果被删除的cell所在的列族上设置了二级索引,也会同步删除索引数据。
  • 接口类型简介 由于HBase本身是由java语言开发出来的,且java语言具有简洁通用易懂的特性,推荐用户使用java语言进行HBase应用程序开发。 HBase采用的接口与Apache HBase保持一致。 HBase通过接口调用,可提供的功能如表1所示。 表1 HBase接口提供的功能 功能 说明 CRUD数据读写功能 增查改删 高级特性 过滤器、二级索引,协处理器 管理功能 表管理、集群管理
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据。 数据包含两个属性:分别是Int和String类型。 配置文件。 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如: nettyconnector.ssl.enabled: true nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 安全认证配置: Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink。 SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见加密传输。 接口说明。 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全