You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

7.5 KiB

标准消息编解码协议

终端设备可以根据此标准消息解码协议进行对接物联平台

标准协议Topic

 * 标准编解码器
 *
 * 包含:码库、编码、解码
 * 码库:产品标识+设备标识+码库特定义
 * 如:属性上报: 产品标识+设备标识+ /properties/report
 *
 * 注释:
 * A.默认为标准物模型数据格式,这直接 payload.toJavaObject(type) 转换
 * B.非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换
 *
 *
 * <pre>
 *     下行Topic:
 *          读取设备属性: /{productId}/{deviceId}/properties/read
 *          修改设备属性: /{productId}/{deviceId}/properties/write
 *          调用设备功能: /{productId}/{deviceId}/function/invoke
 *
 *          //网关设备
 *          读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
 *          修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
 *          调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
 *
 *      上行Topic:
 *          读取属性回复: /{productId}/{deviceId}/properties/read/reply
 *          修改属性回复: /{productId}/{deviceId}/properties/write/reply
 *          调用设备功能: /{productId}/{deviceId}/function/invoke/reply
 *          上报设备事件: /{productId}/{deviceId}/event/{eventId}
 *          上报设备属性: /{productId}/{deviceId}/properties/report
 *          上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
 *
 *          //网关设备
 *          子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
 *          子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
 *          读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
 *          修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
 *          调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
 *          上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
 *          上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
 *
 * </pre>
 * 基于jet links 的消息编解码器

支持协议

  • V1.0 支持MQTT

使用

使用命名mvn package 打包后, 通过jetlinks管理界面设备管理-协议管理中上传jar包. 类名填写: cn.flyrise.iot.protocol.PaiProtocolSupportProvider

扩展开发

pai-official-protocol工程也可以当作是标准协议骨架,可进行自定义编解码协议的开发

Topic主题不变-只是进行数据转换

  • 非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 (TopicMessageCodec)
    //事件上报 - 转行标准数据
    event("/*/event/*", EventMessage.class){
        @Override
        DeviceMessage doDecode(DeviceOperator device, String[] topic, JSONObject payload){
            EventMessage eventMessage = payload.toJavaObject(EventMessage.class);
            eventMessage.setEvent(topic[topic.length - 1]);
            return eventMessage;
        }
    },
  • 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec)
package cn.flyrise.iot.protocol.agree.mqtt;

import cn.flyrise.iot.protocol.topic.TopicMessage;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import reactor.core.publisher.Mono;

import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;

/**
 * MQTT 编解码器
 * @author zhangqiang
 * @since 0.1
 */
@Slf4j
public class FlyriseMqttDeviceMessageCodec implements DeviceMessageCodec {

    private final Transport transport;

    public FlyriseMqttDeviceMessageCodec(Transport transport) {
        this.transport = transport;
    }

    public FlyriseMqttDeviceMessageCodec() {
        this(DefaultTransport.MQTT);
    }

    @Override
    public Transport getSupportTransport() {
        return transport;
    }

    /**
     * MQTT 编码器
     * @param context
     * @return
     */
    @Nonnull
    public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {

        Message message = context.getMessage();
        return Mono.defer(() -> {
            if (message instanceof DeviceMessage) {
                // 断开连接操作
                if (message instanceof DisconnectDeviceMessage) {
                    return ((ToDeviceMessageContext) context)
                            .disconnect()
                            .then(Mono.empty());
                }

                DeviceMessage deviceMessage = (DeviceMessage) message;
                TopicMessage msg = TopicMessageCodec.encode(deviceMessage);
                if (null == msg) {
                    return Mono.empty();
                }

                log.info("pai-official-protocol TopicMessage:{}",JSONObject.toJSONString(msg));
                return Mono
                        .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))  //是否有产品信息
                        .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
                                .flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
                        )
                        .defaultIfEmpty("null")
                        .map(productId -> SimpleMqttMessage
                                .builder()
                                .clientId(deviceMessage.getDeviceId())
                                .topic("/".concat(productId).concat(msg.getTopic()))
                                .payloadType(MessagePayloadType.JSON)
                                .payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(msg.getMessage())))
                                .build());
            }
            return Mono.empty();

        });
    }

    /**
     * MQTT 解码器
     * @param context
     * @return
     */
    @Nonnull
    @Override
    public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {

        return Mono.fromSupplier(() -> {
            // 转为mqtt消息
            MqttMessage mqttMessage = (MqttMessage) context.getMessage();
            // 消息主题
            String topic = mqttMessage.getTopic();
            // 消息体转为json
            JSONObject payload = JSON.parseObject(mqttMessage.getPayload().toString(StandardCharsets.UTF_8));
            DeviceOperator deviceOperator = context.getDevice();
            return TopicMessageCodec.decode(deviceOperator,topic, payload);
        });
    }

}

Topic主题变更

  • 自己定义的Topoc 需要对应 标准协议Topic对应的物模型数据结构
    //上报属性数据
    reportProperty("/*/properties/report", ReportPropertyMessage.class){
        // to do 自定义解码器 doDecode(DeviceOperator device, String topic, JSONObject payload)
    },
  • 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec)