华为云用户手册

  • 工具使用 下载安装客户端,例如安装目录为“/opt/client”。进入 目录“/opt/client/Spark2x/spark/bin”, 执行start-prequery.sh。 参考表1,配置prequeryParams.properties。 表1 参数列表 参数 说明 示例 spark.prequery.period.max.minute 预热的最大时长,单位分钟 60 spark.prequery.tables 表名配置database.table:int,表名支持通配符*,int代表预热多长时间内有更新的表,单位为天。 default.test*:10 spark.prequery.maxThreads 预热时并发的最大线程数 50 spark.prequery.sslEnable 集群安全模式为true,非安全模式为false true spark.prequery.driver JD BCS erver的地址ip:port,如需要预热多个Server则需填写多个Server的IP,多个IP:port用逗号隔开。 192.168.0.2:22550 spark.prequery.sql 预热的sql语句,不同语句冒号隔开 SELECT COUNT(*) FROM %s;SELECT * FROM %s LIMIT 1 spark.security.url 安全模式下jdbc所需url ;saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.hadoop.com@HADOOP.COM; spark.prequery.sql 配置的语句在每个所预热的表中都会执行,表名用%s代替。 脚本使用 命令形式:sh start-prequery.sh 执行此条命令需要:将user.keytab或jaas.conf(二选一),krb5.conf(必须)放入conf目录中。 此工具暂时只支持Carbon表。 此工具会初始化Carbon环境和预读取表的元数据到JDB CS erver,所以更适合在多主实例、静态分配模式下使用。
  • Flink WebUI特点 Flink WebUI主要有以下特点: 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 “数据连接类型”为“Kafka”时,认证类型不支持“KERBEROS”。 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 图形化作业管理:简单易用。
  • Flink WebUI关键能力 Flink WebUI关键能力如表1: 表1 Flink WebUI关键能力 关键能力分类 描述 批流一体 支持一套FlinkSQL定义批作业和流作业。 Flink SQL内核能力 Flink SQL支持自定义大小窗、24小时以内流计算、超出24小时批处理。 FlinkSQL支持Kafka、HDFS读取;支持写入Kafka和HDFS。 支持同一个作业定义多个FlinkSQL,多个指标合并在一个作业计算。当一个作业是相同主键、相同的输入和输出时,该作业支持多个窗口的计算。 支持AVG、SUM、COUNT、MAX和MIN统计方法。 Flink SQL可视化定义 集群连接管理,配置Kafka、HDFS等服务所属的集群信息。 数据连接管理,配置Kafka、HDFS等服务信息。 数据表管理,定义Sql访问的数据表信息,用于生成DDL语句。 FlinkSQL作业定义,根据用户输入的Sql,校验、解析、优化、转换成Flink作业并提交运行。 Flink作业可视化管理 支持可视化定义流作业和批作业。 支持作业资源、故障恢复策略、Checkpoint策略可视化配置。 流作业和批作业的状态监控。 Flink作业运维能力增强,包括原生监控页面跳转。 性能&可靠性 流处理支持24小时窗口聚合计算,毫秒级性能。 批处理支持90天窗口聚合计算,分钟级计算完成。 支持对流处理和批处理的数据进行过滤配置,过滤无效数据。 读取HDFS数据时,提前根据计算周期过滤。 作业定义平台故障、服务降级,不支持再定义作业,但是不影响已有作业计算。 作业故障有自动重启机制,重启策略可配置。
  • 操作场景 该操作指导用户对ZooKeeper的znode设置权限。 ZooKeeper通过访问控制列表(ACL)来对znode进行访问控制。ZooKeeper客户端为znode指定ACL,ZooKeeper服务器根据ACL列表判定某个请求znode的客户端是否有对应操作的权限。ACL设置涉及如下四个方面。 查看ZooKeeper中znode的ACL。 增加ZooKeeper中znode的ACL。 修改ZooKeeper中znode的ACL。 删除ZooKeeper中znode的ACL。 ZooKeeper的ACL权限说明: ZooKeeper目前支持create,delete,read,write,admin五种权限,且ZooKeeper对权限的控制是znode级别的,而且不继承,即对父znode设置权限,其子znode不继承父znode的权限。ZooKeeper中znode的默认权限为world:anyone:cdrwa,即任何用户都有所有权限。 ACL有三部分: 第一部分是认证类型,如world指所有认证类型,sasl是kerberos认证类型; 第二部分是账号,如anyone指的是任何人; 第三部分是权限,如cdrwa指的是拥有所有权限。 特别的,由于普通模式启动客户端不需要认证,sasl认证类型的ACL在普通模式下将不能使用。本文所有涉及sasl方式的鉴权操作均是在安全集群中进行。 表1 Zookeeper的五种ACL 权限说明 权限简称 权限详情 创建权限 create(c) 可以在当前znode下创建子znode 删除权限 delete(d) 删除当前的znode 读权限 read(r) 获取当前znode的数据,可以列出当前znode所有的子znodes 写权限 write(w) 向当前znode写数据,写入子znode 管理权限 admin(a) 设置当前znode的权限
  • 回答 Linux的netcat命令没有与Zookeeper服务器安全通信的选项,所以当启用安全的netty配置时,它不能支持Zookeeper四个字母的命令。 为了避免这个问题,用户可以使用下面的Java API来执行四个字母的命令。 org.apache.zookeeper.client.FourLetterWordMain 例如: String[] args = new String[]{host, port, "stat"};org.apache.zookeeper.client.FourLetterWordMain.main(args); netcat命令只能用于非安全的netty配置。
  • 回答 Kafka重启成功后应用会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(如图2所示),尽管UI界面上显示读取的数据个数为“0”,但实际上这部分数据在补的RDD中进行了处理,因此,不存在数据丢失。 Kafka重启时间段的数据处理机制如下。 Spark Streaming应用使用了state函数(例如:updateStateByKey),在Kafka重启成功后,Spark Streaming应用生成2017/05/11 10:58:00 batch任务时,会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(Kafka重启前Kafka上未读取完的数据,属于2017/05/11 10:57:00之前的batch),如图2所示。 图2 重启时间段缺失数据处理机制
  • 回答 HDFS_DELEGATION_TOKEN到期的异常是由于token没有更新或者超出了最大生命周期。 在token的最大生命周期内确保下面的参数值大于作业的运行时间。 “dfs.namenode.delegation.token.max-lifetime”=“604800000”(默认是一星期) 参考修改集群服务配置参数,进入HDFS“全部配置”页面,在搜索框搜索该参数。 建议在token的最大生命周期内参数值为多倍小时数。
  • 回答 JobHistory服务更新页面上的app时,会根据HDFS上的part文件大小变更与否判断是否刷新首页面的app显示信息。若文件为第一次查看,则将当前文件大小与0作比较,如果大于0则读取该文件。 分组的情况下,如果执行的app没有job处于执行状态,则part文件为空,即JobHistory服务不会读取该文件,此app也不会显示在JobHistory页面上。但若part文件大小之后有更新,JobHistory又会显示该app。
  • 回答 NodeManager有重启恢复机制,详情请参见: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/NodeManager.html#NodeManager_Restart 可以参考修改集群服务配置参数,进入Yarn“全部配置”页面。需将NodeManager的“yarn.nodemanager.recovery.enabled”配置项为“true”后才生效,默认为“true”,这样在YARN重启的异常场景时会定时删除多余的本地日志,避免问题的出现。
  • 问题 在Spark Streaming应用执行过程中重启Kafka时,应用无法从Kafka获取topic offset,从而导致生成Job失败。如图1所示,其中2017/05/11 10:57:00~2017/05/11 10:58:00为Kafka重启时间段。2017/05/11 10:58:00重启成功后对应的“Input Size”的值显示为“0 records”。 图1 Web UI界面部分batch time对应Input Size为0 records
  • 回答 这是RM的使用限制,应用程序运行过程中移动到别的队列,此时RM重启,RM并不会在状态存储中存储新队列的信息。 假设用户提交一个MR任务到叶子队列test11上。当任务运行时,删除叶子队列test11,这时提交队列自动变为lost_and_found队列(找不到队列的任务会被放入lost_and_found队列中),任务暂停运行。要启动该任务,用户将任务移动到叶子队列test21上。在将任务移动到叶子队列test21后,任务继续运行,此时RM重启,重启后显示提交队列为lost_and_found队列,而不是test21队列。 发生上述情况的原因是,任务未完成时,RM状态存储中存储的还是应用程序移动前的队列状态。唯一的解决办法就是等RM重启后,再次移动应用程序,将新的队列状态信息写入状态存储中。
  • 回答 JDBCServer方式使用了ShuffleService功能,Reduce阶段所有的Executor会从NodeManager中获取数据,当数据量达到一个级别(10T级别),会出现NodeManager单点瓶颈(ShuffleService服务在NodeManager进程中),就会出现某些Task获取数据超时,从而出现该问题。 因此,当数据量达到10T级别以上的Spark任务,建议用户关闭ShuffleService功能,即在“Spark-defaults.conf”配置文件中将配置项“spark.shuffle.service.enabled”配置为“false”。
  • 问题 Spark执行应用时上报如下类似错误并导致应用结束。 2016-04-20 10:42:00,557 | ERROR | [shuffle-server-2] | Connection to 10-91-8-208/10.18.0.115:57959 has been quiet for 180000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. | org.apache.spark.network.server.TransportChannelHandler.userEventTriggered(TransportChannelHandler.java:128)2016-04-20 10:42:00,558 | ERROR | [shuffle-server-2] | Still have 1 requests outstanding when connection from 10-91-8-208/10.18.0.115:57959 is closed | org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:102)2016-04-20 10:42:00,562 | WARN | [yarn-scheduler-ask-am-thread-pool-160] | Error sending message [message = DoShuffleClean(application_1459995017785_0108,319)] in 1 attempts | org.apache.spark.Logging$class.logWarning(Logging.scala:92)java.io.IOException: Connection from 10-91-8-208/10.18.0.115:57959 closed at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104) at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:745)2016-04-20 10:42:00,573 | INFO | [dispatcher-event-loop-14] | Starting task 177.0 in stage 1492.0 (TID 1996351, linux-254, PRO CES S_LOCAL, 2106 bytes) | org.apache.spark.Logging$class.logInfo(Logging.scala:59)2016-04-20 10:42:00,574 | INFO | [task-result-getter-0] | Finished task 85.0 in stage 1492.0 (TID 1996259) in 191336 ms on linux-254 (106/3000) | org.apache.spark.Logging$class.logInfo(Logging.scala:59)2016-04-20 10:42:00,811 | ERROR | [Yarn application state monitor] | Yarn application has already exited with state FINISHED! | org.apache.spark.Logging$class.logError(Logging.scala:75)
  • 问题 向动态分区表中插入数据时,shuffle过程中大面积shuffle文件损坏(磁盘掉线、节点故障等)后,为什么会在重试的task中出现"Failed to CREATE_FILE"异常? 2016-06-25 15:11:31,323 | ERROR | [Executor task launch worker-0] | Exception in task 15.0 in stage 10.1 (TID 1258) | org.apache.spark.Logging$class.logError(Logging.scala:96)org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/hive/warehouse/testdb.db/web_sales/.hive-staging_hive_2016-06-25_15-09-16_999_8137121701603617850-1/-ext-10000/_temporary/0/_temporary/attempt_201606251509_0010_m_000015_0/ws_sold_date=1999-12-17/part-00015 for DFSClient_attempt_201606251509_0010_m_000015_0_353134803_151 on 10.1.1.5 because this file lease is currently owned by DFSClient_attempt_201606251509_0010_m_000015_0_-848353830_156 on 10.1.1.6
  • 回答 动态分区表插入数据的最后一步是读取shuffle文件的数据,再写入到表对应的分区文件中。 当大面积shuffle文件损坏后,会引起大批量task失败,然后进行job重试。重试前Spark会将写表分区文件的句柄关闭,大批量task关闭句柄时HDFS无法及时处理。在task进行下一次重试时,句柄在NameNode端未被及时释放,即会发生"Failed to CREATE_FILE"异常。 这种现象仅会在大面积shuffle文件损坏时发生,出现异常后task会重试,重试耗时在毫秒级,影响较小,可以忽略不计。
  • 回答 在executor核数等于1的情况下,遵循以下规则对调优Spark Streaming运行参数有所帮助。 Spark任务处理速度和Kafka上partition个数有关,当partition个数小于给定executor个数时,实际使用的executor个数和partition个数相同,其余的将会被空闲。所以应该使得executor个数小于或者等于partition个数。 当Kafka上不同partition数据有倾斜时,数据较多的partition对应的executor将成为数据处理的瓶颈,所以在执行Producer程序时,数据平均发送到每个partition可以提升处理的速度。 在partition数据均匀分布的情况下,同时提高partition和executor个数,将会提升Spark处理速度(当partition个数和executor个数保持一致时,处理速度是最快的)。 在partition数据均匀分布的情况下,尽量保持partition个数是executor个数的整数倍,这样将会使资源得到合理利用。
  • 操作场景 Spark支持两种方式的序列化 : Java原生序列化JavaSerializer Kryo序列化KryoSerializer 序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。 KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。 序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。
  • 操作步骤 Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。 在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。 实现类注册器并手动注册类。 package com.etl.common;import com.esotericsoftware.kryo.Kryo;import org.apache.spark.serializer.KryoRegistrator; public class DemoRegistrator implements KryoRegistrator{ @Override public void registerClasses(Kryo kryo) { //以下为示例类,请注册自定义的类 kryo.register(AggrateKey.class); kryo.register(AggrateValue.class); }} 您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。 当参数设置为true时,如果工程中存在未被序列化的类,则会发生异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是发生异常,相对比false,其性能较好。 配置KryoSerializer作为数据序列化器和类注册器。 val conf = new SparkConf()conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")
  • 配置场景 Spark SQL的表中,经常会存在很多小文件(大小远小于HDFS块大小),每个小文件默认对应Spark中的一个Partition,也就是一个Task。在很多小文件场景下,Spark会起很多Task。当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响性能。 在小文件场景下,您可以通过如下配置手动指定每个Task的数据量(Split Size),确保不会产生过多的Task,提高性能。 当SQL逻辑中不包含Shuffle操作时,设置此配置项,不会有明显的性能提升。
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 配置描述 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 使用示例 --在default数据库和default_cluster集群下创建名为test表CREATE TABLE default.test ON CLUSTER default_cluster( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}')PARTITION BY toYYYYMM(EventDate)ORDER BY id
  • 使用示例 --删除表t1drop table t1 SYNC; 在删除复制表时,因为复制表需要在Zookeeper上建立一个路径,存放相关数据。ClickHouse默认的库引擎是原子数据库引擎,删除Atomic数据库中的表后,它不会立即删除,而是会在480秒后删除。在删除表时,加上SYNC字段,即可解决该问题,例如:drop table t1 SYNC; 删除本地表和分布式表,则不会出现该问题,可不带SYNC字段,例如:drop table t1;
  • 基本语法 方法一:在指定的“database_name”数据库中创建一个名为“table_name ”的表。 如果建表语句中没有包含“database_name”,则默认使用客户端登录时选择的数据库作为数据库名称。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = engine_name() [PARTITION BY expr_list] [ORDER BY expr_list] ClickHouse在创建表时建议携带PARTITION BY创建表分区。因为ClickHouse数据迁移工具是基于表的分区进行数据迁移,在创建表时如果不携带PARTITION BY创建表分区,则在集群内ClickHouseServer节点间数据迁移界面无法对该表进行数据迁移。 方法二:创建一个与database_name2.table_name2具有相同结构的表,同时可以对其指定不同的表引擎声明。 如果没有表引擎声明,则创建的表将与database_name2.table_name2使用相同的表引擎。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name AS [database_name2.]table_name2 [ENGINE = engine_name] 方法三:使用指定的引擎创建一个与SELECT子句的结果具有相同结构的表,并使用SELECT子句的结果填充它。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name ENGINE = engine_name AS SELECT ...
  • 单表并发控制配置 参数 描述 默认值 hoodie.write.lock.provider 指定lock provider,不建议使用默认值,使用org.apache.hudi.hive.HiveMetastoreBasedLockProvider org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.hivemetastore.database Hive的database 无 hoodie.write.lock.hivemetastore.table Hive的table name 无 hoodie.write.lock.client.num_retries 重试次数 10 hoodie.write.lock.client.wait_time_ms_between_retry 重试间隔 10000 hoodie.write.lock.conflict.resolution.strategy lock provider类,必须是ConflictResolutionStrategy的子类 org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy hoodie.write.lock.zookeeper.base_path 存放ZNodes的路径,同一张表的并发写入需配置一致 无 hoodie.write.lock.zookeeper.lock_key ZNode的名称,建议与Hudi表名相同 无 hoodie.write.lock.zookeeper.connection_timeout_ms zk连接超时时间 15000 hoodie.write.lock.zookeeper.port zk端口号 无 hoodie.write.lock.zookeeper.url zk的url 无 hoodie.write.lock.zookeeper.session_timeout_ms zk的session过期时间 60000 父主题: Hudi常见配置参数
  • 问题 Hive同步数据时报错: Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions :__col1,__col2
  • 配置Spark2x 使用SparkSql编辑器之前需要先修改Spark2x配置。 进入Spark2x的全部配置页面,具体操作请参考修改集群服务配置参数。 设置Spark2x多实例模式,搜索并修改Spark2x服务的以下参数: 参数名称 值 spark.thriftserver.proxy.enabled false spark.scheduler.allocation.file #{conf_dir}/fairscheduler.xml 进入JDBCServer2x自定义界面,在“spark.core-site.customized.configs”参数内,添加如下两个自定义项: 表1 自定义参数 名称 值 hadoop.proxyuser.hue.groups * hadoop.proxyuser.hue.hosts * 保存配置,重启Spark2x服务。
  • Ranger统一鉴权特性 特性说明:在Kafka 2.4.0之前版本,Kafka组件仅支持社区自带的SimpleAclAuthorizer鉴权插件,Kafka 2.4.0及之后版本, MRS Kafka同时支持Ranger鉴权插件和社区自带鉴权插件。默认使用Ranger鉴权,基于Ranger鉴权插件,可进行细粒度的Kafka Acl管理。 服务端使用Ranger鉴权插件时,若“allow.everyone.if.no.acl.found”配置为“true”,使用非安全端口访问时,所有行为将直接放行。建议使用Ranger鉴权插件的安全集群,不要开启“allow.everyone.if.no.acl.found”。
  • 执行SparkSql语句 在“Database”右侧下拉列表选择一个SparkSql中的数据库,默认数据库为“default”。 系统将自动显示数据库中的所有表。可以输入表名关键字,系统会自动搜索包含此关键字的全部表。 图1 选择数据库 单击指定的表名,可以显示表中所有的列。 光标移动到表所在的行,单击 可以查看列的详细信息。 在SparkSql语句编辑区输入查询语句。 单击后的三角并选择“解释”,编辑器将分析输入的查询语句是否有语法错误以及执行计划,如果存在语法错误则显示“Error while compiling statement”。 单击开始执行SparkSql语句。 图2 执行语句 如果希望下次继续使用已输入的SparkSql语句,请单击保存。 高级查询配置: 单击右上角的,对文件、功能、设置等信息进行配置。 查看快捷键: 单击右上角的,可查看语法和键盘快捷方式信息。 格式化SparkSql语句,请单击后的三角选择“格式” 删除已输入的SparkSql语句,请单击后的三角选择“清除” 查看历史: 单击“查询历史记录”,可查看SparkSql运行情况,支持显示所有语句或只显示保存的语句的运行情况。历史记录存在多个结果时,可以在输入框使用关键字进行搜索。
  • Kafka Idempotent 特性 特性说明:Kafka从0.11.0.0版本引入了创建幂等性Producer的功能,开启此特性后,Producer自动升级成幂等性Producer,当Producer发送了相同字段值的消息后,Broker会自动感知消息是否重复,继而避免数据重复。需要注意的是,这个特性只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区内不出现重复消息;只能实现单会话上的幂等性,这里的会话指的是Producer进程的一次运行,即重启Producer进程后,幂等性不保证。 开启方法: 二次开发代码中添加 “props.put(“enable.idempotence”,true)”。 客户端配置文件中添加 “enable.idempotence = true”。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全