华为云用户手册

  • 参数解释 “coordinator.xml”中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 frequency 流程定时执行的时间间隔 start 定时流程任务启动时间 end 定时流程任务终止时间 workflowAppUri Workflow流程任务在HDFS上的存放路径 resourceManager MapReduce ResourceManager地址 queueName 任务处理时使用的MapReduce队列名 nameNode HDFS NameNode集群地址 “${变量名}”表示:该值来自job.properties所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见配置Oozie作业运行参数)
  • 通过Java API提交Oozie作业开发思路 通过典型场景,用户可以快速学习和掌握Oozie的开发过程,并且对关键的接口函数有所了解。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,代码示例只涉及了MapReduce作业,其他作业的API调用代码是一样的,仅job配置“job.properties”与工作流配置文件“workflow.xml”需根据实际情况设置。 完成导入并配置Oozie样例工程操作后即可执行通过Java API提交MapReduce作业和查询作业状态。 父主题: 通过Java API提交Oozie作业
  • 准备本地应用开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 Windows系统,支持Windows 7以上版本。 开发和运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK,支持1.8版本;IBM JDK,支持1.8.5.11版本。 TaiShan客户端:OpenJDK,支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 安装和配置IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备 MRS 应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-zip 16.04版本。 父主题: 准备Oozie应用开发环境
  • 开发流程 工作流配置文件“workflow.xml”(“coordinator.xml”是对工作流进行调度,“bundle.xml”是对一组Coordinator进行管理)与“job.properties”。 如果有实现代码,需要开发对应的jar包,例如Java Action;如果是Hive,则需要开发SQL文件。 上传配置文件与jar包(包括依赖的jar包)到HDFS,上传的路径取决于“workflow.xml”中的“oozie.wf.application.path”参数配置的路径。 提供三种方式对工作流进行操作,详情请参见Oozie应用开发常见问题。 Shell命令 Java API Hue Oozie客户端提供了比较完整的examples示例供用户参考,包括各种类型的Action,以及Coordinator以及Bundle的使用。以客户端安装目录为“/opt/client”为例,examples具体目录为“/opt/client/Oozie/oozie-client-*/examples”。 如下通过一个MapReduce工作流的示例演示如何配置,并通过Shell命令调用。
  • 参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: DataStream程序样例工程(Java/Scala) flink-dist_*.jar flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序样例工程(Java/Scala) kafka-clients-*.jar flink-connector-kafka_*.jar flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 异步Checkpoint机制程序样例工程(Java/Scala) flink-dist_*.jar pipeline程序样例工程(Java/Scala) flink-connector-netty_*.jar flink-dist_*.jar flink-shaded-curator-*.jar curator-client-2.12.0.jar curator-framework-2.12.0.jar flink-shaded-curator-*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-connector-netty_*.jar、curator-client-2.12.0.jar、curator-framework-2.12.0.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-shaded-curator-*.jar仅适用于MRS 3.0.X集群。 curator-client-2.12.0.jar、curator-framework-2.12.0.jar仅适用于MRS 3.1.X集群。 Stream SQL Join样例工程(Java) kafka-clients-*.jar flink-connector-kafka_2.11*.jar flink-connector-kafka-base_*.jar flink-connector-kafka_*.jar flink-dist_2.11*.jar flink-table_2.11*.jar flink-connector-kafka-base_*.jar、flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的“opt”目录下获取。 flink-connector-kafka-base_*.jar、flink-connector-kafka_*.jar仅适用于MRS 3.0.X集群。 flink-connector-kafka_2.11*.jar、flink-dist_2.11*.jar和flink-table_2.11*.jar仅适用于MRS 3.1.X集群。
  • 数据规划 DataStream样例工程的数据存储在文本中。 将log1.txt和log2.txt放置在指定路径下,例如"/opt/log1.txt"和"/opt/log2.txt"。 数据文件若存放在本地文件系统,需在所有部署Yarn NodeManager的节点指定目录放置,并设置运行用户访问权限。 或将数据文件放置于HDFS,并指定程序中读取文件路径HDFS路径,例如"hdfs://hacluster/path/to/file"。
  • 场景说明 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: DataStream应用程序可以在Windows环境和Linux环境中运行。 实时统计总计网购时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 操作步骤 修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换用户keytab文件名称,用户principal名称,和Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 根据实际情况修改userJarFilePath为实际的拓扑Jar包地址 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "D:\\example.jar"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPO LOG Y_NAME, config, builder.createTopology()); } 安全模式下需要执行安全准备,根据实际情况修改userKeyTablePath和userPrincipal为导入并配置Storm样例工程章节的步骤2中所获取用户的keytab文件路径和principal private static void securityPrepare(Config config) throws IOException { String userKeyTablePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "user.keytab"; String userPrincipal = "StreamingDeveloper"; String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator +"krb5.conf"; //windows路径下分隔符替换 userKeyTablePath = userKeyTablePath.replace("\\", "\\\\"); krbFilePath = krbFilePath.replace("\\", "\\\\"); String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE)); LoginUtil.setKrb5Config(krbFilePath); LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance); LoginUtil.setJaasFile(userPrincipal, userKeyTablePath); } 执行WordCountTopology.java类的Main方法提交应用程序。
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 可以在命令后的url路径增加相应的参数设置,搜索对应的SQL语句。 例如,查看100条sql语句: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?limit=100" 查看正在运行的参数: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?completed=false" JobHistory命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications/application_1478570725074_0004/SQL" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }]} 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ]} 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 JDBC api增强通过beeline里面获取的executionID 取消当前正在执行的SQL 命令: curl -k -i --negotiate -X PUT -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/cancel/execution?executionId=8" 结果: 取消executionId 执行序号为8的job任务。 补充说明: spark-beeline里面执行SQL语句,如果该SQL语句产生spark任务,该SQL的executionId将会被打印在beeline里面,这个时候如果想取消这条sql的执行,可以用上述命令。 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: {"startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62} 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • REST接口 通过以下命令可跳过REST接口过滤器获取相应的应用信息。 安全模式下,JobHistory仅支持https协议,故在如下命令的url中请使用https协议。 安全模式下,需要设置spark.ui.customErrorPage=false并重启spark2x服务 (JobHistory2x、JD BCS erver2x和SparkResource2x三个实例对应的参数都需要修改)。 升级更新节点环境上的curl版本。具体curl版本升级方法如下: 下载curl安装包(http://curl.haxx.se/download/)。 使用如下命令进行安装包解压: tar -xzvf curl-x.x.x.tar.gz 使用如下命令覆盖安装: cd curl-x.x.x ./configure make make install 使用如下命令更新curl的动态链接库: ldconfig 安装成功后,重新登录节点环境,使用如下命令查看curl版本是否更新成功: curl --version 获取JobHistory中所有应用信息: 命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号。 结果: [ { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433 CS T", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]}, { "id" : "application_1517290848707_0145", "name" : "Spark shell", "attempts" : [ { "startTime" : "2018-01-31T15:20:31.286CST", "endTime" : "1970-01-01T07:59:59.999CST", "lastUpdated" : "2018-01-31T15:20:47.086CST", "duration" : 0, "sparkUser" : "admintest", "completed" : false, "startTimeEpoch" : 1517383231286, "endTimeEpoch" : -1, "lastUpdatedEpoch" : 1517383247086 } ]}] 结果分析: 通过这个命令,可以查询当前集群中所有的Spark应用(包括正在运行的应用和已经完成的应用),每个应用的信息如下表1。 表1 应用常用信息 参数 描述 id 应用的ID name 应用的Name attempts 应用的尝试,包含了开始时间、结束时间、执行用户、是否完成等信息 获取JobHistory中某个应用的信息: 命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications/application_1517290848707_0008" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号,application_1517290848707_0008为应用的id。 结果: { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]} 结果分析: 通过这个命令,可以查询某个Spark应用的信息,显示的信息如表1所示。 获取正在执行的某个应用的Executor信息: 针对alive executor命令: curl -k -i --negotiate -u: "https://192.168.169.84:8090/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/executors" 针对全部executor(alive&dead)命令: curl -k -i --negotiate -u: "https://192.168.169.84:8090/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/allexecutors" 其中192.168.169.84为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1478570725074_0046为在YARN中的应用ID。 结果: [{ "id" : "driver", "hostPort" : "192.168.169.84:23886", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 278019440, "executorLogs" : { }}, { "id" : "1", "hostPort" : "192.168.169.84:23902", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 1, "maxTasks" : 1, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 139, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 555755765, "executorLogs" : { "stdout" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stdout?start=-4096", "stderr" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stderr?start=-4096" }} ] 结果分析: 通过这个命令,可以查询当前应用的所有Executor信息(包括Driver),每个Executor的信息包含如下表2所示的常用信息。 表2 Executor常用信息 参数 描述 id Executor的ID hostPort Executor所在节点的ip:端口 executorLogs Executor的日志查看路径
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询, FusionInsight 版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://archive.apache.org/dist/spark/docs/3.3.1/monitoring.html#rest-api。
  • 提供Join能力 表12 提供Join能力的相关接口 API 说明 def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] 通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] 通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 提供设置eventtime属性的能力 表6 提供设置eventtime属性的能力的相关接口 API 说明 def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。 def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据 数据包含两个属性:分别是Int和String类型 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。
  • 参数解释 MapReduce Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name map-reduce action的名称 resourceManager MapReduce ResourceManager地址 name-node HDFS NameNode地址 queueName 任务处理时使用的MapReduce队列名 mapred.mapper.class Mapper类名 mapred.reducer.class Reducer类名 mapred.input.dir MapReduce处理数据的输入目录 mapred.output.dir MapReduce处理后结果数据输出目录 mapred.map.tasks MapReduce map任务个数 “${变量名}”表示:该值来自job.properties所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见配置Oozie作业运行参数)
  • 操作场景 本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。 本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 应用开发操作步骤 确认华为MRS产品Storm组件已经安装,且正常运行。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。 工程导入后,修改样例工程“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。 #配置JDBC服务端IP地址JDBC_SERVER_NAME=#配置JDBC服务端端口JDBC_PORT_NUM=#配置JDBC登录用户名JDBC_USER_NAME=#配置JDBC登录用户密码#密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全JDBC_PASSWORD=#配置database表名JDBC_BASE_TBL= 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
  • 数据库配置—Derby数据库配置过程 首先应下载一个数据库,可根据具体场景选择最适合的数据库。 该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。 Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。 在Derby的安装目录下,进入bin目录,输入如下命令: export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:. export DERBY_HOME=/opt/db-derby-10.12.1.1-bin . setNetworkServerCP ./startNetworkServer -h 主机名 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定) CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT ); CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT ); INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
  • Spark2x样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Spark2x相关样例工程: 表1 Spark2x相关样例工程 样例工程位置 描述 sparksecurity-examples/SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 sparksecurity-examples/SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 sparksecurity-examples/SparkHbasetoHbasePythonExample sparksecurity-examples/SparkHbasetoHbaseScalaExample sparksecurity-examples/SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 sparksecurity-examples/SparkHivetoHbasePythonExample sparksecurity-examples/SparkHivetoHbaseScalaExample sparksecurity-examples/SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 sparksecurity-examples/SparkPythonExample sparksecurity-examples/SparkRExample sparksecurity-examples/SparkScalaExample sparksecurity-examples/SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 sparksecurity-examples/SparkLauncherScalaExample sparksecurity-examples/SparkOnClickHouseJavaExample Spark通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作样例代码。 sparksecurity-examples/SparkOnClickHousePythonExample sparksecurity-examples/SparkOnClickHouseScalaExample sparksecurity-examples/SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 sparksecurity-examples/SparkOnHbasePythonExample sparksecurity-examples/SparkOnHbaseScalaExample sparksecurity-examples/SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 sparksecurity-examples/SparkOnHudiPythonExample sparksecurity-examples/SparkOnHudiScalaExample sparksecurity-examples/SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 sparksecurity-examples/SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 sparksecurity-examples/SparkSQLPythonExample sparksecurity-examples/SparkSQLScalaExample sparksecurity-examples/SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 sparksecurity-examples/SparkStreamingKafka010PythonExample sparksecurity-examples/SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 sparksecurity-examples/SparkStreamingtoHbasePythonExample010 sparksecurity-examples/SparkStreamingtoHbaseScalaExample010 sparksecurity-examples/SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 sparksecurity-examples/SparkStructuredStreamingPythonExample sparksecurity-examples/SparkStructuredStreamingScalaExample sparksecurity-examples/SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JDBCServer的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 sparksecurity-examples/SparkThriftServerScalaExample sparksecurity-examples/StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 sparksecurity-examples/StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 父主题: Spark2x开发指南(安全模式)
  • 问题现象 Spark能对接很多的第三方工具,因此在使用过程中经常会依赖一堆的三方包。而有一些包MRS已经自带,这样就有可能造成代码使用的jar包版本和集群自带的jar包版本不一致,在使用过程中就有可能出现jar包冲突的情况。 常见的jar包冲突报错有: 1、报错类找不到:java.lang.NoClassDefFoundError 2、报错方法找不到:java.lang.NoSuchMethodError
  • 原因分析 以自定义UDF为例: 报错信息显示是找不到类。 首先需要确认的是这个类属于的jar包是否在jvm的classpath里面, spark自带的jar都在“spark客户端目录/jars/”。 确认是否存在多个jar包拥有这个类。 如果是其他依赖包,可能是没有使用--jars添加到任务里面。 如果是已经添加到任务里面,但是依旧没有取到,可能是因为配置文件的driver或者executor的classpath配置不正确,可以查看日志确认是否加载到环境。 另外可能报错是类初始化失败导致后面使用这个类的时候出现上述报错,需要确认是否在之前就有初始化失败或者其他报错的情况发生。 报错信息显示找不到方法。 确认这个方法对应的类所在的jar包是否加载到jvm的classpath里面,spark自带的类都在“spark客户端目录/jars/”。 确认是否有多个jar包包含这个类(尤其注意相同工具的不同版本)。 如果报错是Hadoop相关的包,有可能是因为使用的Hadoop版本不一致导致部分方法已经更改。 如果报错的是三方包里面的类,可能是因为Spark已经自带了相关的jar包,但是和代码中使用的版本不一致。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接收到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkLoadPythonExample文件: # -*- coding:utf-8 -*-"""【说明】(1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现(2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseBulkLoadExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.HBaseBulkLoadPythonExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.HBaseBulkLoadPythonExample().hbaseBulkLoad(spark._jsc, sys.argv[1], sys.argv[2])# 停止SparkSessionspark.stop()
  • 数据规划 StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 运行Python样例代码无需通过Maven打包,只需要上传user.keytab、krb5.conf 文件到客户端所在服务器上。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全