场景与准备

准备工作

  • 1、具备华为云账号。
  • 2、购买MapReduce服务、弹性云服务器、虚拟私有云和对象存储服务。

教程任务

  • 1、使用Kafka实时收集数据,并利用Storm分析套牌车,进行套牌车实时告警。实时收集和分析道路检查点数据。如果在一定的时间和范围内发现两张相同的牌照,车牌号码,位置和时间会立即显示在地图上,以便及时通知交警进行调查。
  • 2、使用Kafka实时收集数据,并利用Storm计算卡口流量,进行卡口实时流量监控。实时收集和分析道路检查点数据。实时统计每个道路检查点车辆信息,并实时计算哪些道路检查点车辆拥挤最多。
  • 3、将历史数据存储在OBS中,利用Spark/MR进行分析统计,实现卡口流量分析。根据海量数据每小时统计每个道路检查点的交通流量,发现当天最拥挤的道路检查点。
  • 4、将历史数据存储在OBS中,利用Spark/MR进行分析统计,车辆出行路径分析。道路检查站的相关性分析,勾勒出城市车辆路线景观。

使用产品

  • MapReduce服务 MRS

    MapReduce服务(MapReduce Service)提供租户完全可控的企业级大数据集群云服务,轻松运行Hadoop、Spark、HBase、Kafka、Storm等大数据组件。

    了解详情
  • 虚拟私有云

    虚拟私有云(Virtual Private Cloud)是用户在华为云上申请的隔离的、私密的虚拟网络环境。用户可配置VPC内的IP地址段、子网、安全组等子服务,可申请弹性带宽和弹性IP搭建业务系统。

    了解详情
  • 弹性云服务器

    弹性云服务器(Elastic Cloud Server)是一种可随时自助获取、可弹性伸缩的云服务器,帮助用户打造可靠、安全、灵活、高效的应用环境,确保服务持久稳定运行,提升运维效率。

    了解详情
  • 对象存储服务

    稳定、安全、高效、易用的云存储服务,具备标准Restful API接口,可存储任意数量和形式的非结构化数据,提供99.999999999%的数据可靠性。

    了解详情

实现方法

  • Step1   准备工作
  • Step2   申请MRS流式集群
  • Step3   使用Kafka实时收集数据
  • Step4   启动Storm分析套牌车程序
  • Step5   启动Storm计算卡口流量程序
  • Step6   申请MRS分析集群
  • Step7   分析卡口流量和车辆出行路径
  • Step8   通过WebUI显示数据和报告
准备工作

1 申请虚拟私有云

操作场景

VPC即虚拟私有云,是通过逻辑方式进行网络隔离,提供安全、隔离的网络环境。

操作步骤

步骤 1 登录华为云控制台,选择左上角区域为“华北-北京一”。

图1 区域

步骤 2 选择“网络 > 虚拟私有云”,在“网络控制台”页面右上角单击“申请虚拟私有云”。

图2 申请虚拟私有云

步骤 3 根据界面提示配置虚拟私有云参数后,单击“立即创建”。

  • •  “名称”配置为“vpc-mrs-demo”,也可以按规范命名。
  • •  “网段”保持默认值。
  • •  “标签”保持默认值,不填写。
  • •  “可用分区”保持默认值“可用区2”。
  • •  “子网名称”配置为“subnet-mrs-demo”,也可以按规范命名。
  • •  “子网网段”保持默认值。
  • •  “高级配置”中“网关”保持默认值。
  • •  “高级配置”中“DNS服务器地址1”和“DNS服务器地址2”保持默认值。
  • •  “高级配置”“子网标签”保持默认值。

参数配置完成后,如图3所示。

图3 配置VPC参数

申请MRS流式集群

2 申请MRS流式集群

步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,进入MapReduce服务页面。

图4 控制台

步骤 2 在MapReduce服务管理控制台页面右上角,单击“购买集群”,进入“购买集群页面”。

图5 创建密钥对

步骤 3 配置集群基本信息。

