华为云用户手册

  • Alluxio简介 Alluxio是一个面向基于云的数据分析和人工智能的开源的数据编排技术。它为数据驱动型应用和存储系统构建了桥梁, 将数据从存储层移动到距离数据驱动型应用更近的位置,从而能够更容易、更快地被访问。同时使得应用程序能够通过一个公共接口连接到许多存储系统。 Alluxio主要特点如下: 提供内存级I/O 吞吐率,同时降低具有弹性扩张特性的数据驱动型应用的成本开销 简化 云存储 和对象存储接入 简化数据管理,提供对多数据源的单点访问 应用程序部署简易
  • 扫描OpenTSDB的指标数据 tsdb命令可以使用“tsdb query”命令批量查询导入的指标数据,例如执行tsdb query 0 1h-ago sum sys.cpu.user host=web01命令。 Start run net.opentsdb.tools.CliQuery, args: 0 1h-ago sum sys.cpu.user host=web01sys.cpu.user 1356998400000 41 {host=web01, cpu=0}sys.cpu.user 1356998401000 42 {host=web01, cpu=0}sys.cpu.user 1356998402000 44 {host=web01, cpu=0}sys.cpu.user 1356998403000 47 {host=web01, cpu=0}sys.cpu.user 1356998404000 42 {host=web01, cpu=0}sys.cpu.user 1356998405000 42 {host=web01, cpu=0}
  • 经验总结 数据倾斜 当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。 需要重新设计key,以更小粒度的key使得task大小合理化。 修改并行度。 调用rebalance操作,使数据分区均匀。 缓冲区超时设置 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下: env.setBufferTimeout(timeoutMillis);env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  • 配置进程参数 Flink on YARN模式下,有JobManager和TaskManager两种进程。在任务调度和运行的过程中,JobManager和TaskManager承担了很大的责任。 因而JobManager和TaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。 配置JobManager内存。 JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。 您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。 在使用yarn-session命令时,添加“-jm MEM”参数设置内存。 在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。 配置TaskManager个数。 每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。 在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。 在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。 配置TaskManager Slot数。 每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。 在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。 在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。 配置TaskManager内存。 TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。 将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。 将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
  • 配置netty网络通信 Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。 “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。 “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。 “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
  • 代码样例 用于获取Alluxio上某个指定文件的内容。 以下为部分代码片段: /*** read file* @throws java.io.IOException*/private void read() throws IOException {AlluxioURI path = new AlluxioURI(testFilePath);FileInStream in = null;try{in = fSystem.openFile(path);byte[] buffer = new byte[1024];int len;String content = "";while((len = in.read(buffer)) != -1){String bufferStr = new String(buffer,0, len);content += bufferStr;}System.out.println(content);}catch (Exception e){System.out.println("Failed to read file. Exception:" + e);}finally {close(in);}}
  • Savepoints相关问题解决方案 用户必须为job中的所有算子均分配ID吗? 严格的说,用户只给有状态的算子分配IDs即可,因为在savepoint中仅包括有状态的算子的状态,没有状态的算子并不包含在savepoint中。 在实际应用中,强烈建议用户给所有的算子均分配ID,因为有些Flink的内置算子,如window算子是有状态的。具体哪个算子是有状态的,哪个算子是无状态的,不是十分明显。如果用户十分确定某个算子是无状态的,该算子可以不调用uid()方法分配ID。 如果用户在升级作业时新添加一个有状态的算子有什么影响? 当用户在作业中新添加一个有状态的算子时,由于该算子是新添加的,无保存的旧状态,因此无状态恢复,从0开始运行。 如果用户在升级作业时从作业中删除一个有状态的算子有什么影响? 默认情况下,savepoint会尝试将所有保存的状态恢复。如果用户使用的savepoint中包含已经删除算子的状态,恢复将会失败。 用户可以通过--allowNonRestoredState(简写为-n)参数跳过恢复已经删除的算子的状态: $ bin/flink run -s savepointPath -n [runArgs] 如果用户重新编排有状态的算子的顺序有什么影响? 如果用户已经给这些算子分配IDs,那么这些状态会正常恢复。 如果用户没有给这些算子分配IDs, 这些算子将会按新的顺序自动分配新的ID,这将导致状态恢复失败。 如果用户在作业中删除或添加或更改无状态算子的顺序有什么影响? 如果用户已经给有状态的算子分配ID,那么无状态的算子并不会影响从savepoint进行状态恢复。 如果用户没有分配IDs,有状态算子的IDs由于顺序变化可能会被分配新的IDs,这将导致状态恢复失败。 如果用户在状态恢复时改变了算子的并发度会有什么影响? 如果Flink版本高于1.2.0且不使用已经废弃的状态API,如checkpointed,用户可以从savepoint中进行状态恢复。否则,无法恢复。 父主题: Flink应用开发常见问题
  • MRS 各组件样例工程汇总 样例工程获取地址参见获取MRS应用开发样例工程,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 MRS样例代码库提供了各组件的基本功能样例工程供用户使用,当前版本各组件提供的样例工程汇总参见表1。 表1 各组件样例工程汇总(2.x版本) 组件 样例工程位置 描述 Alluxio alluxio-examples 使用Alluxio通过公共接口连接到存储系统示例程序。可实现写文件、读文件等功能。 Flink flink-examples 该样例工程提供以下样例程序: DataStream程序 Flink构造DataStream的Java/Scala示例程序。本工程示例为基于业务要求分析用户日志数据,读取文本数据后生成相应的DataStream,然后筛选指定条件的数据,并获取结果。 向Kafka生产并消费数据程序 Flink向Kafka生产并消费数据的Java/Scala示例程序。在本工程中,假定某个Flink业务每秒就会收到1个消息记录,启动Producer应用向Kafka发送数据,然后启动Consumer应用从Kafka接收数据,对数据内容进行处理后并打印输出。 异步Checkpoint机制程序 Flink异步Checkpoint机制的Java/Scala示例程序。本工程中,程序使用自定义算子持续产生数据,产生的数据为一个四元组(Long,String,String,Integer)。数据经统计后,将统计结果打印到终端输出。每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 Stream SQL Join程序 Flink SQL Join示例程序。本工程示例调用flink-connector-kafka模块的接口,生产并消费数据。生成Table1和Table2,并使用Flink SQL对Table1和Table2进行联合查询,打印输出结果。 HBase hbase-examples HBase数据读写操作的应用开发示例。 通过调用HBase接口可实现创建用户表、导入用户数据、增加用户信息、查询用户信息及为用户表创建二级索引等功能。 HDFS hdfs-examples HDFS文件操作的Java示例程序。 本工程主要给出了创建HDFS文件夹、写文件、追加文件内容、读文件和删除文件/文件夹等相关接口操作示例。 Hive hive-examples 该样例工程提供以下JDBC/HCatalog样例程序: Hive JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Hive,在Hive中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能。 Hive HCatalog处理数据Java示例程序。 使用HCatalog接口实现通过Hive命令行方式对MRS Hive元数据进行数据定义和查询操作。 Impala impala-examples Impala JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Impala,在Impala中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能。 Kafka kafka-examples Kafka流式数据的处理Java示例程序。 本工程基于Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,然后将统计结果以Key-Value的形式输出。 MapReduce mapreduce-examples MapReduce任务提交Java示例程序。 本工程提供了一个MapReduce统计数据的应用开发示例,实现数据分析、处理,并输出满足用户需要的数据信息。 另外以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 Presto presto-examples 该样例工程提供以下JDBC/HCatalog样例程序: Presto JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Presto,在Presto中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能。 Presto HCatalog处理数据Java示例程序。 OpenTSDB opentsdb-examples 通过调用OpenTSDB接口可实现采集大规模集群中的监控类信息,并可实现数据的秒级查询。该样例程序主要实现写入数据、查询数据、删除数据等功能。 Spark spark-examples SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 SparkHbasetoHbaseScalaExample SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 SparkHivetoHbaseScalaExample SparkJavaExample Spark Core任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkPythonExample SparkScalaExample SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 SparkLauncherScalaExample SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 SparkOnHbaseScalaExample SparkSQLJavaExample Spark SQL任务的Java/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkSQLScalaExample SparkStreamingJavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程示例为基于业务要求分析用户日志数据,读取文本数据后生成相应的DataStream,然后筛选指定条件的数据,并获取结果。 SparkStreamingScalaExample SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 SparkStreamingKafka010ScalaExample SparkStreamingtoHbaseJavaExample Spark Streaming读取Kafka数据并写入HBase的Java/Scala示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 SparkStreamingtoHbaseScalaExample SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 SparkStructuredStreamingScalaExample SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JD BCS erver的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 SparkThriftServerScalaExample Storm storm-examples storm-common-examples 构造Storm拓扑和开发Spout/Bolt样例程序。可实现创建Spout、创建Bolt、创建Topology等功能。 storm-hbase-examples MRS的Storm与HBase组件实现交互的示例程序。实现提交Storm拓扑将数据存储到HBase的WordCount表中。 storm-hdfs-examples MRS的Storm与HDFS组件实现交互的示例程序。实现提交Storm拓扑数据存储到HDFS的功能。 storm-jdbc-examples 使用JDBC访问MRS Storm的样例程序。实现使用Storm拓扑向表中插入数据功能。 storm-kafka-examples MRS的Storm与Kafka组件实现交互的示例程序。实现使用Storm拓扑向Kafka中发送数据并查看。 storm-obs-examples MRS的Storm与OBS实现交互的示例程序。实现提交Storm拓扑数据存储到OBS功能。
  • 添加安全组规则 如需为其他用户开通访问MRS Manager的权限,参考该小节内容添加对应用户访问公网的IP地址为可信范围。 在MRS管理控制台,在“现有集群” 列表,单击指定的集群名称,进入集群信息页面。 单击弹性公网IP后边的“添加安全组规则”,如图3所示。 图3 集群详情 进入“添加安全组规则”页面,添加需要开放权限用户访问公网的IP地址段并勾选“我确认这里设置的授权对象是可信任的公网访问IP范围,禁止使用0.0.0.0/0,否则会有安全风险。”如图4所示。 图4 添加安全组规则 默认填充的是用户访问公网的IP地址,用户可根据需要修改IP地址段,如需开放多个IP段为可信范围,请重复执行步骤6-步骤9。如需对安全组规则进行查看,修改和删除操作,请单击“管理安全组规则”。 单击“确定”完成安全组规则添加。
  • 登录MRS Manager 登录MRS管理控制台页面。 在“现有集群” 列表,单击指定的集群名称,进入集群信息页面。 单击“点击管理”,打开“访问MRS Manager页面”。 若用户创建集群时已经绑定弹性公网IP,如图1所示。 选择待添加的安全组规则所在安全组,该安全组在创建群时配置。 添加安全组规则,默认填充的是用户访问公网IP地址9022端口的规则,如需开放多个IP段为可信范围用于访问MRS Manager页面,请参考添加安全组规则。如需对安全组规则进行查看,修改和删除操作,请单击“管理安全组规则”。 自动获取的访问公网IP与用户本机IP不一致,属于正常现象,无需处理。 9022端口为knox的端口,需要开启访问knox的9022端口权限,才能访问MRS Manager服务。 勾选“我确认xx.xx.xx.xx为可信任的公网访问IP,并允许从该IP访问MRS Manager页面。” 图1 添加访问MRS Manager的安全组规则 若用户创建集群时暂未绑定弹性公网IP,如图2所示。 在弹性公网IP下拉框中选择可用的弹性公网IP或单击“管理弹性公网IP”创建弹性公网IP。 选择待添加的安全组规则所在安全组,该安全组在创建群时配置。 添加安全组规则,默认填充的是用户访问公网IP地址9022端口的规则,如需开放多个IP段为可信范围用于访问MRS Manager页面,请参考添加安全组规则。如需对安全组规则进行查看,修改和删除操作,请单击“管理安全组规则”。 自动获取的访问公网IP与用户本机IP不一致,属于正常现象,无需处理。 9022端口为knox的端口,需要开启访问knox的9022端口权限,才能访问MRS Manager服务。 勾选“我确认xx.xx.xx.xx为可信任的公网访问IP,并允许从该IP访问MRS Manager页面。” 图2 绑定弹性公网IP 单击“确定”,进入MRS Manager登录页面。 输入创建集群时默认的用户名“admin”及设置的密码,单击“登录”进入MRS Manager页面。
  • 操作步骤 运行结果会有如下成功信息: ...2020-01-09 10:43:49,338 INFO [main] examples.HBaseExample: Entering dropTable.2020-01-09 10:43:49,341 INFO [main] client.HBaseAdmin: Started disable of hbase_sample_table2020-01-09 10:43:50,080 INFO [main] client.HBaseAdmin: Operation: DISABLE, Table Name: default:hbase_sample_table, procId: 41 completed2020-01-09 10:43:50,550 INFO [main] client.HBaseAdmin: Operation: DELETE, Table Name: default:hbase_sample_table, procId: 43 completed2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Drop table successfully.2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Exiting dropTable.2020-01-09 10:43:50,550 INFO [main] client.ConnectionImplementation: Closing master protocol: MasterService2020-01-09 10:43:50,556 INFO [main] examples.TestMain: -----------finish to test HBase API------------------- 在Windows环境运行样例代码时会出现下面的异常,但是不影响业务: java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 日志说明 日志级别默认为INFO,可以通过调整日志打印级别(DEBUG,INFO,WARN,ERROR,FATAL)来显示更详细的信息。可以通过修改log4j.properties文件来实现,如: hbase.root.logger=INFO,consolelog4j.logger.org.apache.zookeeper=INFO#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUGlog4j.logger.org.apache.hadoop.hbase=INFO# Make these two classes DEBUG-level. Make them DEBUG to see more zk debug.log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFOlog4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
  • 问题 Flink内核升级到1.3.0之后,当kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。 报错内容如下: org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
  • 回答 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • Impala简介 Impala直接对存储在HDFS,HBase 或 对象存储服务 (OBS)中的Hadoop数据提供快速,交互式SQL查询。除了使用相同的统一存储平台之外,Impala还使用与Apache Hive相同的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue中的Impala查询UI)。这为实时或面向批处理的查询提供了一个熟悉且统一的平台。作为查询大数据的工具补充,Impala不会替代基于MapReduce构建的批处理框架,例如Hive。基于MapReduce构建的Hive和其他框架最适合长时间运行的批处理作业。 Impala主要特点如下: 支持Hive查询语言(HiveQL)中大多数的SQL-92功能,包括 SELECT,JOIN和聚合函数。 HDFS,HBase 和对象存储服务(OBS)存储,包括: HDFS文件格式:基于分隔符的text file,Parquet,Avro,SequenceFile和RCFile。 压缩编解码器:Snappy,GZIP,Deflate,BZIP。 常见的数据访问接口包括: JDBC驱动程序。 ODBC驱动程序。 HUE beeswax和Impala查询UI。 impala-shell命令行接口。 支持Kerberos身份认证。 Impala主要应用于实时查询数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。
  • Alluxio开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行是否正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK和Maven 开发环境的基本配置:Java JDK 8或以上、Maven 3.3.9或以上 安装和配置Eclipse或IntelliJ IDEA 用于开发Alluxio应用程序的工具。 网络 确保客户端与Alluxio服务主机在网络上互通。 父主题: 准备Alluxio应用开发环境
  • 样例代码 -- 从本地文件系统/opt/impala_examples_data/目录下将employee_info.txt加载进employees_info表中. LOAD DATA LOCAL INPATH '/opt/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info;-- 从HDFS上/user/impala_examples_data/employee_info.txt加载进employees_info表中. LOAD DATA INPATH '/user/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; 加载数据的实质是将数据拷贝到HDFS上指定表的目录下。 “LOAD DATA LOCAL INPATH”命令可以完成从本地文件系统加载文件到Impala的需求,但是当指定“LOCAL”时,这里的路径指的是当前连接的“Impalad”的本地文件系统的路径。
  • Impala应用开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的环境,用于验证应用程序运行正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 安装和配置Eclipse 用于开发Impalad应用程序的工具。版本要求如下: JDK使用1.7版本,Eclipse使用3.7.1及以上版本。 JDK使用1.8版本,Eclipse使用4.3.2及以上版本。 说明: 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。 网络 确保客户端与Impala服务主机在网络上互通。 父主题: 准备Impala应用开发环境
  • 回答 问题原因: 在IBM JDK下建立的JDBC connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用relogin也无法得到刷新。 解决措施: 通常情况下,在发现JDBC connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 代码样例 /*** create file,write file*/private void write() throws IOException {final String content = "hi, I am bigdata. It is successful if you can see me.";FileOutStream out = null;try {AlluxioURI path = new AlluxioURI(testFilePath);out = fSystem.createFile(path);out.write(content.getBytes());}catch (Exception e){System.out.println("Failed to write file. Exception:" + e);}finally {close(out);}}
  • Flink应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Flink应用程序开发流程 表1 Flink应用开发流程说明 阶段 说明 参考文档 了解基本概念 开始开发应用前,需要了解Flink的基本概念。 Flink应用开发常用概念 准备开发环境和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 配置并导入Flink样例工程 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 Flink DataStream应用开发思路 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并运行Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用运行结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 Flink应用性能调优建议 父主题: Flink应用开发概述
  • 数据规划 Flink样例工程的数据存储在Kafka组件中。Flink向Kafka组件发送数据(需要有kafka权限用户),并从Kafka组件获取数据。 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 创建Topic。 在服务端配置用户创建topic的权限。 开启Kerberos认证的安全集群将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。配置完后重启kafka服务。未开启Kerberos认证的普通集群无需此配置。 用户使用Linux命令创建topic,如果是安全集群,用户执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。 flinkuser需要用户自己创建,并拥有创建Kafka的topic权限。具体操作请参考准备Flink应用开发用户。 创建topic的命令格式: bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} Topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,Topic名称为topic1的数据为例。 bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --partitions 5 --replication-factor 1 --topic topic1 如果集群开启了kerberos, 执行该步骤进行安全认证,否则跳过该步骤。 Kerberos认证配置 客户端配置。 在Flink配置文件“flink-conf.yaml”中,增加kerberos认证相关配置(主要在“contexts”项中增加“KafkaClient”),示例如下: security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytabsecurity.kerberos.login.principal: adminsecurity.kerberos.login.contexts: Client,KafkaClientsecurity.kerberos.login.use-ticket-cache: false 运行参数。 关于“SASL_PLAINTEXT”协议的运行参数示例如下: --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007表示kafka服务器的IP:port
  • Storm接口介绍 Storm采用的接口同开源社区版本保持一致,详情请参见: http://storm.apache.org/documentation/Home.html。 Storm-HDFS采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-hdfs。 Storm-HBase采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-hbase。 Storm-Kafka采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-kafka。 Storm-JDBC采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-jdbc。 父主题: Storm应用开发常见问题
  • 回答 Flink引入了第三方软件包RocksDB的缺陷问题导致该现象的发生。建议用户将checkpoint设置为FsStateBackend方式。 用户需要在应用代码中将checkpoint设置为FsStateBackend。例如: env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"));
  • 部署运行及结果查看 获取相关配置文件,获取方式如下。 安全模式:参见4获取keytab文件。 普通模式:无。 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 使用Kafka客户端创建拓扑中所用到的Topic,执行命令。 ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。 安全模式下,需要kafka管理员用户创建Topic。 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test)。 storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test 安全模式下,在提交storm-examples-1.0.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 安全模式下,kafka需要用户有相应Topic的访问权限,因此首先需要给用户赋权,再提交拓扑。 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。 在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令: ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --new-consumer --consumer.config ../../../Kafka/kafka/config/consumer.properties 同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令: ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../../../Kafka/kafka/config/producer.properties 向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。
  • 应用开发操作步骤 确认MRS产品Storm和Kafka组件已经安装,并正常运行。 已搭建Storm示例代码工程,将storm-examples导入到Eclipse开发环境,参见导入并配置Storm样例工程。 用WinScp工具将Storm客户端安装包导入Linux环境并安装客户端,参见准备Linux客户端环境。 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于认证,并且获取到该用户的keytab文件。将获取到的文件拷贝到示例工程的 src/main/resources目录。 获取的用户需要同时属于storm组和kafka组。 下载并安装Kafka客户端程序,参见《Kafka应用开发》。
  • 调测HBase Python样例程序 仅MRS 1.9.x及之前版本支持HBase python样例代码调测。 HBase支持使用自带的ThriftServer2服务通过python来访问HBase服务。python样例仅支持在Linux环境中运行,调测HBase python样例程序需有与集群环境网络相通的E CS ,详情请参见准备本地应用开发环境,并需要安装python环境,安装包下载详情请参见:https://www.python.org/。当前以在集群的master节点上运行样例为例。 搭建样例运行环境。 获取运行样例程序时python依赖,请从https://pypi.org/地址中搜索下载decorator、gssapi、kerberos、krbcontext、pure-sasl、thrift包(未开启Kerberos认证的普通集群仅需安装thrift包),并上传到master节点上,例如:新建目录“/opt/hbase-examples/python”,并上传到该目录下。 decorator-4.3.2.tar.gzgssapi-1.5.1.tar.gzkerberos-1.3.0.tar.gzkrbcontext-0.8.tar.gzpure-sasl-0.6.1.tar.gzthrift-0.11.0.tar.gz 将样例工程中的“hbase-python-example”文件夹上传到集群Master节点的“/opt/hbase-examples”下,并上传从准备HBase应用开发用户中获取的认证文件至该目录下。 在“/opt/hbase-examples”新建hbasepython.properties文件,并修改配置内容如下。 clientHome=/opt/clientexampleCodeDir=/opt/hbase-examples/hbase-python-examplepythonLib=/opt/hbase-examples/pythonkeyTabFile=/opt/hbase-examples/user.keytabuserName=hbaseuserthriftIp=xxx.xxx.xx.xxx clientHome为集群客户端路径 exampleCodeDir为hbase-python-example文件路径 pythonLib为1中python依赖存放路径 keyTabFile为从准备HBase应用开发用户获取的用户认证凭据user.keytab userName为准备HBase应用开发用户中开发用户名 thriftIp为安装了thriftserver2的节点的IP地址 执行如下命令创建表名为example的HBase表。 source /opt/client/bigdata_env kinit 用户名 echo "create 'example','family1','family2'" | hbase shell 安装python环境并运行程序。 cd /opt/hbase-examples/hbase-python-examplesh initEnvAndRunDemo.sh /opt/hbase-examples/hbasepython.properties 在运行程序之前需要格式化initEnvAndRunDemo.sh脚本。例如执行,dos2unix /opt/hbase-examples/hbasepython.properties 在执行脚本前,请确定“example”表包含列族'family1','family2'并已经存在于集群中。 再次运行程序时只需进入“/opt/hbase-examples/hbase-python-example”目录下,执行如下命令执行程序调测样例:python DemoClient.py HBase Python应用程序运行完成后,可直接通过运行结果查看应用程序运行情况。 图1 程序运行成功信息 父主题: 调测HBase应用
  • 准备认证机制代码 在开启Kerberos认证的环境下,各个组件之间的相互通信不能够简单地互通,而需要在通信之前进行相互认证,以确保通信的安全性。Kafka应用开发需要进行Kafka、ZooKeeper、Kerberos的安全认证,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。提供了LoginUtil相关接口来完成这些配置,如下样例代码中只需要配置用户自己申请的账号名称和对应的keytab文件名称即可,由于人机账号的keytab会随用户密码过期而失效,故建议使用机机账号进行配置。 认证样例代码: 设置keytab认证文件模块 /** * 用户自己申请的账号keytab文件名称 */ private static final String USER_KEYTAB_FILE = "用户自己申请的账号keytab文件名称"; /** * 用户自己申请的账号名称 */ private static final String USER_PRINCIPAL = "用户自己申请的账号名称"; MRS服务Kerberos认证模块,如果服务没有开启kerberos认证,这块逻辑不执行 public static void securityPrepare() throws IOException { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; String krbFile = filePath + "krb5.conf"; String userKeyTableFile = filePath + USER_KEYTAB_FILE; //windows路径下分隔符替换 userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\"); LoginUtil.setKrb5Config(krbFile); LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); } 如果修改了集群kerberos 域名 ,需要在代码中增加kerberos.domain.name的配置,并按照hadoop.expr=toLowerCase(%{default_realm}%{KerberosServer})规则配置正确的域名信息。例如:修改域名为HUAWEI.COM,则配置为hadoop.huawei.com。
  • 操作场景 本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。 本章节代码样例基于Kafka新API,对应Eclipse工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。 本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • Kafka 样例工程配置文件说明 Conf目录下各配置文件及重要参数配置说明。 Producer API配置项。 表1 producer.properties文件配置项 参数 描述 备注 security.protocol 安全协议类型 生产者使用的安全协议类型,当前Kerberos开启的模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。Kerberos未开启的模式下配置为PLAINTEXT。 kerberos.domain.name 域名 MRS服务集群的Kerberos域名,未开启Kerberos认证的集群无需配置。 sasl.kerberos.service.name 服务名 Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。未开启Kerberos认证的集群无需配置。 Consumer API配置项。 表2 consumer.properties文件配置项 参数 描述 备注 security.protocol 安全协议类型 消费者使用的安全协议类型,当前安全模式下Kerberos开启的模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。Kerberos未开启的模式下配置为PLAINTEXT。 kerberos.domain.name 域名 MRS服务集群的Kerberos域名,未开启Kerberos认证的集群无需配置。 group.id 消费者的group id - auto.commit.interval.ms 是否自动提交offset 布尔值参数,默认值为true sasl.kerberos.service.name 服务名 Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。未开启Kerberos认证的集群无需配置。 客户端信息配置项。 表3 client.properties文件配置项 参数 描述 备注 metadata.broker.list 元数据Broker地址列表 通过此参数值,创建与元数据Broker之间的连接,需要直接访问元数据的API需要用到此参数。访问端口仅支持不开启Kerberos模式下的端口,端口说明详见Kafka安全接口介绍 kafka.client.zookeeper.principal kafka集群访问zookeeper的认证和域名 - bootstrap.servers Broker地址列表 通过此参数值,创建与Broker之间的连接。端口配置项详见Kafka安全接口介绍 zookeeper.connect zookeeper地址列表 通过此参数,访问zookeeper,末尾需要带上kafka服务名kafka MRS服务是否开启Kerberos认证配置项。 表4 kafkaSecurityMode文件配置项 参数 描述 备注 kafka.client.security.mode kafka所在的MRS服务集群是否开启Kerberos认证配置项 若开启了Kerberos认证,设置为yes,否则设置为no。 log4j日志配置项文件log4j.properties log4j日志框架的配置文件,默认情况不输入样例工程运行日志。 父主题: 开发Kafka应用
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全