华为云用户手册

  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar
  • 接口类型介绍 由于HBase本身是由java语言开发出来的,且java语言具有简洁通用易懂的特性,推荐用户使用java语言进行HBase应用程序开发。 HBase采用的接口与Apache HBase保持一致。 HBase通过接口调用,可提供的功能如表1所示。 表1 HBase接口提供的功能 功能 说明 CRUD数据读写功能 增查改删 高级特性 过滤器、二级索引,协处理器 管理功能 表管理、集群管理
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testDelete方法中 public void testDelete() { LOG .info("Entering testDelete."); byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Instantiate an HTable object. table = conn.getTable(tableName); // Instantiate an Delete object. Delete delete = new Delete(rowKey); // Submit a delete request. table.delete(delete); LOG.info("Delete table successfully."); } catch (IOException e) { LOG.error("Delete table failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testDelete."); } 如果被删除的cell所在的列族上设置了二级索引,也会同步删除索引数据。
  • HBase介绍 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase设计目标是用来解决关系型数据库在处理海量数据时的局限性。 HBase使用场景有如下几个特点: 处理海量数据(TB或PB级别以上)。 具有高吞吐量。 在海量数据中实现高效的随机读取。 具有很好的伸缩能力。 能够同时处理结构化和非结构化的数据。 不需要完全拥有传统关系型数据库所具备的ACID特性。ACID特性指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。 HBase中的表具有如下特点: 大:一个表可以有上亿行,上百万列。 面向列:面向列(族)的存储和权限控制,列(族)独立检索。 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
  • SpringBoot样例工程的命令行形式运行 在IDEA界面使用Maven执行install。 当输出“BUILD SUC CES S”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包。 在Linux上进入客户端安装目录,如“/opt/client/Flink/flink/conf”作为作为运行目录,将1中生成的“target”目录下包名中含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包放进该路径。 执行以下命令创建yarn-session。 yarn-session.sh -t ssl/ -nm "session-spring11" -d 执行以下命令启动SpringBoot服务。 执行 GaussDB (DWS)样例 flink run flink-dws-sink-example.jar
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 功能介绍 本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式: 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。 目前表名长度最长为128,字段名长度最长为128,字段注解长度最长为4000,WITH SERDEPROPERTIES 中key长度最长为256,value长度最长为4000。以上的长度均表示字节长度。
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见创建Hive用户自定义函数。
  • 代码样例 以下为代码示例: hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限hbase.log.file=hbase-client.log //日志文件名hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效hbase.log.maxbackupindex=20 //最多保存的日志文件数目# Security audit appenderhbase.security.log.file=hbase-client-audit.log //审计日志文件命令
  • 注意事项 注[1]:创建联合索引 HBase支持在多个字段上创建二级索引,例如在列name和age上。 HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);
  • 使用Python提交Flink SQL作业 获取样例工程“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”和“datagen2kafka.sql”。 参考准备本地应用开发环境将准备好的Python虚拟环境打包,获取“venv.zip”文件。 zip -q -r venv.zip venv/ 以root用户登录主管理节点,将1和2获取的“venv.zip”、“pyflink-sql.py”和“datagen2kafka.sql”文件上传至客户端环境。 per-job模式:将上述文件上传到“客户端安装目录/Flink/flink”。 yarn-application模式:将上述文件上传到“客户端安装目录/Flink/flink/yarnship”。 yarn-session模式:将上述文件上传到“客户端安装目录/Flink/flink/conf/ssl”。 修改“pyflink-sql.py”中的“file_path”路径。 per-job模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/datagen2kafka.sql yarn-application模式:修改为os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" yarn-session模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/conf/ssl//datagen2kafka.sql 执行下面命令指定运行环境。 export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3 执行以下命令运行程序。 per-job模式 ./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_sql -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-sql.py 运行结果: yarn-application模式 ./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_sql -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-sql 运行结果: yarn-session模式 在启动yarnsession之前需要参考准备本地应用开发环境章节准备运行环境,使用下面命令启动yarn-session: bin/yarn-session.sh -jm 1024 -tm 4096 -t conf/ssl/ -d 使用下面命令提交任务: ./bin/flink run --detached -t yarn-session -Dyarn.application.name=py_sql -Dyarn.application.id=application_1685505909197_0285 -pyarch conf/ssl/venv.zip -pyexec conf/ssl/venv.zip/venv/bin/python3 -py conf/ssl/pyflink-sql.py 运行结果: 父主题: PyFlink样例程序
  • HBase应用开发流程 本文档主要基于Java API对HBase进行应用开发。 开发流程中各阶段的说明如图1和表1所示。 图1 HBase应用程序开发流程 表1 HBase应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HBase的基本概念,了解场景需求,设计表等。 常用概念 准备开发和运行环境 HBase的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 HBase的运行环境即HBase客户端,请根据指导完成客户端的安装和配置。 准备HBase应用开发和运行环境 准备工程 HBase提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置HBase样例工程 根据场景开发工程 提供了Java语言的样例工程,包含从建表、写入到删除表全流程的样例工程。 开发HBase应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HBase应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HBase应用 父主题: HBase开发指南(普通模式)
  • Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); }} 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );create TABLE datagen_source( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' );INSERT INTO kafka_sinkSELECT *FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序
  • 使用Python提交Flink普通作业 获取样例工程“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”和“insertData2kafka.sql”。 参考准备本地应用开发环境将准备好的Python虚拟环境打包,获取“venv.zip”文件。 zip -q -r venv.zip venv/ 以root用户登录主管理节点,将1和2获取的“venv.zip”、“pyflink-kafka.py”和“insertData2kafka.sql”文件上传至客户端环境。 per-job模式:将上述文件上传到“客户端安装目录/Flink/flink”。 yarn-application模式:将上述文件和“flink-connector-kafka-实际版本号.jar”包上传到“客户端安装目录/Flink/flink/yarnship”。 修改“pyflink-kafka.py”中的“specific_jars”路径。 per-job模式:修改为SQL文件的实际路径。如:file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-实际版本号.jar yarn-application模式:修改为:file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-实际版本号.jar 修改“pyflink-kafka.py”中的“file_path”路径。 per-job模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/insertData2kafka.sql yarn-application模式:修改为os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" 执行以下命令指定运行环境。 export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3 执行以下命令运行程序。 per-job模式: ./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_kafka -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-kafka.py 运行结果: yarn-application模式 ./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_kafka -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-kafka 运行结果: 父主题: PyFlink样例程序
  • Flink应用程序开发流程 Flink开发流程参考如下步骤: 图1 Flink应用程序开发流程 表1 Flink应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Flink的基本概念。 Flink基本概念 准备开发和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 导入并配置Flink样例工程 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 开发Flink应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并调测Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用调测结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 组件操作指南中的“Flink性能调优”
  • 问题 Flink任务配置State Backend为RocksDB时,运行报如下错误: Caused by: java.lang.UnsatisfiedLinkError: /srv/BigData/hadoop/data1/nm/usercache/***/appcache/application_****/rocksdb-lib-****/librocksdbjni-linux64.so: /lib64/libpthread.so.0: version `GLIBC_2.12` not found (required by /srv/BigData/hadoop/***/librocksdbjni-linux64.so)at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1965) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1890) at java.lang.Runtime.load0(Runtime.java:795) at java.lang.System.load(System.java:1062) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:734)... 11 more
  • 回答 首先查看ZooKeeper中/flink_base的目录权限是否为:'world,'anyone: cdrwa;如果不是,请修改/flink_base的目录权限为:'world,'anyone: cdrwa,然后继续根据2排查;如果是,请根据2排查。 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink_base/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink_base/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • 操作步骤 客户端机器必须安装有Python3,其版本不低于3.6,最高不能超过3.8。 在客户端机器的命令行终端输入python3可查看Python版本号。如下显示Python版本为3.8.2。 Python 3.8.2 (default, Jun 23 2020, 10:26:03)[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linuxType "help", "copyright", "credits" or "license" for more information. 客户端机器必须安装有setuptools,版本可取47.3.1。 具体软件,请到对应的官方网站获取。 https://pypi.org/project/setuptools/#files 将下载的setuptools压缩文件复制到客户端机器上,解压后进入解压目录,在客户端机器的命令行终端执行python3 setup.py install。 如下内容表示安装setuptools的47.3.1版本成功。 Finished processing dependencies for setuptools==47.3.1 若提示setuptools的47.3.1版本安装不成功,则需要检查环境是否有问题或是Python自身原因导致的。 安装Python客户端到客户端机器。 参考获取 MRS 应用开发样例工程,获取样例代码解压目录中“src\hive-examples”目录下的样例工程文件夹“python3-examples”。 进入“python3-examples”文件夹。 根据python3的版本,选择进入“dependency_python3.6”或“dependency_python3.7”或“dependency_python3.8”或“dependency_python3.9”(MRS 3.3.0及之后版本支持)文件夹。 执行whereis easy_install命令,找到easy_install程序路径。如果有多个路径,使用easy_install --version确认选择setuptools对应版本的easy_install,如/usr/local/bin/easy_install。 MRS 3.2.0之前版本,使用对应的easy_install命令,依次安装dependency_python3.x文件夹下的egg文件。如: /usr/local/bin/easy_install future-0.18.2-py3.8.egg 输出以下关键内容表示安装egg文件成功。 Finished processing dependencies for future==0.18.2 对于“dependency_python3.x”文件夹下同时存在aarch64与x86_64版本的“egg”文件,需要根据操作系统选取其中一个版本安装即可,使用uname -p命令确认当前操作系统架构。 MRS 3.2.0及之后版本,使用对应的easy_install命令,安装dependency_python3.x文件夹下的egg文件,egg文件存在依赖关系,可使用通配符安装,如: “dependency_python3.6”目录: /usr/local/bin/easy_install future*egg six*egg python*egg sasl-*linux-$(uname -p).egg thrift-*egg thrift_sasl*egg “dependency_python3.7”目录: /usr/local/bin/easy_install future*egg six*egg sasl-*linux-$(uname -p).egg thrift-*egg thrift_sasl*egg “dependency_python3.8”目录: /usr/local/bin/easy_install future*egg six*egg python*egg sasl-*linux-$(uname -p).egg thrift-*linux-$(uname -p).egg thrift_sasl*egg “dependency_python3.9”目录(MRS 3.3.0及之后版本): /usr/local/bin/easy_install future*egg six*egg sasl-*linux-$(uname -p).egg six-*.egg thrift-*linux-$(uname -p).egg thrift_sasl*egg 每个egg文件安装输出以下关键内容表示安装成功。 Finished processing dependencies for *** 安装成功后,“python3-examples/pyCLI_nosec.py”为Python客户端样例代码,“python3-examples/pyhive/hive.py”为Python客户端接口API。
  • 准备开发环境 Hive组件可以使用Java(JDBC和HCatalog)、Python、Python3接口进行应用开发,要准备的开发和运行环境分别如下表所示。 表1 JDBC/Hcatalog开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 表2 Python开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python 用于开发Hive应用程序的工具,版本要求不低于2.6.6,最高不超过2.7.13。 安装setuptools Python开发环境的基本配置,版本要求5.0以上。 Python开发工具的详细安装配置可参见配置Hive Python样例工程。 表3 Python3开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python3 用于开发Hive应用程序的工具,版本要求不低于3.6,最高不超过3.8。 安装setuptools Python3开发环境的基本配置,版本要求为47.3.1。 Python3开发工具的详细安装配置可参见配置Hive Python3样例工程。
  • 功能简介 使用REST服务,传入对应host与port组成的url以及指定的Namespace,通过HTTP协议,对Namespace进行创建、查询、删除,获取指定Namespace中表的操作。 HBase表以“命名空间:表名”格式进行存储,若在创建表时不指定命名空间,则默认存储在“default”中。其中,“hbase”命名空间为系统表命名空间,请不要对该系统表命名空间进行业务建表或数据读写等操作。
  • 查看访问日志 当您配置了消息传输日志,可以查看消息传输日志的详细信息。 日志显示格式如下: {"message_id":"$message_id","project_id":"$project_id","topic_urn":"$topic_urn","subscriber_urn":"$subscriber_urn","protocol_name":"$protocol_name","endpoint":"$endpoint","status":"$status","http_code":$http_code,"create_time":"$create_time","send_time":"$send_time"} 不支持修改日志格式。日志字段说明如表1所示。 表1 字段说明 参数 参数类型 描述 message_id String 消息ID。 project_id String 项目ID。 topic_urn String Topic的唯一的资源标识 subscriber_urn String 订阅者的唯一资源标识。 protocol_name String 不同协议对应不同的endpoint(接受消息的接入点)。目前支持的协议包括: “email”:邮件传输协议,endpoint为邮箱地址。 “sms”:短信传输协议,endpoint为手机号码。 “functiongraph” FunctionGraph(函数)传输协议,endpoint为一个函数 “functionstage”:。FunctionStage(工作流)传输协议,endpoint为一个 函数工作流 “http”、“https”:HTTP/HTTPS传输协议,endpoint为URL。 endpoint String 接受消息的接入点。 status String 消息状态。目前包括以下状态: “DELIVERED”:已送达。 “FAIL_DELIVERED”:发送失败。 “REJE CTS ”:已拒绝。触发流控机制。 http_code Integer HTTP返回码,仅支持HTTP/HTTPS协议消息。 create_time String 消息创建时间。时间格式为UTC时间,YYYY-MM-DDTHH:MM:SSZ。 send_time String 消息发送时间。时间格式为UTC时间,YYYY-MM-DDTHH:MM:SSZ。
  • 日志示例 {"message_id":"1ae49922602a42fc83acb9689a2eb5f4","project_id":"5a9f32e4f1ec4bbe9695ff9da51c2925","topic_urn":"urn:smn:regionid:5a9f32e4f1ec4bbe9695ff9da51c2925:demo","subscriber_urn":"urn:smn:regionid:5a9f32e4f1ec4bbe9695ff9da51c2925:demo:b55c3c6fa7cd471b9f24818d530a8740","protocol_name":"https","endpoint":"https://127.0.0.1:443/https","status":"DELIVERED","http_code":200,"create_time":"2022-11-01T00:00:00Z","send_time":"2022-11-01T00:00:10Z"} 以上日志示例对应的字段如下: 表2 日志示例对应的字段 参数 示例 message_id 1ae49922602a42fc83acb9689a2eb5f4 project_id 5a9f32e4f1ec4bbe9695ff9da51c2925 topic_urn urn:smn:regionid:5a9f32e4f1ec4bbe9695ff9da51c2925:demo subscriber_urn urn:smn:regionid:5a9f32e4f1ec4bbe9695ff9da51c2925:demo:b55c3c6fa7cd471b9f24818d530a8740 protocol_name https endpoint https://127.0.0.1:443/https status DELIVERED http_code 200 create_time 2022-11-01T00:00:00Z send_time 2022-11-01T00:00:10Z
  • 取消订阅 不同的订阅协议对应不同的取消订阅方式: 短信取消订阅:因为短信长度有限,无法每次推送消息时候附带取消订阅的链接,用户需通过请求确认的链接进入WEB页面进行取消订阅。 具体流程如下: 添加订阅后, 消息通知 服务会向订阅终端发送订阅确认短信,如图1所示,短信中包含订阅确认的链接。 图1 订阅确认短信 单击短信中的确认链接 ,进入WEB页面提示“订阅成功”。 该页面下方有提示信息:如果您不希望订阅这个主题,点击“这里”取消订阅。 图2 订阅成功 单击“这里”,进入取消订阅页面,确认待取消订阅的相关信息,包含主题名称、主题URN、订阅URN。 图3 确认取消订阅 确认无误后,单击“确定”,取消订阅成功。 图4 取消订阅成功 邮件取消订阅:消息通知服务向指定邮箱地址发送电子邮件消息,其中包含取消订阅的链接,订阅者单击进入取消订阅的页面。如果取消成功,页面会显示取消订阅成功,稍后消息通知服务会向这个订阅者发送一封确认取消订阅的邮件(48小时内有效)。如果误操作了取消订阅,可以通过该消息重新订阅。 具体流程如下: 添加订阅后,消息通知服务会向指定邮箱地址发送电子邮件消息,如图5所示,邮件中包含订阅确认的链接。 图5 订阅确认邮件 单击邮件中的“订阅确认”链接 ,进入WEB页面提示“订阅成功”。 该页面下方有提示信息:如果您不希望订阅这个主题,点击“这里”取消订阅。 图6 订阅成功 单击“这里”,进入取消订阅页面,确认待取消订阅的相关信息,包含主题名称、主题URN、订阅URN。 图7 确认取消订阅 确认无误后,单击“确定”,取消订阅成功。 图8 取消订阅成功 HTTP(S)取消订阅,消息通知服务向指定的URL发送消息。消息体中包含取消订阅的链接,订阅者需要访问该链接,如果取消订阅成功,将获取HTTPS协议的200返回码,稍后消息通知服务会向这个订阅者发送一次取消订阅的消息(48小时内有效)。如果误操作了取消订阅,可以通过访问重新订阅的链接重新订阅。HTTP(S)消息头和消息体参数含义请参见HTTP(S)终端节点使用样例简介。
  • 添加订阅 在已创建的主题下添加以下两种订阅。添加订阅详细操作步骤请参见订阅主题。 用户在主题下添加订阅终端A,并且希望在每周周日和周三的早上或下午将消息通知给A。则用户可以在创建订阅A时配置如下订阅筛选策略: { "filter_polices": [ { "name": "week", "string_equals": [ "sunday", "wednesday" ] }, { "name": "time", "string_equals": [ "morning", "afternoon" ] } ]} 用户在主题下添加订阅终端B,并且希望在每周周一、周二和周三的晚上或深夜将消息通知给B。则用户在创建订阅B时配置如下订阅筛选策略: { "filter_polices": [ { "name": "week", "string_equals": [ "monday", "tuesday", "wednesday" ] }, { "name": "time", "string_equals": [ "night", "midnight" ] } ]}
  • JSON格式体说明 JSON格式主要用来处理用户想对不同的订阅者类型发送不同的消息,目前支持的协议Default、HTTP、HTTPS、email、FunctionGraph(函数)、FunctionGraph(工作流),其中Default是必须的,Email的内容将发给邮件类型的订阅者,其它协议依次如下所示: { "default": "Dear Sir or Madam, this is a default message.", "email": "Dear Sir or Madam, this is an email message.", "http": "{'message':'Dear Sir or Madam, this is an HTTP message.'}", "https": "{'message':'Dear Sir or Madam, this is an HTTPS message.'}", "sms": "This is an SMS message.", "functionstage": "Dear Sir or Madam, this is a functiongraph(function) message.", "functiongraph": "Dear Sir or Madam, this is a functiongraph(workflow) message." } 推荐将Default设置为通用的消息内容,对于特殊的订阅者类型再设置相应消息内容。 以下示例展现由于短信内容长度有限制,将短信设置为特殊的消息,其中HTTP、HTTPS、Email、FunctionGraph(函数)、FunctionGraph(工作流)类型的订阅收到"Dear Sir or Madam, this is a default message."的内容,而SMS类型的订阅者收到“This is an SMS message”。 { "sms": "This is an SMS message.", "default": "Dear Sir or Madam, this is a default message." }
  • JSON消息计算 选择不同的协议,生成的JSON格式消息的大小不同。JSON格式消息的字节数包含消息框中显示的花括号、双引号、空格、回车、协议名称和消息内容累计字节数。具体字节计算方式以输入消息“This is a default message.”为例。 消息:“This is a default message.”,消息本身包含26个字节。 输入消息,选择协议,生成JSON消息时,系统默认生成Default协议的消息。 { "default": "This is a default message.", "protocol1": "This is a default message.", "protocol2": "This is a default message.", ……} 1个Default协议加上用户选择的协议个数,记为N。 消息体包含的固定字节数: 每种协议的消息包含3个空格,共N个协议:3× N = 3N 每种协议的消息包含4个双引号,共N个协议: 4 × N = 4N 每种协议的消息包含1个冒号,共N个协议:1 × N = N 每种协议的消息包含1条消息内容This is a default message.,共N个协议:26 × N = 26N 逗号(N - 1)个:1 × (N - 1) = (N - 1) 回车(N + 1)个:1 ×(N + 1) = (N + 1) 花括号2个:1 × 2 = 2 协议名称 default 1个:7 × 1 = 7 选择的协议字节数: 协议名称 HTTP 1个:4 × 1 = 4 协议名称 HTTPS1个:5 × 1 = 5 协议名称 email 1个:5 × 1 = 5 协议名称 sms 1个:3 × 1 = 3 协议名称 functionstage 1个:13 × 1 = 13 协议名称 functiongraph 1个:13 × 1 = 13 合计:36N + 9 + 选择的协议字节数 例如,您选择了“HTTP”、“HTTPS”和“email”3个协议,生成的消息如下: { "default": "This is a default message.", "email": "This is a default message.", "http": "This is a default message.", "https": "This is a default message."} 加上Default,则N的值为4,生成的JSON消息字节数计算如下: 固定字节数:36 × 4 + 9 = 153 协议名称 HTTP 1个:4 × 1 = 4 协议名称 HTTPS 1个:5 × 1 = 5 协议名称 email 1个:5 × 1 = 5 合计:153 + 4 + 5 + 5 = 167
  • 发布消息 向已创建的主题发布消息,根据添加的不同订阅,会出现以下场景。 发布消息详细操作步骤请参见发布主题消息简介。 场景一: 消息属性字段为: { "name": "week", "type": "STRING", "value": [ "sunday" ]} 发送结果:在消息中只规定了消息属性字段中的week字段,而没有规定time字段,因此本条消息只会发送给订阅A。 场景二: 消息属性字段为: [ { "name": "week", "type": "STRING", "value": [ "sunday" ] }, { "name": "time", "type": "STRING", "value": [ "night" ] }] 发送结果:此条消息含义为发送给周日晚上接收消息的订阅者,订阅A和订阅B均不符合条件。因此本条消息不会发送给订阅A和订阅B。 场景三: 消息属性字段为: [ { "name": "week", "type": "STRING", "value": [ "monday" ] }, { "name": "time", "type": "STRING", "value": [ "night" ] }] 发送结果:此条消息含义为发送给周一晚上接收消息的订阅者,订阅A不符合条件。因此本条消息只发送给订阅B。 场景四: 消息属性字段为: [ { "name": "week", "type": "STRING", "value": [ "sunday" ] }, { "name": "time", "type": "STRING", "value": [ "morning" ] }] 发送结果:此条消息含义为发送给周日早上接收消息的订阅者,订阅A符合条件,订阅B不符合条件。因此本条消息不会发送给订阅B。 场景五: 消息属性字段为: [ { "name": "week", "type": "STRING_ARRAY", "value": [ "sunday", "monday" ] }, { "name": "time", "type": "STRING_ARRAY", "value": [ "morning", "night" ] }] 发送结果:此条消息含义为发送给周日或周一的早上或晚上接收消息的订阅者,订阅A和订阅B均符合条件。因此本条消息会发送给订阅A和订阅B。 场景六: 订阅筛选策略字段没有配置。 发送结果:带有任何消息属性的消息不会发送给未配置订阅筛选策略的订阅。
  • 不同协议消息说明 不同的订阅协议接收到的消息包含的内容可能有所不同。 邮件和HTTP(S)终端收到的消息包含:消息主题、消息内容和取消订阅的链接。 短信终端接收到的消息只包含消息内容。 FunctionGraph(函数)终端收到的消息包含消息属性、消息标题、消息内容、主题URN等信息。消息样例类似如下。字段说明如表1所示。 { "record": [{ "event_version": "1.0", "smn": { "message_attributes": "", "subject": "Welcome", "message_id": "e6fa59c6b3e0424c9c02cbed35b680e7", "topic_urn": "urn:smn:regionId:66e0f4622d6f4e3fb2db2e495298a61a:smn_123", "type": "notification", "message": "Hello", "timestamp": "2017-08-17T10:07:14Z" }, "event_source": "smn", "event_subscription_urn": "urn:cff:regionId:66e0f4622d6f4e3fb2db2e495298a61a:function:DEFAULT:mytest:latest" }]} 表1 字段说明 字段配置项 类型 说明 record JSON对象 消息列表 event_version String 版本 message_attributes String 消息属性 subject String 消息标题 message_id String 消息ID topic_urn String 主题URN type String 消息类型 message String 消息内容 timestamp String 时间戳 event_source String 消息来源 event_subscription_urn String 订阅URN FunctionGraph(工作流)终端收到的消息包含消息标题、消息内容、主题URN等信息。消息样例类似如下。字段说明如表2所示。 { "records":[{ "event_version":"1.0", "smn":{ "topic_urn":"urn:smn:regionId:995960e6a6094fdeb00b7c9991d35791:11-16", "type":"notification", "message":"Hello", "timestamp":"2017-11-27T09:53:41Z", "subject":"test_1127", "message_id":"27d6e9e1249240288b47817bf637192b" }, "event_source":" SMN ", "event_subscription_urn":"urn:smn:regionId:995960e6a6094fdeb00b7c9991d35791:11-16:dcd78adcb194431587d4992c374f8465"}]} 表2 字段说明 字段配置项 类型 说明 records JSON对象 消息列表 event_version String 版本 topic_urn String 主题URN type String 消息类型 message String 消息内容 timestamp String 时间戳 subject String 消息标题 message_id String 消息ID event_source String 消息来源 event_subscription_urn String 订阅URN 父主题: 附录
  • 订阅确认流控说明 为了控制用户对订阅者进行恶意的攻击,云平台对添加订阅时发送订阅确认消息进行了流控。流控策略既适用于从控制台发送订阅确认消息也适用于通过调用API发送订阅确认消息。 根据不同的协议有不同的流控策略: 对于一个邮件订阅者,1个小时最多允许发送20次订阅确认消息,2天最多允许发送40次订阅确认消息。如果超过1小时的流控阈值,则SMN会在接下来的1小时内不对该终端发送订阅确认邮件。如果超过2天的流控阈值,将7天之内不对这个邮件订阅者发送订阅确认邮件。如果这个邮件订阅者对这个用户的确认信息进行了确认,之前的流控状态将清空置零。 对于一个短信订阅者,1个小时最多允许发送10次订阅确认消息,2天最多允许发送20次订阅确认,如果超过1小时的流控阈值,则SMN会在接下来的1小时内不对该终端发送订阅确认短信。如果超过2天的流控阈值,将7天之内不对这个手机订阅者发送订阅确认短信。如果这个手机订阅者对这个用户的确认信息进行了确认,之前的流控状态将清空置零。 对于一个HTTP(S)订阅者,10分钟最多允许发送200次订阅确认消息。 以上参数仅供参考,SMN服务会根据实际业务情况进行调整。 父主题: 附录
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全