配置信息按照如下内容填写:

  • •  “计费模式”选择按需付费。
  • •  “当前区域”默认选择“北京一”。
  • •  “可用分区”选择“可用区2”。
  • •  “集群名称”填写“mrs_demo”或按命名规范命名。
  • •  “集群版本”选择最新的“MRS 1.7.1”。
  • •  “Kerberos认证”保持默认值不开启。
  • •  “集群类型”选择“流式集群”。
  • •  “组件选择”勾选所有流式集群组件。
  • •  “虚拟私有云”和“子网”选择准备工作中创建的虚拟私有云和子网。
  • •  “安全组”默认选择“自动创建”。
  • •  “集群高可用”默认保持开启状态。
  • •  “集群节点”中Master和Core的实例规格保持默认值。实例数量、数据盘类型及大小保持默认值。不添加Task节点。
  • •  “登录方式”选择“密码”,并为root用户输入密码及确认密码。
  • •  “日志记录”保持默认开启,并勾选OBS桶的确认信息。
  • •  “高级配置”选择暂不配置。

图6 基本信息

图7 网络及节点配置

图8 登录方式及日志记录

步骤 4 配置完成后,在右下角单击“立即申请”。

步骤 5 确认集群详情无误后,单击右下角“提交申请”,成功提交集群创建任务。

步骤 6 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。

集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。

2.1 添加安全组规则并绑定弹性IP

操作场景

安全组与虚拟私有云、子网配合使用,通过设置对虚拟私有云内部的弹性云服务器设置出入方向访问规则,进行网络流量的访问控制,增强公有云服务的网络安全。

操作步骤

步骤 1 登录MapReduce服务管理控制台。

步骤 2 在左侧导航树中选择“集群列表”。

步骤 3 在集群列表中,单击集群名称“mrs_demo”,在“节点信息”页签中找到类型为“Master2”的节点,并单击其名称,跳转至云服务器控制台上的该弹性云服务器详情页面。

图9 节点信息

步骤 4 在弹性云服务器详情页面,选择“安全组”页签,单击安全组名称,在展开的安全组中单击该安全组ID。

图10 安全组

步骤 5 在弹出的“安全组”页面中,单击“添加规则”,规则添加完成后,单击“确定”。

为了快速体验MRS服务,减少这些规则的约束,建议对出方向和入方向的协议都为“any”,源地址设为所有。如图11所示。

图11 添加规则

步骤 6 在弹性云服务器详情页面,选择“弹性IP”页签,并单击,在页面中单击“绑定弹性IP”。

图12 弹性IP

步骤 7 在“绑定弹性IP”界面中选择一个可用的弹性IP,单击“确定”,绑定弹性IP。如果没有可用的弹性IP,可单击“查看弹性IP”,申请一个新的弹性IP。

图13 绑定弹性IP

使用Kafka实时收集数据

3 使用Kafka实时收集数据

步骤 1 使用MobaXterm工具登录master2节点,登录用户名为“root”,按下图配置后,单击“OK”后,按提示信息输入密码。

MobaXterm工具下载地址:https://mobaxterm.mobatek.net/

图14 MobaXterm详细配置

步骤 2 登录成功后,执行以下命令切换到root用户。

sudo -s

步骤 3 执行以下命令配置环境变量。

source /opt/client/bigdata_env

步骤 4 登录MRS服务管理控制台,在“集群列表 > 现有集群”列表中,单击名称“mrs_demo”,进入集群基本信息页面,在“集群管理页面”右侧,单击“点击查看”,跳转至MRS Manager页面。

图15 点击查看

步骤 5 在MRS Manager页面,选择“服务管理 > ZooKeeper > 实例”,查看ZooKeeper角色实例的IP地址。

图16 ZooKeeper角色实例的IP地址

记录ZooKeeper角色实例中任意一个的IP地址即可。如“192.168.0.111”。

步骤 6 创建Kafka的输入topic。

topic名称为“input_topic”,其中Zookeeper集群IP需要替换成实际的IP地址。

