# 标准消息编解码协议 终端设备可以根据此标准消息解码协议进行对接物联平台 ## 标准协议Topic ``` * 标准编解码器 * * 包含:码库、编码、解码 * 码库:产品标识+设备标识+码库特定义 * 如:属性上报: 产品标识+设备标识+ /properties/report * * 注释: * A.默认为标准物模型数据格式,这直接 payload.toJavaObject(type) 转换 * B.非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 * * *
 *     下行Topic:
 *          读取设备属性: /{productId}/{deviceId}/properties/read
 *          修改设备属性: /{productId}/{deviceId}/properties/write
 *          调用设备功能: /{productId}/{deviceId}/function/invoke
 *
 *          //网关设备
 *          读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
 *          修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
 *          调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
 *
 *      上行Topic:
 *          读取属性回复: /{productId}/{deviceId}/properties/read/reply
 *          修改属性回复: /{productId}/{deviceId}/properties/write/reply
 *          调用设备功能: /{productId}/{deviceId}/function/invoke/reply
 *          上报设备事件: /{productId}/{deviceId}/event/{eventId}
 *          上报设备属性: /{productId}/{deviceId}/properties/report
 *          上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
 *
 *          //网关设备
 *          子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
 *          子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
 *          读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
 *          修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
 *          调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
 *          上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
 *          上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
 *
 * 
* 基于jet links 的消息编解码器 ``` ## 支持协议 * V1.0 支持MQTT ## 使用 使用命名`mvn package` 打包后, 通过jetlinks管理界面`设备管理-协议管理`中上传jar包. 类名填写: `cn.flyrise.iot.protocol.PaiProtocolSupportProvider` ## 扩展开发 pai-official-protocol工程也可以当作是标准协议骨架,可进行自定义编解码协议的开发 ### Topic主题不变-只是进行数据转换 * 非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 (TopicMessageCodec) ``` //事件上报 - 转行标准数据 event("/*/event/*", EventMessage.class){ @Override DeviceMessage doDecode(DeviceOperator device, String[] topic, JSONObject payload){ EventMessage eventMessage = payload.toJavaObject(EventMessage.class); eventMessage.setEvent(topic[topic.length - 1]); return eventMessage; } }, ``` * 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec) ``` package cn.flyrise.iot.protocol.agree.mqtt; import cn.flyrise.iot.protocol.topic.TopicMessage; import cn.flyrise.iot.protocol.topic.TopicMessageCodec; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DisconnectDeviceMessage; import org.jetlinks.core.message.Message; import org.jetlinks.core.message.codec.*; import reactor.core.publisher.Mono; import javax.annotation.Nonnull; import java.nio.charset.StandardCharsets; /** * MQTT 编解码器 * @author zhangqiang * @since 0.1 */ @Slf4j public class FlyriseMqttDeviceMessageCodec implements DeviceMessageCodec { private final Transport transport; public FlyriseMqttDeviceMessageCodec(Transport transport) { this.transport = transport; } public FlyriseMqttDeviceMessageCodec() { this(DefaultTransport.MQTT); } @Override public Transport getSupportTransport() { return transport; } /** * MQTT 编码器 * @param context * @return */ @Nonnull public Mono encode(@Nonnull MessageEncodeContext context) { Message message = context.getMessage(); return Mono.defer(() -> { if (message instanceof DeviceMessage) { // 断开连接操作 if (message instanceof DisconnectDeviceMessage) { return ((ToDeviceMessageContext) context) .disconnect() .then(Mono.empty()); } DeviceMessage deviceMessage = (DeviceMessage) message; TopicMessage msg = TopicMessageCodec.encode(deviceMessage); if (null == msg) { return Mono.empty(); } log.info("pai-official-protocol TopicMessage:{}",JSONObject.toJSONString(msg)); return Mono .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)) //是否有产品信息 .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId()) .flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId)) ) .defaultIfEmpty("null") .map(productId -> SimpleMqttMessage .builder() .clientId(deviceMessage.getDeviceId()) .topic("/".concat(productId).concat(msg.getTopic())) .payloadType(MessagePayloadType.JSON) .payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(msg.getMessage()))) .build()); } return Mono.empty(); }); } /** * MQTT 解码器 * @param context * @return */ @Nonnull @Override public Mono decode(@Nonnull MessageDecodeContext context) { return Mono.fromSupplier(() -> { // 转为mqtt消息 MqttMessage mqttMessage = (MqttMessage) context.getMessage(); // 消息主题 String topic = mqttMessage.getTopic(); // 消息体转为json JSONObject payload = JSON.parseObject(mqttMessage.getPayload().toString(StandardCharsets.UTF_8)); DeviceOperator deviceOperator = context.getDevice(); return TopicMessageCodec.decode(deviceOperator,topic, payload); }); } } ``` ### Topic主题变更 * 自己定义的Topoc 需要对应 标准协议Topic对应的物模型数据结构 ``` //上报属性数据 reportProperty("/*/properties/report", ReportPropertyMessage.class){ // to do 自定义解码器 doDecode(DeviceOperator device, String topic, JSONObject payload) }, ``` * 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec)