如何开发网关 网关是一个特殊的设备,除具备一般设备功能之外,还具有子设备管理、子设备消息转发的功能。SDK提供了AbstractGateway抽象类来简化网关的实现。该类提供了子设备管理功能,需要从平台获取子设备信息并保存(需要子类提供子设备持久化接口)、子设备下行消息转发功能(需要子类实现转发处理接口)、以及上报子设备列表、上报子设备属性、上报子设备状态、上报子设备消息等接口。 使用AbstractGateway类 继承该类,在构造函数里提供子设备信息持久化接口,实现其下行消息转发的抽象接口: 1
2
3
4
5
6
7 public abstract void onSubdevCommand(String requestId, Command command);
public abstract void onSubdevPropertiesSet(String requestId, PropsSet propsSet);
public abstract void onSubdevPropertiesGet(String requestId, PropsGet propsGet);
public abstract void onSubdevMessage(DeviceMessage message);
iot-gateway-demo代码介绍 工程iot-gateway-demo基于AbstractGateway实现了一个简单的网关,它提供tcp设备接入能力。关键类: SimpleGateway:继承自AbstractGateway,实现子设备管理和下行消息转发 StringTcpServer:基于netty实现一个TCP server,本例中子设备采用TCP协议,并且首条消息为鉴权消息 SubDevicesFilePersistence:子设备信息持久化,采用json文件来保存子设备信息,并在内存中做了缓存 Session:设备会话类,保存了设备id和TCP的channel的对应关系 SimpleGateway类 添加或删除子设备处理 添加子设备:AbstractGateway的onAddSubDevices接口已经完成了子设备信息的保存。我们不需要再增加额外处理,因此SimpleGateway不需要重写onAddSubDevices接口 删除子设备:我们不仅需要修改持久化信息,还需要断开当前子设备的连接。所以我们重写了onDeleteSubDevices接口,增加了拆链处理,然后调用父类的onDeleteSubDevices。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public int onDeleteSubDevices(SubDevicesInfo subDevicesInfo) {
for (DeviceInfo subdevice : subDevicesInfo.getDevices()) {
Session session = nodeIdToSesseionMap.get(subdevice.getNodeId());
if (session != null) {
if (session.getChannel() != null) {
session.getChannel().close();
channelIdToSessionMap.remove(session.getChannel().id().asLongText());
nodeIdToSesseionMap.remove(session.getNodeId());
}
}
}
return super.onDeleteSubDevices(subDevicesInfo);
}
下行消息处理 网关收到平台下行消息时,需要转发给子设备。平台下行消息分为三种:设备消息、属性读写、命令。 设备消息:这里我们需要根据deviceId获取nodeId,从而获取session,从session里获取channel,就可以往channel发送消息。在转发消息时,可以根据需要进行一定的转换处理。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 @Override
public void onSubdevMessage(DeviceMessage message) {
//平台接口带的都是deviceId,deviceId是由nodeId和productId拼装生成的,即
//deviceId = productId_nodeId
String nodeId = IotUtil.getNodeIdFromDeviceId(message.getDeviceId());
if (nodeId == null) {
return;
}
//通过nodeId获取session,进一步获取channel
Session session = nodeIdToSesseionMap.get(nodeId);
if (session == null) {
log.error("subdev is not connected " + nodeId);
return;
}
if (session.getChannel() == null){
log.error("channel is null " + nodeId);
return;
}
//直接把消息转发给子设备
session.getChannel().writeAndFlush(message.getContent());
log.info("writeAndFlush " + message);
}
属性读写: 属性读写包括属性设置和属性查询。 属性设置: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 @Override
public void onSubdevPropertiesSet(String requestId, PropsSet propsSet) {
if (propsSet.getDeviceId() == null) {
return;
}
String nodeId = IotUtil.getNodeIdFromDeviceId(propsSet.getDeviceId());
if (nodeId == null) {
return;
}
Session session = nodeIdToSesseionMap.get(nodeId);
if (session == null) {
return;
}
//这里我们直接把对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
session.getChannel().writeAndFlush(JsonUtil.convertObject2String(propsSet));
//为了简化处理,我们在这里直接回响应。更合理做法是在子设备处理完后再回响应
getClient().respondPropsSet(requestId, IotResult.SUC
CES S);
log.info("writeAndFlush " + propsSet);
}
属性查询: 1
2
3
4
5
6
7 @Override
public void onSubdevPropertiesGet(String requestId, PropsGet propsGet) {
//不建议平台直接读子设备的属性,这里直接返回失败
log.error("not support onSubdevPropertiesGet");
deviceClient.respondPropsSet(requestId, IotResult.FAIL);
}
命令:处理流程和消息类似,实际场景中可能需要不同的编解码转换。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 @Override
public void onSubdevCommand(String requestId, Command command) {
if (command.getDeviceId() == null) {
return;
}
String nodeId = IotUtil.getNodeIdFromDeviceId(command.getDeviceId());
if (nodeId == null) {
return;
}
Session session = nodeIdToSesseionMap.get(nodeId);
if (session == null) {
return;
}
//这里我们直接把command对象转成string发给子设备,实际场景中可能需要进行一定的编解码转换
session.getChannel().writeAndFlush(JsonUtil.convertObject2String(command));
//为了简化处理,我们在这里直接回命令响应。更合理做法是在子设备处理完后再回响应
getClient().respondCommand(requestId, new CommandRsp(0));
log.info("writeAndFlush " + command);
}
上行消息处理 上行处理在StringTcpServer的channelRead0接口里。如果会话不存在,需要先创建会话: 如果子设备信息不存在,这里会创建会话失败,直接拒绝连接 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 @Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel incoming = ctx.channel();
log.info("channelRead0" + incoming.remoteAddress() + " msg :" + s);
//如果是首条消息,创建session
//如果是首条消息,创建session
Session session = simpleGateway.getSessionByChannel(incoming.id().asLongText());
if (session == null) {
String nodeId = s;
session = simpleGateway.createSession(nodeId, incoming);
//创建会话失败,拒绝连接
if (session == null) {
log.info("close channel");
ctx.close();
}
}
如果会话存在,则进行消息转发: 1
2
3
4
5
6
7 else {
//如果需要上报属性则调用reportSubDeviceProperties
DeviceMessage deviceMessage = new DeviceMessage(s);
deviceMessage.setDeviceId(session.getDeviceId());
simpleGateway.reportSubDeviceMessage(deviceMessage, null);
}
到这里,网关的关键代码介绍完了,其他的部分看源代码。整个demo是开源的,用户可以根据需要进行扩展。比如修改持久化方式、转发中增加消息格式的转换、实现其他子设备接入协议。 iot-gateway-demo的使用 创建子设备的产品,步骤可参考创建产品。 在创建的产品中定义模型,添加服务,服务ID为parameter。并且新增alarm和temperature两个属性,如下图所示 图9 模型定义-子设备产品 修改StringTcpServer的main函数,替换构造参数,然后运行该类。 1
2
3 simpleGateway = new SimpleGateway(new SubDevicesFilePersistence(),
"ssl://iot-acc.cn-north-4.myhuaweicloud.com:8883",
"5e06bfee334dd4f33759f5b3_demo", "mysecret");
在平台上看到该网关在线后,添加子设备。 图10 设备-添加子设备 表1 子设备参数 参数名称 参数描述 所属产品 子设备所属的产品,选择步骤1创建的产品。 设备名称 即device_name,可自定义,如subdev_name 设备标识码 即node_id,填写subdev。 设备ID 即device_id,可不填写,自动生成。 此时网关上日志打印: 2024-04-16 21:00:01 INFO SubDevicesFilePersistence:112 - add subdev, the nodeId is subdev 运行TcpDevice类,建立连接后,输入步骤3中注册的子设备的nodeId,如subdev。 图11 子设备连接 此时网关设备日志打印: 2024-04-16 21:00:54 INFO StringTcpServer:196 - initChannel: /127.0.0.1:21889
2024-04-16 21:01:00 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is subdev
2024-04-16 21:01:00 INFO SimpleGateway:100 - create new session ok, the session is Session{nodeId='subdev', channel=[id: 0xf9b89f78, L:/127.0.0.1:8080 - R:/127.0.0.1:21889], deviceId='subdev_deviceId'} 在平台上看到子设备上线。 图12 设备列表-设备在线 子设备上报消息 图13 子设备上报消息 查看日志看到上报成功 2024-04-16 21:02:36 INFO StringTcpServer:137 - channelRead0 is /127.0.0.1:21889, the msg is hello
2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/messages/up, msg = {"name":null,"id":null,"content":"hello","object_device_id":"subdev_deviceId"]
2024-04-16 21:02:36 INFO MqttConnection:299 - publish message topic is $oc/devices/5e06bfee334dd4f33759f5b3_demo/sys/gateway/sub_devices/properties/report, msg = {"devices":[{"services":[{"properties":{"temprature":2,"alarm":1},"service_id":"parameter","event_time":null}],"device_id":"subdev_deviceId"}]] 查看消息跟踪 在平台上找到网关,选择 设备详情-消息跟踪,打开消息跟踪。继续让子设备发送数据,等待片刻后看到消息跟踪: 图14 消息跟踪-直连设备