kafka-topics.sh --create --zookeeper <ZooKeeper角色实例所在节点IP:2181/kafka> --partitions 2 --replication-factor 2 --topic input_topic

步骤 7 使用WinSCP工具将压缩包“upload_kafka_tool.tar.gz”上传到master2节点上,执行以下命令解压。

tar -zxvf upload_kafka_tool.tar.gz

步骤 8 修改upload_kafka_tool目录下的dis.properties,将其中的“broker_list” 修改为集群中Kafka broker的IP地址,并检查“topic_name”为以上步骤6中创建的topic名称。

#Kakfa配置信息 (Topic 创建命令bin/kafka-topics.sh --create --partitions 1 --replication 2 --zookeeper 192.168.0.17:2181 --topic topic0929)

broker_list=192.168.0.220:9092,192.168.0.246:9092,192.168.0.242:9092

topic_name=input_topic

#Sink类型 (kafka或者dis)

sink_type=kafka

步骤 9 执行以下命令启动造数据程序。

sh upload_kafka_tool/bin/linux/startProducer.sh

步骤 10 依次执行以下命令从input_topic读取数据,确保数据以及写入到Kafka中。

source /opt/client/bigdata_env

kafka-console-consumer.sh --zookeeper <ZooKeeper角色实例所在节点IP:2181/kafka> --topic input_topic --from-beginning

启动Storm分析套牌车程序

4 启动Storm分析套牌车程序

步骤 1 使用MobaXterm工具登录master2节点,登录用户名为"root",按下图配置后,单击“OK”后,按提示信息输入密码。

MobaXterm工具下载地址:https://mobaxterm.mobatek.net/

图17 MobaXterm详细配置

步骤 2 使用WinSCP工具将“car_analysis.jar”的jar包上传到master2节点上。

步骤 3 获取Kafka角色实例所在节点IP。

在MRS Manager页面,选择“服务管理 > Kafka > 实例”,查看Kafka角色实例的IP地址。记录Kafka角色实例中任意一个的IP地址即可。如“192.168.0.111”

步骤 4 执行以下命令启动套牌车分析程序。

source /opt/client/bigdata_env

storm jar car_analysis.jar com.huawei.storm.hcc.SearchXCarTopology xcar input_topic fake_car_output <Kafka角色实例所在节点IP:9092> 200 10 30 30

相关参数说明如下

  • •  0 topology_name(拓扑名称)
  • •  1 stream_name(输入流名称,即kafka输入topic名称)
  • •  2 stream_name(输出流名称,即输出到kafka的topic名称)
  • •  3 broker list,即kafka集群broker列表
  • •  4 滑动窗口大小
  • •  5 更新频率(单位:秒); window length
  • •  6 套牌车规则中的时间(单位:分钟)
  • •  7 套牌车规则中的距离(单位:公里)

以上命令会提交一个storm的程序,该程序读取input_topic的数据,计算其中是否有套牌车,如果发现套牌车,则将结果发送到名称为fake_car_output的Kafka topic中。

步骤 5 程序运行后可以执行以下命令查看计算出来的套牌车数据。

source /opt/client/bigdata_env

kafka-console-consumer.sh --zookeeper <ZooKeeper角色实例所在节点IP:2181/kafka> --topic fake_car_output --from-beginning

启动Storm计算卡口流量程序

5 启动Storm计算卡口流量程序

步骤 1 使用MobaXterm工具登录master2节点,登录用户名为"root",按下图配置后,单击“OK”后,按提示信息输入密码。

MobaXterm工具下载地址:https://mobaxterm.mobatek.net/

图18 MobaXterm详细配置

步骤 2 使用WinSCP工具将“car_analysis.jar”的jar包上传到master2节点上。

步骤 3 执行以下命令启动套牌车分析程序。

source /opt/client/bigdata_env

storm jar car_analysis.jar com.huawei.storm.hcc.SearchCarSumTopology xcar-sum input_topic sum_car_output <Kafka集群IP:9092> 60 5

