云服务器内容精选

  • 如何开发网关 网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。 使用AbstractGateway类 继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口: 1 2 3 4 5 6 7 public abstract void onSubdevCommand(String requestId, Command command); public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet); public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet); public abstract void onSubdevMessage(DeviceMessage message); iot-gateway-demo代码介绍 工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类: SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发 StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息 SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存 Session:设备会话类,保存了设备id和TCP的channel的对应关系 SimpleGateway类 添加或删除子设备处理 添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口 删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) { for (DeviceInfo subdevice : subDevicesInfo.getDevices()) { Session session = nodeIdToSesseionMap.get(subdevice.getNodeId()); if (session != null) { if (session.getChannel() != null) { session.getChannel().close(); channelIdToSessionMap.remove(session.getChannel().id().asLongText()); nodeIdToSesseionMap.remove(session.getNodeId()); } } } return super.onDeleteSubDevices(subDevicesInfo); } 下行消息处理 网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public void onSubdevMessage(DeviceMessage message) { //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即 //deviceId = productId_nodeId String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId()); if (nodeId == null) { return; } //通过nodeId获取session,进一步获取channel Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { log.error("subdev is not connected " + nodeId); return; } if (session.getChannel() == null){ log.error("channel is null " + nodeId); return; } //直接把消息转发给子设备 session.getChannel().writeAndFlush(message.getContent()); log.info("writeAndFlush " + message); } 属性读写: 属性读写包括属性设置和属性查询。 属性设置: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) { if (propsSet.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet)); //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应 getClient().respondPropsSet(requestId, IotResult.SUCCESS); log.info("writeAndFlush " + propsSet); } 属性查询: 1 2 3 4 5 6 7 @Override public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) { //不建议平台直接读子设备的属性,这里直接返回失败 log.error("not supporte onSubdevPropertiesGet"); deviceClient.respondPropsSet(requestId, IotResult.FAIL); } 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public void onSubdevCommand(String requestId, Command command) { if (command.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command)); //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应 getClient().respondCommand(requestId, new CommandRsp(0)); log.info("writeAndFlush " + command); } 上行消息处理 上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话: 如果子设备信息不存在,这里会创建会话失败,直接拒绝连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s); //如果是首条消息,创建session //如果是首条消息,创建session Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText()); if (session == null) { String nodeId = s; session = simpleGateway.createSession(nodeId, incoming); //创建会话失败,拒绝连接 if (session == null) { log.info("close channel"); ctx.close(); } } 如果会话存在,则进行消息转发: 1 2 3 4 5 6 7 else { //如果需要上报属性则调用reportSubDeviceProperties DeviceMessage deviceMessage = new DeviceMessage(s); deviceMessage.setDeviceId(session.getDeviceId()); simpleGateway.reportSubDeviceMessage(deviceMessage, null); } 到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。 iot-gateway-demo的使用 创建子设备的产品,步骤可参考创建产品。 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示 图8 模型定义-子设备产品 修改StringTcpServer的main函数,替换构造参数,然后运行该类。 1 2 3 simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(), "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret"); 在平台上看到该网关在线后,添加子设备。 图9 设备-添加子设备 表1 子设备参数 参数描述 所属产品 子设备所属的产品,选择步骤1创建的产品。 设备名称 即device_name,可自定义,如subdev_name 设备标识码 即node_id,填写subdev。 设备ID 即devicee_id,可不填写,自动生成。 此时网关上日志打印: 2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。 图10 子设备连接 此时网关设备日志打印: 2024-04-16 21:00:54 INFO StringTcpServer:196 - initChannel: /127.0.0.1:21889 2024-04-16 21:01:00 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev 2024-04-16 21:01:00 INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'} 在平台上看到子设备上线。 图11 设备列表-设备在线 子设备上报消息 图12 子设备上报消息 查看日志看到上报成功 2024-04-16 21:02:36 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"] 2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]] 查看消息跟踪 在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪: 图13 消息跟踪-直连设备
  • 创建产品 为了方便体验,我们提供了一个烟感的产品模型,烟感会上报烟雾值、温度、湿度、烟雾报警、还支持响铃报警命令。以烟感例,体验消息上报、属性上报等功能。 访问设备接入服务,单击“管理控制台”进入设备接入控制台,查看MQTTS设备接入域名,保存该地址。 单击左侧导航栏“产品”,单击页面左侧的“创建产品”。 根据页面提示填写参数,然后单击“确定”完成产品的创建。 基本信息 所属资源空间 平台自动将新创建的产品归属在默认资源空间下。如需归属在其他资源空间下,下拉选择所属的资源空间。如无对应的资源空间,请先创建资源空间。 产品名称 自定义。支持字母、数字、下划线(_)、连字符(-)的字符组合。 协议类型 选择“MQTT”。 数据格式 选择“JSON”。 设备类型选择 选择”自定义类型” 设备类型 填写”smokeDetector” 高级配置 产品ID 不填写 产品描述 请根据实际情况填写。
  • 设备初始化 创建设备时,需要写入在注册设备时获取的设备ID、密码,以及1中获取的设备对接信息,注意格式为ssl://域名信息:端口号 或 ssl://IP地址:端口号 1 2 3 //例如在iot-device-demo文件 MessageSample.java中修改以下参数 IoTDevice device = new IoTDevice("ssl://域名信息:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret", file); 所有涉及设备ID和密码的文件均需要修改成对应的信息。 建立连接。调用init接口,该接口是阻塞调用,如果建立连接成功会返回0。 1 2 3 if (device.init() != 0) { return; } 如果连接成功就会打印: 1 2023-07-17 17:22:59 INFO MqttConnection:105 - Mqtt client connected. address :ssl://域名信息:8883 创建设备并连接成功后,可以开始使用设备进行通信。调用IoT Device 的getClient接口获取设备客户端,客户端提供了消息、属性、命令等通讯接口。
  • 准备工作 开发环境要求:已经安装JDK(版本1.8以上)和maven 访问SDK下载页面,下载SDK,整个工程包含以下子工程: iot-device-sdk-java:sdk代码 iot-device-demo:普通直连设备的demo代码 iot-gateway-demo:网关设备的demo代码 iot-bridge-sdk:网桥的sdk代码 iot-bridge-demo:网桥的demo代码,用来演示如何将tcp设备桥接到平台 iot-bridge-sample-tcp-protocol:子设备使用tcp协议链接网桥的样例 iot-device-code-generator:设备代码生成器,可以根据产品模型自动生成设备代码 编译安装:进入到SDK根目录,执行mvn install
  • 如何开发网关 网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。 使用AbstractGateway类 继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口: 1 2 3 4 5 6 7 public abstract void onSubdevCommand(String requestId, Command command); public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet); public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet); public abstract void onSubdevMessage(DeviceMessage message); iot-gateway-demo代码介绍 工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类: SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发 StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息 SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存 Session:设备会话类,保存了设备id和TCP的channel的对应关系 SimpleGateway类 添加或删除子设备处理 添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口 删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) { for (DeviceInfo subdevice : subDevicesInfo.getDevices()) { Session session = nodeIdToSesseionMap.get(subdevice.getNodeId()); if (session != null) { if (session.getChannel() != null) { session.getChannel().close(); channelIdToSessionMap.remove(session.getChannel().id().asLongText()); nodeIdToSesseionMap.remove(session.getNodeId()); } } } return super.onDeleteSubDevices(subDevicesInfo); } 下行消息处理 网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public void onSubdevMessage(DeviceMessage message) { //平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即 //deviceId = productId_nodeId String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId()); if (nodeId == null) { return; } //通过nodeId获取session,进一步获取channel Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { log.error("subdev is not connected " + nodeId); return; } if (session.getChannel() == null){ log.error("channel is null " + nodeId); return; } //直接把消息转发给子设备 session.getChannel().writeAndFlush(message.getContent()); log.info("writeAndFlush " + message); } 属性读写: 属性读写包括属性设置和属性查询。 属性设置: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) { if (propsSet.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet)); //为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应 getClient().respondPropsSet(requestId, IotResult.SUCCESS); log.info("writeAndFlush " + propsSet); } 属性查询: 1 2 3 4 5 6 7 @Override public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) { //不建议平台直接读子设备的属性,这里直接返回失败 log.error("not supporte onSubdevPropertiesGet"); deviceClient.respondPropsSet(requestId, IotResult.FAIL); } 命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public void onSubdevCommand(String requestId, Command command) { if (command.getDeviceId() == null) { return; } String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId()); if (nodeId == null) { return; } Session session = nodeIdToSesseionMap.get(nodeId); if (session == null) { return; } //这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换 session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command)); //为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应 getClient().respondCommand(requestId, new CommandRsp(0)); log.info("writeAndFlush " + command); } 上行消息处理 上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话: 如果子设备信息不存在,这里会创建会话失败,直接拒绝连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s); //如果是首条消息,创建session //如果是首条消息,创建session Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText()); if (session == null) { String nodeId = s; session = simpleGateway.createSession(nodeId, incoming); //创建会话失败,拒绝连接 if (session == null) { log.info("close channel"); ctx.close(); } } 如果会话存在,则进行消息转发: 1 2 3 4 5 6 7 else { //如果需要上报属性则调用reportSubDeviceProperties DeviceMessage deviceMessage = new DeviceMessage(s); deviceMessage.setDeviceId(session.getDeviceId()); simpleGateway.reportSubDeviceMessage(deviceMessage, null); } 到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。 iot-gateway-demo的使用 在平台上为网关注册开户。 修改StringTcpServer的main函数,替换构造参数,然后运行该类。 1 2 3 simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(), "ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883", "5e06bfee334dd4f33759f5b3_demo", "mysecret"); 在平台上看到该网关在线后,添加子设备。 此时网关上日志打印: 2023-01-05 19:14:32 INFO SubDevicesFilePersistence:83 - add subdev: 456gefw3fh 运行TcpDevice类,建立连接后,输入子设备的nodeId。 此时网关设备日志打印: 2023-01-05 19:15:13 INFO StringTcpServer:118 - channelRead0/127.0.0.1:60535 msg :subdev2 2023-01-05 19:15:13 INFO SimpleGateway:68 - create new session okSession{nodeId='456gefw3fh', channel=[id: 0x42c9dc24, L:/127.0.0.1:8080 - R:/127.0.0.1:60535], deviceId='5e06bfee334dd4f337589c1de_subdev2'} 在平台上看到子设备上线。 子设备上报消息 查看日志看到上报成功 查看消息跟踪 在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪: