华为云用户手册

  • 描述 调用指定的存储过程。 存储过程由各个连接(connectors)提供,实现数据操作或者管理任务。例如,系统连接器(System Connector)就定义了存储过程可以取消一个正在运行的查询。有些数据源,例如PostgreSQL,其系统有定义自己的存储过程,这与连接器定义的存储过程不同,是无法被CALL调用的。 检查并更新metastore中分区数组,它支持3种模式: ADD:将文件系统中存在但metastore里没有的分区系统同步到metastore中。 DROP:drop元数据表中存在但文件系统中不存在的分区。 FULL:同时进行ADD和DROP操作。
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下要求: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • HoodieDeltaStreamer流式写入 Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能: 支持Kafka,DFS多种数据源接入 。 支持管理检查点、回滚和恢复,保证exactly once语义。 支持自定义转换操作。 示例: 准备配置文件kafka-source.properties #hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000 指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令: spark-submit --master yarn --jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径 --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal --props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。 --target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径 --table-type MERGE_ON_READ // 指定要写入的hudi表类型 --target-table hudimor_deltastreamer_partition // 指定hudi表名 --source-ordering-field name // 指定hudi表预合并列 --source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类 --schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema --transformer-class com.huawei.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制 --enable-hive-sync // 开启hive同步,同步hudi表到hive --continuous // 指定流处理模式为连续模式
  • Grouping 当group by语句带with rollup/cube选项时,Grouping才有意义。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 ROLLUP生成的结果集显示了所选列中值的某一层次结构的聚合。 Grouping:当用CUBE或ROLLUP运算符添加行时,附加的列输出值为1;当所添加的行不是由CUBE或ROLLUP产生时,附加列值为0。 例如,Hive中有一张表“table_test”,表结构如下所示: +----------------+-------------------+--+ | table_test.id | table_test.value | +----------------+-------------------+--+ | 1 | 10 | | 1 | 15 | | 2 | 20 | | 2 | 5 | | 2 | 13 | +----------------+-------------------+--+ 执行如下语句: select id,grouping(id),sum(value) from table_test group by id with rollup; 得到如下结果: +-------+-----------------+------+--+ | id | groupingresult | sum | +-------+-----------------+------+--+ | 1 | 0 | 25 | | NULL | 1 | 63 | | 2 | 0 | 38 | +-------+-----------------+------+--+
  • EXCEPT、INTERSECT EXCEPT返回两个结果集的差(即从左查询中返回右查询没有找到的所有非重复值)。 INTERSECT返回两个结果集的交集(即两个查询都返回的所有非重复值)。 例如,Hive中有两张表“test_table1”、“test_table2”。 “test_table1”表结构如下所示: +-----------------+--+ | test_table1.id | +-----------------+--+ | 1 | | 2 | | 3 | | 4 | +-----------------+--+ “test_table2”表结构如下所示: +-----------------+--+ | test_table2.id | +-----------------+--+ | 2 | | 3 | | 4 | | 5 | +-----------------+--+ 执行如下的EXCEPT语句: select id from test_table1 except select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 1 | +--------------+--+ 执行INTERSECT语句: select id from test_table1 intersect select id from test_table2; 显示如下结果: +--------------+--+ | _alias_0.id | +--------------+--+ | 2 | | 3 | | 4 | +--------------+--+
  • 操作场景 Hive支持对表的某一列或者多列进行加密;在创建Hive表时,可以指定要加密的列和加密算法。当使用insert语句向表中插入数据时,即可实现将对应列的 数据加密 。只支持对存储在HDFS上的TextFile和SequenceFile文件格式的Hive表进行列加密,不支持视图以及Hive over HBase场景。 Hive列加密机制目前支持的加密算法有两种,在建表时指定: AES:对应加密类名称为“org.apache.hadoop.hive.serde2.AESRewriter”。 SM4:也称为 SMS 4,对应加密类名称为“org.apache.hadoop.hive.serde2.SMS4Rewriter”。 将原始数据从普通Hive表导入到Hive列加密表后,在不影响其他业务情况下,建议删除普通Hive表上原始数据,因为保留一张未加密的表存在安全风险。
  • spark-defaults.conf相关参数 登录客户端节点,在“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”文件中配置表4相关参数。 表4 spark-defaults.conf中的Spark配置参考 参数 默认值 描述 spark.driver.memory 4G 指定用于driver端进程的内存,其中SparkContext已初始化。 说明: 在客户端模式下,不要使用SparkConf在应用程序中设置该参数,因为驱动程序JVM已经启动。要配置该参数,请在--driver-memory命令行选项或默认属性文件中进行配置。 spark.executor.memory 4GB 指定每个执行程序进程使用的内存。 spark.sql.crossJoin.enabled true 如果查询包含交叉连接,请启用此属性,以便不会发生错误,此时使用交叉连接而不是连接,可实现更好的性能。
  • Web安全 Flink Web安全加固,支持白名单过滤,Flink Web只能通过YARN代理访问,支持安全头域增强。在Flink集群中,各部件的监测端口支持范围可配置。 编码规范: 说明:Web Service客户端和服务器间使用相同的编码方式,是为了防止出现乱码现象,也是实施输入校验的基础。 安全加固:web server响应消息统一采用UTF-8字符编码。 支持IP白名单过滤: 说明:防止非法用户登录,需在web server侧添加IP Filter过滤源IP非法的请求。 安全加固:支持IP Filter实现Web白名单配置,配置项是“jobmanager.web.allow-access-address”,默认情况下只支持YARN用户接入。 安装客户端之后需要将客户端节点IP追加到jobmanager.web.allow-access-address配置项中。 禁止将文件绝对路径发送到客户端: 说明:文件绝对路径发送到客户端会暴露服务端的目录结构信息,有助于攻击者遍历了解系统,为攻击者攻击提供帮助。 安全加固:Flink配置文件中所有配置项中如果包含以/开头的,则删掉第一级目录。 同源策略: 说明:如果两个URL的协议,主机和端口均相同,则它们同源;如果不同源,默认不能相互访问;除非被访问者在其服务端显示指定访问者的来源。 安全加固:响应头“Access-Control-Allow-Origin”头域默认配置为YARN集群ResourceManager的IP地址,如果源不是来自YARN的,则不能互相访问。 防范敏感信息泄露: 说明:带有敏感数据的Web页面都应该禁止缓存,以防止敏感信息泄漏或通过代理服务器上网的用户数据互窜现象。 安全加固:添加“Cache-control”、“Pragma”、“Expires”安全头域,默认值为:“Cache-Control:no-store”,“Pragma :no-cache”,“Expires : 0”。实现了安全加固,Flink和web server交互的内容将不会被缓存。 防止劫持: 说明:由于点击劫持(ClickJacking)和框架盗链都利用到框架技术,所以需要采用安全措施。 安全加固:添加“X-Frame-Options”安全头域,给浏览器提供允许一个页面可否在“iframe”、“frame”或“object”网站中的展现页面的指示,如果默认配置为“X-Frame-Options: DENY”,则确保任何页面都不能被嵌入到别的“iframe”、“frame”或“object”网站中,从而避免了点击劫持 (clickjacking) 的攻击。 对Web Service接口调用记录日志: 说明:对“Flink webmonitor restful”接口调用进行日志记录。 安全加固:“access log”支持配置:“jobmanager.web.accesslog.enable”,默认为“true”。且日志保存在单独的“webaccess.log”文件中。 跨站请求( CS RF)伪造防范: 说明:在B/S应用中,对于涉及服务器端数据改动(如增加、修改、删除)的操作必须进行跨站请求伪造的防范。跨站请求伪造是一种挟制终端用户在当前已登录的Web应用程序上执行非本意的操作的攻击方法。 安全加固:现有请求修改的接口有2个post,1个delete,其余均是get请求,非get请求的接口均已删除。 异常处理: 说明:应用程序出现异常时,捕获异常,过滤返回给客户端的信息,并在日志中记录详细的错误信息。 安全加固:默认的错误提示页面,进行信息过滤,并在日志中记录详细的错误信息。新加四个配置项,默认配置为 FusionInsight 提供的跳转URL,错误提示页面跳转到固定配置的URL中,防止暴露不必要的信息。 表1 四个配置项参数介绍 参数 描述 是否必选配置 jobmanager.web.403-redirect-url web403页面,访问如果遇到403错误,则会重定向到配置的页面。 是 jobmanager.web.404-redirect-url web404页面,访问如果遇到404错误,则会重定向到配置的页面。 是 jobmanager.web.415-redirect-url web415页面,访问如果遇到415错误,则会重定向到配置的页面。 是 jobmanager.web.500-redirect-url web500页面,访问如果遇到500错误,则会重定向到配置的页面。 是 HTML5安全: 说明:HTML5是下一代的Web开发规范,为开发者提供了许多新的功能并扩展了标签。这些新的标签及功能增加了攻击面,存在被攻击的风险(例如跨域资源共享、客户端存储、WebWorker、WebRTC、WebSocket等)。 安全加固:添加“Access-Control-Allow-Origin”配置,如运用到跨域资源共享功能,可对HTTP响应头的“Access-Control-Allow-Origin”属性进行控制。 Flink不涉及如客户端存储、WebWorker、WebRTC、WebSocket等安全风险。
  • Flink认证和加密 Flink集群中,各部件支持认证。 Flink集群内部各部件和外部部件之间,支持和外部部件如YARN、HDFS、ZooKeeper进行kerberos认证。 Flink集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持security cookie认证。 Flink集群中,各部件支持SSL加密传输;集群内部各部件之间,如Flink client和JobManager、JobManager和TaskManager、TaskManager和TaskManager之间支持SSL加密传输。
  • ACL控制 在HA模式下,支持ACL控制。 Flink在HA模式下,支持用ZooKeeper来管理集群和发现服务。ZooKeeper支持SASL ACL控制,即只有通过SASL(kerberos)认证的用户,才有往ZK上操作文件的权限。如果要在Flink上使用SASL ACL控制,需要在Flink配置文件中设置如下配置: high-availability.zookeeper.client.acl: creator zookeeper.sasl.disable: false 具体配置项介绍请参考HA。
  • UDTF(User Defined Timeseries Generating Function) 编写一个UDTF需要继承“org.apache.iotdb.db.query.udf.api.UDTF”类,并至少实现“beforeStart”方法和一种“transform”方法。 表2是所有可供用户实现的接口说明。 表2 接口说明 接口定义 描述 是否必须 void validate(UDFParameterValidator validator) throws Exception 在初始化方法“beforeStart”调用前执行,用于检测“UDFParameters”中用户输入的参数是否合法。 否 void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception 初始化方法,在UDTF处理输入数据前,调用用户自定义的初始化行为。用户每执行一次UDTF查询,框架就会构造一个新的UDF类实例,该方法在每个UDF类实例被初始化时调用一次。在每一个UDF类实例的生命周期内,该方法只会被调用一次。 是 void transform(Row row, PointCollector collector) throws Exception 该方法由框架调用。当在“beforeStart”中选择以“RowByRowAccessStrategy”的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以“Row”的形式传入,输出结果通过“PointCollector”输出。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 与“transform(RowWindow rowWindow, PointCollector collector) ”方法二选一 void transform(RowWindow rowWindow, PointCollector collector) throws Exception 该方法由框架调用。当在“beforeStart”中选择以“SlidingSizeWindowAccessStrategy”或者“SlidingTimeWindowAccessStrategy”的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以“RowWindow”的形式传入,输出结果通过“PointCollector”输出。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 与“transform(Row row, PointCollector collector)”方法二选一 void terminate(PointCollector collector) throws Exception 该方法由框架调用。该方法会在所有的“transform”调用执行完成后,在“beforeDestroy”方法执行前被调用。在一个UDF查询过程中,该方法会且只会调用一次。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 否 void beforeDestroy() UDTF的结束方法。此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后被调用。 否 调用顺序: void validate(UDFParameterValidator validator) throws Exception void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception void transform(Row row, PointCollector collector) throws Exception或者void transform(RowWindow rowWindow, PointCollector collector) throws Exception void terminate(PointCollector collector) throws Exception void beforeDestroy() 框架每执行一次UDTF查询,都会构造一个全新的UDF类实例,查询结束时,对应的UDF类实例即被销毁,因此不同UDTF查询(即使是在同一个SQL语句中)UDF类实例内部的数据都是隔离的。可以在UDTF中维护一些状态数据,无需考虑并发对UDF类实例内部状态数据的影响。 使用方法: void validate(UDFParameterValidator validator) throws Exception “validate”方法能够对用户输入的参数进行验证。 在该方法中限制输入序列的数量和类型,检查用户输入的属性或者进行自定义逻辑的验证。 void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception “beforeStart”方法有以下作用: 帮助用户解析SQL语句中的UDF参数。 配置UDF运行时必要的信息,即指定UDF访问原始数据时采取的策略和输出结果序列的类型。 创建资源,比如建立外部链接,打开文件等。
  • UDFParameters UDFParameters的作用是解析SQL语句中的UDF参数(SQL中UDF函数名称后括号中的部分)。参数包括路径(及其序列类型)参数和字符串“key-value”对形式输入的属性参数。 例如: SELECT UDF(s1, s2, 'key1'='iotdb', 'key2'='123.45') FROM root.sg.d; 用法: void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters for (PartialPath path : parameters.getPaths()) { TSDataType dataType = parameters.getDataType(path); // do something } String stringValue = parameters.getString("key1"); // iotdb Float floatValue = parameters.getFloat("key2"); // 123.45 Double doubleValue = parameters.getDouble("key3"); // null int intValue = parameters.getIntOrDefault("key4", 678); // 678 // do something // configurations // ... }
  • 前提条件 需确保FlinkServer所在集群和 GaussDB (DWS)所在集群网络互通,确保“可用区”、“虚拟私有云”、“安全组”配置相同。 FlinkServer所在集群(安全模式): 集群中已安装HDFS、Yarn、Kafka、ZooKeeper和Flink服务。 包含Kafka服务的客户端已安装,安装路径如:/opt/client。 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser。 待对接的GaussDB(DWS)所在集群(安全模式): 可参考如下命令连接数据库并创建接受数据的表: gsql -d postgres -h IP -U username –p port -W password –r postgres:需要连接的数据库名称。 IP:GaussDB(DWS) 集群地址。如果通过公网地址连接,请指定为集群“公网访问 域名 ”,如果通过内网地址连接,请指定为集群“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。 username和password:连接数据库的用户名及密码。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。 port :Coordinator的端口号,请根据实际情况替换,可使用gs_om -t status --detail查询Coordinator数据路径,在该路径下的“postgresql.conf”文件中查看端口号信息。 创建用于接受数据的空表,如表“customer_t1”: CREATE TABLE customer_t1 ( c_customer_sk INTEGER, c_customer_name VARCHAR(32) ) with (orientation = column,compression=middle) distribute by hash (c_customer_name);
  • 操作场景 FlinkServer支持对接8.1.x及之后版本的GaussDB(DWS)数据库,本章节介绍GaussDB(DWS)作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。FlinkSQL与GaussDB(DWS)数据类型对应关系如下表所示。 本示例以安全模式FlinkServer、Kafka为例,对接安全模式GaussDB(DWS)。 根据安全需求,FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。 本章节仅适用于 MRS 3.2.0至MRS 3.3.0版本集群,MRS 3.3.1及之后的版本请参考dws-connector-flink章节。
  • Hive SQL扩展语法说明 Hive SQL支持Hive-3.1.0版本中的所有特性,详情请参见https://cwiki.apache.org/confluence/display/hive/languagemanual。 MRS系统提供的扩展Hive语句如表1所示。 表1 扩展Hive语句 扩展语法 语法说明 语法示例 示例说明 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ...... [TBLPROPERTIES ("groupId"=" group1 ","locatorId"="locator1")] ...; 创建一个hive表,并指定表数据文件分布的locator信息。详细说明请参见使用HDFS Colocation存储Hive表。 CREATE TABLE tab1 (id INT, name STRING) row format delimited fields terminated by '\t' stored as RCFILE TBLPROPERTIES("groupId"=" group1 ","locatorId"="locator1"); 创建表tab1,并指定tab1的表数据分布在locator1节点上。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ... [TBLPROPERTIES ('column.encode.columns'='col_name1,col_name2'| 'column.encode.indices'='col_id1,col_id2','column.encode.classname'='encode_classname')]...; 创建一个hive表,并指定表的加密列和加密算法。详细说明请参见配置Hive列加密功能。 create table encode_test(id INT, name STRING, phone STRING, address STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('column.encode.indices'='2,3', 'column.encode.classname'='org.apache.hadoop.hive.serde2.SMS4Rewriter') STORED AS TEXTFILE; 创建表encode_test,并指定插入数据时对第2、3列加密,加密算法类为org.apache.hadoop.hive.serde2.SMS4Rewriter。 REMOVE TABLE hbase_tablename [WHERE where_condition]; 删除hive on hbase表中符合条件的数据。详细说明请参见删除Hive on HBase表中的单行记录。 remove table hbase_table1 where id = 1; 删除表中符合条件“id =1”的数据。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建hive表,并设定表可以指定自定义行分隔符。详细说明请参见自定义Hive表行分隔符。 create table blu(time string, num string, msg string) row format delimited fields terminated by ',' stored as inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建表blu,指定inputformat为SpecifiedDelimiterInputFormat,以便查询时可以指定表的查询行分隔符。 父主题: Hive常见SQL语法说明
  • 参数描述 参数 描述 是否必填 table String类型,表名或者库名.表名 否,table和path二选一 path String类型,表的绝对路径 否,table和path二选一 enabled String类型,是否开启TTL 否,false或者true,默认是false strategy String类型,TTL的触发模式 否,NUM_COMMITS或者TIME_ELAPSED,默认是NUM_COMMITS value String类型,TTL的触发间隔时间 否,默认是10 resolveConflictsBy String类型,TTL的冲突解决策略 否,MAX_TTL或者MIN_TTL,默认是MAX_TTL runInline String类型,TTL是否在写操作完成时触发 否,true或者false,默认是false
  • 示例 alter table h0 add columns(ext0 string); alter table h0 add columns(new_col int not null comment 'add new column' after col1); alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);
  • Ranger用户类型 Ranger中的用户可分为Admin、User、Auditor等类型,不同用户具有的Ranger管理界面查看和操作权限不同。 Admin:Ranger安全管理员,可查看所有页面内容,进行服务权限管理插件及权限访问控制策略的管理操作,可查看审计信息内容,可进行用户类型设置。 Auditor:Ranger审计管理员,可查看服务权限管理插件及权限访问控制策略的内容。 User:普通用户,可以被Ranger管理员赋予具体权限。
  • 基于HBase本地二级索引查询数据 在具有索引的用户表中,可以使用Filter来查询数据。对于创建单索引和组合索引的用户表,使用过滤器查询的结果与没有使用索引的表相同,但数据查询性能高于没有使用索引的表。 索引的使用规则如下: 对于为一个或多个列创建单个索引的情况: 当将此列用于AND或OR查询筛选时,使用索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2)。 当在查询中使用“索引列”和“非索引列”进行过滤时,此索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1)。 当在查询中使用“索引列”或“非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)。 对于为多个列创建组合索引的情况: 当用于查询的列是组合索引的全部或部分列并且与组合索引具有相同的顺序时,使用索引会提高查询性能。 例如,为C1,C2和C3创建组合索引。 该索引在以下情况下生效: Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2) FILTER_CONDITION(IndexCol1) 该索引在下列情况下不生效: Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol3) FILTER_CONDITION(IndexCol2) FILTER_CONDITION(IndexCol3) 当在查询中使用“索引列”和“非索引列”进行过滤时,使用索引可提高查询性能。 例如: Filter_Condition(IndexCol1)AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1) 当在查询中使用“索引列”或“非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如: Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2))OR(Filter_Condition(NonIndexCol1)) 当多个列用于查询时,只能为组合索引中的最后一列指定值范围,而其他列只能设置为指定值。 例如,为C1,C2和C3创建组合索引。在范围查询中,只能为C3设置数值范围,过滤条件为“C1 = XXX,C2 = XXX,C3 = 数值范围”。
  • HBase本地二级索引相关接口 使用HIndex的API都在类org.apache.hadoop.hbase.hindex.client.HIndexAdmin中,相关接口介绍如下: 表2 HBase Index相关接口 操作 接口 描述 注意事项 添加索引 addIndices() 将索引添加到没有数据的表中。调用此接口会将用户指定的索引添加到表中,但会跳过生成索引数据。因此,在此操作之后,索引不能用于scan/filter操作。该接口使用场景为用户想要在具有大量预先存在用户数据的表上批量添加索引,其具体操作为使用诸如TableIndexer工具之类的外部工具来构建索引数据。 索引一旦添加则不能修改。如果要修改,则需先删除旧的索引然后重新创建。 应注意不要在具有不同索引名称的相同列上创建两个索引,否则将会导致数据存储和查询处理的资源浪费。 索引不能添加到系统表中。 向索引列put数据时不支持append和increment操作。 如果客户端出现任何故障,除非发生DoNotRetryIOException,否则应该重试。 索引列族按以下优先级从数据表中已存在的列族选取,优先级从高到低依次为: d、#、@、$、%、#0、@0、$0、%0、#1、@1 ...上至#255、@255、$255和%255 创建索引时,系统会在表中按以上优先级顺序检查是否存在以上列族,如果不存在,则将第一个不存在的列族设为索引列族。 例如: 数据表中仅存在d列族,则索引列族默认为#。 数据表中已存在d和#列族,则默认索引列族默认为@。 数据表中已存在d、#和$列族,则索引列族默认为@。 可以通过HIndex TableIndexer工具添加索引而无需建立索引数据。 addIndicesWithData() 将索引添加到有数据的表中。此方法将用户指定的索引添加到表中,并会对已经存在的用户数据创建对应的索引数据,也可先调用该方法生成索引再在存入用户数据的同时生成索引数据。在此操作之后,这些索引立即可用于scan/filter操作。 删除索引 dropIndices() 仅删除索引。该API从表中删除用户指定的索引,但跳过相应的索引数据。在此操作之后,索引不能用于scan/filter操作。集群在major compaction期间会自动删除旧的索引数据。 此API使用场景为表中包含大量索引数据且dropIndicesWithData()不可行。另外,也可以通过TableIndexer工具删除索引以及索引数据。 在索引的状态为ACTIVE,INACTIVE和DROPPING时,允许禁用索引的操作。 对于使用dropIndices()删除索引的操作,用户必须确保在将索引添加到具有相同索引名的表之前,相应的索引数据已被删除(即major compaction已完成)。 用户删除相应的索引会删除: 一个带有索引的列族。 组合索引所有列族中的任一个列族。 索引可以通过HIndex TableIndexer工具与索引数据一起删除。 dropIndicesWithData() 删除索引数据。此API删除用户指定的索引,并删除用户表中与这些索引对应的所有索引数据。在此操作之后,删除的索引完全从表中删除,不再可用于scan/filter操作。 启用/禁用索引 disableIndices() 该API禁用所有用户指定的索引,使其不再可用于scan/filter操作。 在索引的状态为ACTIVE,INACTIVE和BUILDING时允许启用索引的操作。 在索引的状态为ACTIVE和INACTIVE时允许禁用索引操作。 在禁用索引之前,用户必须确保索引数据与用户数据一致。如果在索引处于禁用状态期间没有在表中添加新的数据,索引数据与用户数据将保持一致。 启用索引时,可以通过使用TableIndexer工具构建索引来保证数据一致性。 enableIndices() 该API启用所有用户指定的索引,使其可用于scan/filter操作。 查看已创建的索引 listIndices() 该API可用于列出给定表中的所有索引。 无
  • 回答 问题根因: 上述两个问题是由于多主实例模式或者多租户模式下,使用spark-beeline通过add jar的方式创建function,此function在各个JD BCS erver实例之间是不可见的。执行drop function时,如果该session连接的JDBCServer实例不是创建function的JDBCServer实例,则在该session中找不到该function,而且hive默认将“hive.exec.drop.ignorenonexistent”设置为“true”,即当function不存在时,删除function操作不会报错,这样就表现出了用户没有drop function的权限,执行drop时却没有报错,让用户误以为drop成功;但重新起session时又连到创建function的JDBCServer上,因此执行show function,function仍然存在。该行为是hive的社区行为。 修改方案: 在执行drop function命令之前先执行add jar命令,则该function在有权限的情况下才能drop成功,且drop成功之后不会出现show function仍然存在的现象。
  • 问题 问题一: 用户没有drop function的权限,能够drop成功。具体场景如下: 在FusionInsight Manager页面上添加user1用户,给予用户admin权限,执行下列操作: set role admin;add jar /home/smartcare-udf-0.0.1-SNAPSHOT.jar;create database db4;use db4;create function f11 as 'com.huawei.smartcare.dac.hive.udf.UDFArrayGreaterEqual';create function f12 as 'com.huawei.smartcare.dac.hive.udf.UDFArrayGreaterEqual'; 修改user1用户,取消admin权限,执行下列操作: drop functiondb4.f11; 结果显示drop成功,如图1所示。 图1 用户没有权限却drop成功结果 问题二: 用户drop function成功,show function的时候,function仍然存在。具体场景如下: 在FusionInsight Manager页面上添加user1用户,给予用户admin权限,进入spark-beeline执行下列操作: set role admin;create database db2;use db2;add jar /home/smartcare-udf-0.0.1-SNAPSHOT.jar;create function f11 as 'com.huawei.smartcare.dac.hive.udf.UDFArrayGreaterEqual';create function f12 as 'com.huawei.smartcare.dac.hive.udf.UDFArrayGreaterEqual'; 退出后再进入spark-beeline执行下列操作: set role admin;use db2;drop function db2.f11; 退出后再进入spark-beeline执行下列操作: use db2;show functions; 结果显示,被drop的function仍然存在,如图2所示。 图2 执行show functions操作后的结果
  • 70.13.5更新内容 login 和 logout 相关接口的回调中增加用户的userUuid(原来uuid字段)、account、thirdAccount,详见登录返回HWMLoginResult、退出返回HWMLogoutResult。如果是老版本升级到70.13.5版本,需要修改回调函数的定义。 新增会议来电状态变更通知,具体请见会议来电状态变更通知 新增呼叫来电通知。 新增开启共享声音功能,具体请见开启共享声音功能 本地录制默认路径变更,接入方App需要申请下载文件夹权限,具体请见工程文件配置。 父主题: 版本更新内容
  • 70.13.5更新内容 login 和 logout 相关接口的回调中增加用户的userUuid(原来uuid字段)、account、thirdAccount,详见登录返回HWMLoginResult、退出返回HWMLogoutResult。如果是老版本升级到70.13.5版本,需要修改回调函数的定义。 支持设置是否显示外部标签字段hideExternalLabel增加场景(转移主持人、聊天),详见初始化信息HWMOpenSDKConfig 新增被邀加入会议来电通知,详见被邀加入会议来电通知 父主题: 版本更新内容
  • 修改配置信息 为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置用户名和密码 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="********"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,请您自行设定。 #如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置用户名和密码 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="********"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 username和password:开启SASL认证所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。
  • 生产消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件。 context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要生产消息的Topic名称。
  • 消费消息 SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer import ssl ##连接信息 conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##证书文件。 context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 sasl_plain_username和sasl_plain_password:开启SASL认证时所使用的用户名和密码。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。 非SASL认证方式 注意,加粗内容需要替换为实例自有信息。 from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'consumer_id': 'consumer_id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer') 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap_servers:MQS连接地址和端口。 topic_name:要消费消息的Topic名称。 consumer_id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
  • 方法详情 public void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 输入参数 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 public void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 输入参数 bindingKey:队列绑定键 routingKey:消息路由键 props:消息持久化设置,非必填 message:消息内容 produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 输入参数 props:消息持久化设置,非必填 message:消息内容
  • 方法列表 返回类型 方法和说明 void produceWithDirectExchange(String routingKey, String props, String message) 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 void produceWithTopicExchange(String bindingKey, String routingKey, String props, String message) 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 void produceWithFanoutExchange(String props, String message) 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
  • 使用示例 用direct交换器生产消息,把消息路由到bindingKey与routingKey完全匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer); importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig); importClass(com.roma.apic.livedata.config.v1.QueueConfig); importClass(com.roma.apic.livedata.config.v1.ExchangeConfig); importClass(com.roma.apic.livedata.config.v1.ConnectionConfig); function execute(data) { var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456"); var queueConfig = new QueueConfig("directQueue", false, false, false, null); var exchangeConfig = new ExchangeConfig("directExchange", "direct", true, false, false, null); var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig); var producer = new RabbitMqProducer(config); producer.produceWithDirectExchange("direct.exchange", "PERSISTENT_TEXT_PLAIN", "direct exchange message"); return "produce successful."; } 用topic交换器生产消息,把消息路由到bindingKey与routingKey模糊匹配的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer); importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig); importClass(com.roma.apic.livedata.config.v1.QueueConfig); importClass(com.roma.apic.livedata.config.v1.ExchangeConfig); importClass(com.roma.apic.livedata.config.v1.ConnectionConfig); function execute(data) { var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456"); var queueConfig = new QueueConfig(“topicQueue”, false, false, false, null); var exchangeConfig = new ExchangeConfig("topicExchange", "topic", true, false, false, null); var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig); var producer = new RabbitMqProducer(config); producer.produceWithTopicExchange("topic.#", “topic.A”, null, “message”); return "produce successful."; } 用fanout交换器生产消息,把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 importClass(com.roma.apic.livedata.client.v1.RabbitMqProducer); importClass(com.roma.apic.livedata.config.v1.RabbitMqConfig); importClass(com.roma.apic.livedata.config.v1.QueueConfig); importClass(com.roma.apic.livedata.config.v1.ExchangeConfig); importClass(com.roma.apic.livedata.config.v1.ConnectionConfig); function execute(data) { var connectionConfig = new ConnectionConfig("10.10.10.10", 5672, "admin", "123456"); var queueConfig = new QueueConfig(“fanoutQueue”, false, false, false, null); var exchangeConfig = new ExchangeConfig(“fanoutExchange”, "fanout", true, false, null); var config = new RabbitMqConfig(connectionConfig, queueConfig, exchangeConfig); var producer = new RabbitMqProducer(config); producer.produceWithFanoutExchange( null, “message”); return "produce successful." }
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全