参数说明如下:

  • 0 topology_name(拓扑名称)
  • 1 stream_name(输入流名称,即kafka输入topic名称)
  • 2 stream_name(输出流名称,即输出到kafka的topic名称)
  • 3 broker list,即kafka集群broker列表
  • 4 窗口大小
  • 5 更新频率

以上命令会提交一个storm的程序,该程序读取input_topic的数据,计算其中每个卡口的车流量,然后将结果写入到名称为sum_car_output的Kafka topic中。

步骤 4 程序运行后可以执行以下命令查看计算出来的卡口流量。

source /opt/client/bigdata_env

kafka-console-consumer.sh --zookeeper <ZooKeeper集群IP:2181/kafka> --topic sum_car_output --from-beginning

申请MRS分析集群

6 申请MRS分析集群

步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,进入MapReduce服务页面。

图19 控制台

步骤 2 在MapReduce服务管理控制台页面右上角,单击“购买集群”,进入“购买集群页面”。

图20 购买集群

步骤 3 配置集群基本信息。

配置信息按照如下内容填写:

  • •  “计费模式”选择按需付费。
  • •  “当前区域”默认选择“北京一”。
  • •  “可用分区”选择“可用区2”。
  • •  “集群名称”填写“mrs_analysis”或按命名规范命名。
  • •  “集群版本”选择最新的“MRS 1.7.1”。
  • •  “Kerberos认证”保持默认值不开启。
  • •  “集群类型”选择“分析集群”。
  • •  “组件选择”勾选所有分析集群组件。
  • •  “虚拟私有云”和“子网”选择准备工作中创建的虚拟私有云和子网。
  • •  “安全组”默认选择“自动创建”。
  • •  “集群高可用”默认保持开启状态。
  • •  “集群节点”中Master和Core的实例规格保持默认值。实例数量、数据盘类型及大小保持默认值。不添加Task节点。
  • •  “登录方式”选择“密码”,并为root用户输入密码及确认密码。
  • •  “日志记录”保持默认开启,并勾选OBS桶的确认信息。
  • •  “高级配置”选择暂不配置。

图21 配置集群1

图22 配置集群2

图23 配置集群3

步骤 4 配置完成后,在右下角单击“立即申请”。

步骤 5 确认集群详情无误后,单击右下角“提交申请”,成功提交集群创建任务。

步骤 6 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。

集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。

分析卡口流量和车辆出行路径

7 分析卡口流量和车辆出行路径

步骤 1 准备数据文件和程序。

步骤 2 上传数据和程序至OBS。

1. 登录对象存储服务控制台。

2. 单击“创建桶”,创建一个名称为“mrscardata”的桶。mrscardata仅为示例,桶名称必须全局唯一,否则会创建桶失败。

创建OBS桶所在的区域要和MRS集群在同一区域。本教程区域统一为“华北-北京一”。

图24 创建桶

3. 在桶列表里单击mrscardata桶名称,进入“概览”页面。在其左侧导航栏,单击“对象”,进入上传文件页面。

4. 单击“新建文件夹”,分别创建program、pro、output、log、input和frontendcollector文件夹,创建完成后如图25所示。

  • − program:存放程序mapreduce-example-normal.jar
  • − pro:存放文件co.csv
  • − input:存放dis.hcc-demo.0UWTDLOkFT2t5LzvJN9
  • − output:存放作业输出的文件
  • − log:存放作业输出的日志文件
  • − frontendcollector:存放车辆行驶路径的输出文件

图25 文件夹列表

5. 进入program文件夹,单击“上传文件”,在“上传文件”对话框单击 ,从本地选择步骤1中下载的程序,然后单击“确定”,等待对象上传成功。

图26 文件夹列表

− 进入pro文件夹,上传co.csv文件。

− 进入input文件夹,上传dis.hcc-demo.0UWTDLOkFT2t5LzvJN9文件。

步骤 3 登录MRS控制台,在左侧导航栏选择“集群列表 > 现有集群”选择,单击名称为“mrs_analysis”的集群。

步骤 4 选择“作业管理”页签,然后在“作业”页签中单击“添加”,进入“添加作业”页面。

步骤 5 在“添加作业”页面配置作业名称为“Preprocess”的参数,参数配置完成后,单击“确定”,等待作业运行成功。

  • •  作业类型选择“MapReduce”。
  • •  作业名称为“Preprocess”。
  • •  执行程序路径配置为OBS上存放程序mapreduce-example-normal.jar的地址。如:s3a://mrscardata/program/mapreduce-example-normal.jar。
  • •  执行程序参数中填写的参数为:com.huawei.mrs.preprocess.Preprocess -Dfs.s3a.access.key=xxxxxx -Dfs.s3a.secret.key=xxxxxx s3a://mrscardata/input/ /tmp/output/ 2018-06-10 2019-12-30 0 s3a://mrscardata/pro/co.csv
  • AK和SK获取方法请参见https://support.huaweicloud.com/api-mrs/mrs_02_0008.html
  • 填写参数中的时间段需要包括当天的时间点,例如您是在2018-06-15日创建作业,这个时间段需要为2018-06-15日前面和后面。
  • 参数“s3a://mrscardata/input/”中的OBS桶名需要替换为实际环境创建的桶名。
  • •  数据输入路径、输出路径和日志路径无需填写。

图27 Preprocess作业参数配置

步骤 6 “Preprocess”作业运行完成后,可以在作业列表中查看到状态为“已完成”,执行结果为“成功”。如图28所示。

图28 作业执行成功

步骤 7 参考步骤7,继续添加作业名称为“Traffic_statics”的作业,用来分析卡口流量。

  • •  作业类型选择“MapReduce”。
  • •  作业名称为“Traffic_statics”。
  • •  执行程序路径配置为OBS上存放程序mapreduce-example-normal.jar的地址。如:s3a://mrscardata/program/mapreduce-example-normal.jar。
  • •  执行程序参数中填写的参数为:com.huawei.mrs.portflowstatistics.FlowCollector -Dfs.s3a.access.key=xxxxxx -Dfs.s3a.secret.key=xxxxxx /tmp/output/ s3a://mrscardata/output/
  • 参数“s3a://mrscardata/input/”的OBS桶名需要替换为实际环境创建的桶名。
  • •  数据输入路径、输出路径和日志路径无需填写。

图29 Traffic_statics作业参数配置

步骤 8 “Traffic_statics”作业运行完成后,可以在作业列表中查看到状态为“已完成”,执行结果为“成功”。

图30 作业执行成功

步骤 9 参考步骤7,继续添加作业名称为“Movement_path”的作业,用来分析车辆出行路径。

  • •  作业类型选择“MapReduce”。
  • •  作业名称为“Movement_path”。
  • •  执行程序路径配置为OBS上存放程序mapreduce-example-normal.jar的地址。如:s3a://mrscardata/program/mapreduce-example-normal.jar。
  • •  执行程序参数中填写的参数为:com.huawei.mrs.pathflowstatistics.FrontEndCollector -Dfs.s3a.access.key=xxxxxx -Dfs.s3a.secret.key=xxxxxx /tmp/output/ s3a://mrscardata/frontendcollector/
  • 参数“s3a://mrscardata/frontendcollector/”的OBS桶名需要替换为实际环境创建的桶名。
  • •  数据输入路径、输出路径和日志路径无需填写。

图31 Movement_path作业参数配置

步骤 10 "Movement_path"作业运行完成后,可以在作业列表中查看到状态为“已完成”,执行结果为“成功”。

图32 作业执行成功

通过WebUI显示数据和报告

8 通过WebUI显示数据和报告

步骤 1 购买Windows弹性云服务器,并绑定弹性IP地址。

Windows弹性云服务器区域为“北京一”,可用区为“可用区2”,虚拟私有云请选择与MRS流式集群为同一个。具体购买操作请参见购买并登录Windows弹性云服务器

步骤 2 远程登录Windows弹性云服务器,在Windows中下载并安装JDK8和tomcat。

JDK安装完成后需要配置环境变量。

步骤 3 下载并拷贝hccDemo.war包至tomcat安装路径的/webapps目录下。

hccDemo.war包获取地址:https://mapreduceservice.obs-website.cn-north-1.myhwclouds.com/demo/hccDemo.war

步骤 4 启动tomcat使其自动解压hccDemo.war至webapps目录下,并自动生成hccDemo目录。

步骤 5 进入webapps/hccDemo/WEB-INF/classes目录,修改app.conf文件中的配置项为自己的环境配置实际值,配置项详细描述见app.conf文件中的注释。

#离线车辆分析MR作业输出的OBS桶对应的Endpoint,可以在OBS桶基本信息中查看。

fs.s3a.endpoint=xx.xx.xx.xx

fs.s3a.path.style.access=true

fs.s3a.connection.ssl.enabled=false

#AK和SK

ak=xxxxxx

sk=xxxxxxxxxxxxxxxxxxxxxxxxxxx

#离线车辆出行路径分析MR作业输出的OBS路径

car.movemement.path=/frontendcollector/carmovement

#离线车辆出行路径分析MR作业输出的OBS桶名

queue=cartest

#离线卡口流量分析MR作业输出的OBS路径

traffic.stat.path=/output/flowsum

#离线卡口流量分析MR作业输出的OBS路径

traffic.stat.time.interval.path=/output/flow

#Traffic stat configuration

#卡口流量分析:高流量的阈值

traffic.stat.high=52354

#卡口流量分析:中流量的阈值

traffic.stat.medium=45383

#set -1 to fetch all the records

traffic.total.record=1000

#Car movement configuration

#车辆出行路径分析:高的阈值

car.movement.high=24239

#车辆出行路径分析:低的阈值

car.movement.low=18979

#set -1 to fetch all the records

car.movement.total.record=1000

#Hight Traffic report configuration

#实时流量监控分析:高流量阈值

high.traffic.high=25

#实时流量监控分析:低流量阈值

high.traffic.medium=10

#General Configurations

#经纬度坐标和卡口名的映射文件

coordinate.file=coordinate-shanghai-with-name.csv

#是否使用自己制造的硬编码数据

use.dummy.data=false

//kafka or dis

# kafka 实时分析数据是通过dis还是kafka,当前为kafka

real.time.data.provider=kafka

#Kafka related changes

# Kafka的Broker List,以逗号分隔,注意开通对应端口的安全组规则

kafka.bootstrap.servers=ip1:9092,ip2:9092,ip3:9092

# 存放套牌车数据的topic

kafka.fakeCar.topic=fake_car_output

# 存放卡口流量数据的topic

kafka.highTraffic.topic=sum_car_output

kafka.fakeCar.partition=0

kafka.highTraffic.partition=0

kafka.highTraffic.max.record=1000

kafka.fakeCar.max.record=1000

kafka.poll.timeout=500

kafka.batchNumber=100

#property starting with kafka.property will be used while constructing KafkaConsumer

kafka.property.group.id=kafka

kafka.property.enable.auto.commit=true

kafka.property.auto.commit.interval.ms=1000

步骤 6 重启tomcat程序。

步骤 7 在浏览器里输入IP为弹性IP地址(EIP),端口为8080的地址:http://{EIP}:8080/hccDemo,进入智能交通大数据的主页,如下图所示。

您也可以进入已搭建好的智能交通大数据网址体验:http://mrs-demo.ei.huaweicloud.com/hccDemo/#/homePage

图33 智能交通大数据主页

常见问题

Q:1、MRS智慧交通实践的样例程序下载地址?

A:样例程序下载地址如下所示:

MR代码 立即下载 | MR Jar包 立即下载 | war包 立即下载 | Storm代码 立即下载 | Storm Jar包 立即下载 | Upload Kafka Tool 立即下载

Q:2、MRS二次开发指南查看地址?

A:MRS二次开发指南查看地址 立即前往

Q:3、本实践资料查看地址?

A:本实践资料查看地址 立即前往