commit 319e00b1cebb1f291b88f568228bf900628a0e47 Author: accpt27chen/zhangq <13631262097@139.com> Date: Wed Oct 12 14:06:24 2022 +0800 初始化 diff --git a/README.md b/README.md new file mode 100644 index 0000000..619eaaa --- /dev/null +++ b/README.md @@ -0,0 +1,200 @@ +# 标准消息编解码协议 +终端设备可以根据此标准消息解码协议进行对接物联平台 + +## 标准协议Topic +``` + * 标准编解码器 + * + * 包含:码库、编码、解码 + * 码库:产品标识+设备标识+码库特定义 + * 如:属性上报: 产品标识+设备标识+ /properties/report + * + * 注释: + * A.默认为标准物模型数据格式,这直接 payload.toJavaObject(type) 转换 + * B.非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 + * + * + *
+ *     下行Topic:
+ *          读取设备属性: /{productId}/{deviceId}/properties/read
+ *          修改设备属性: /{productId}/{deviceId}/properties/write
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke
+ *
+ *          //网关设备
+ *          读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
+ *          修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
+ *          调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
+ *
+ *      上行Topic:
+ *          读取属性回复: /{productId}/{deviceId}/properties/read/reply
+ *          修改属性回复: /{productId}/{deviceId}/properties/write/reply
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke/reply
+ *          上报设备事件: /{productId}/{deviceId}/event/{eventId}
+ *          上报设备属性: /{productId}/{deviceId}/properties/report
+ *          上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
+ *
+ *          //网关设备
+ *          子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
+ *          子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
+ *          读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
+ *          修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
+ *          调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
+ *          上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
+ *          上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
+ *
+ * 
+ * 基于jet links 的消息编解码器 +``` + +## 支持协议 +* V1.0 支持MQTT + +## 使用 + +使用命名`mvn package` 打包后, 通过jetlinks管理界面`设备管理-协议管理`中上传jar包. +类名填写: `cn.flyrise.iot.protocol.PaiProtocolSupportProvider` + +## 扩展开发 +pai-official-protocol工程也可以当作是标准协议骨架,可进行自定义编解码协议的开发 +### Topic主题不变-只是进行数据转换 + +* 非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 (TopicMessageCodec) + +``` + //事件上报 - 转行标准数据 + event("/*/event/*", EventMessage.class){ + @Override + DeviceMessage doDecode(DeviceOperator device, String[] topic, JSONObject payload){ + EventMessage eventMessage = payload.toJavaObject(EventMessage.class); + eventMessage.setEvent(topic[topic.length - 1]); + return eventMessage; + } + }, +``` +* 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec) + +``` +package cn.flyrise.iot.protocol.agree.mqtt; + +import cn.flyrise.iot.protocol.topic.TopicMessage; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.device.DeviceConfigKey; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; + +/** + * MQTT 编解码器 + * @author zhangqiang + * @since 0.1 + */ +@Slf4j +public class FlyriseMqttDeviceMessageCodec implements DeviceMessageCodec { + + private final Transport transport; + + public FlyriseMqttDeviceMessageCodec(Transport transport) { + this.transport = transport; + } + + public FlyriseMqttDeviceMessageCodec() { + this(DefaultTransport.MQTT); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + /** + * MQTT 编码器 + * @param context + * @return + */ + @Nonnull + public Mono encode(@Nonnull MessageEncodeContext context) { + + Message message = context.getMessage(); + return Mono.defer(() -> { + if (message instanceof DeviceMessage) { + // 断开连接操作 + if (message instanceof DisconnectDeviceMessage) { + return ((ToDeviceMessageContext) context) + .disconnect() + .then(Mono.empty()); + } + + DeviceMessage deviceMessage = (DeviceMessage) message; + TopicMessage msg = TopicMessageCodec.encode(deviceMessage); + if (null == msg) { + return Mono.empty(); + } + + log.info("pai-official-protocol TopicMessage:{}",JSONObject.toJSONString(msg)); + return Mono + .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)) //是否有产品信息 + .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId()) + .flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId)) + ) + .defaultIfEmpty("null") + .map(productId -> SimpleMqttMessage + .builder() + .clientId(deviceMessage.getDeviceId()) + .topic("/".concat(productId).concat(msg.getTopic())) + .payloadType(MessagePayloadType.JSON) + .payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(msg.getMessage()))) + .build()); + } + return Mono.empty(); + + }); + } + + /** + * MQTT 解码器 + * @param context + * @return + */ + @Nonnull + @Override + public Mono decode(@Nonnull MessageDecodeContext context) { + + return Mono.fromSupplier(() -> { + // 转为mqtt消息 + MqttMessage mqttMessage = (MqttMessage) context.getMessage(); + // 消息主题 + String topic = mqttMessage.getTopic(); + // 消息体转为json + JSONObject payload = JSON.parseObject(mqttMessage.getPayload().toString(StandardCharsets.UTF_8)); + DeviceOperator deviceOperator = context.getDevice(); + return TopicMessageCodec.decode(deviceOperator,topic, payload); + }); + } + +} + +``` + +### Topic主题变更 +* 自己定义的Topoc 需要对应 标准协议Topic对应的物模型数据结构 +``` + //上报属性数据 + reportProperty("/*/properties/report", ReportPropertyMessage.class){ + // to do 自定义解码器 doDecode(DeviceOperator device, String topic, JSONObject payload) + }, +``` +* 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec) + + + + diff --git a/pai-subsystem-protocol.iml b/pai-subsystem-protocol.iml new file mode 100644 index 0000000..8021953 --- /dev/null +++ b/pai-subsystem-protocol.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..cdd375f --- /dev/null +++ b/pom.xml @@ -0,0 +1,116 @@ + + + 4.0.0 + + cn.flyrise.iot.protocol + pai-official-protocol + 1.0 + + UTF-8 + zh_CN + 1.8 + ${java.version} + 4.1.51.Final + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${project.build.jdk} + ${project.build.jdk} + ${project.build.sourceEncoding} + + + + + + + + org.jetlinks + jetlinks-core + 1.1.9 + + + org.jetlinks + jetlinks-supports + 1.1.6 + + + org.eclipse.californium + californium-core + 2.2.1 + true + + + org.projectlombok + lombok + 1.18.10 + + + io.vertx + vertx-core + 3.8.3 + test + + + org.junit.jupiter + junit-jupiter + 5.5.2 + test + + + + ch.qos.logback + logback-classic + 1.2.3 + + + io.vertx + vertx-core + 3.8.3 + compile + + + + org.springframework + spring-webflux + 5.2.5.RELEASE + + + + + + + + io.netty + netty-bom + ${netty.version} + pom + import + + + + + + + hsweb-nexus + Nexus Release Repository + http://nexus.hsweb.me/content/groups/public/ + + true + always + + + + aliyun-nexus + aliyun + http://maven.aliyun.com/nexus/content/groups/public/ + + + \ No newline at end of file diff --git a/src/main/java/cn/flyrise/iot/protocol/PaiProtocolSupportProvider.java b/src/main/java/cn/flyrise/iot/protocol/PaiProtocolSupportProvider.java new file mode 100644 index 0000000..0b81087 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/PaiProtocolSupportProvider.java @@ -0,0 +1,153 @@ +package cn.flyrise.iot.protocol; + +import cn.flyrise.iot.protocol.agree.coap.PaiCoapDTLSDeviceMessageCodec; +import cn.flyrise.iot.protocol.agree.coap.PaiCoapDeviceMessageCodec; +import cn.flyrise.iot.protocol.agree.http.PaiHttpAuthenticator; +import cn.flyrise.iot.protocol.agree.http.PaiHttpDeviceMessageCodec; +import cn.flyrise.iot.protocol.agree.mqtt.PaiMqttAuthenticator; +import cn.flyrise.iot.protocol.agree.mqtt.PaiMqttDeviceMessageCodec; +import cn.flyrise.iot.protocol.agree.websocket.PaiWebsocketDeviceMessageCodec; +import org.jetlinks.core.ProtocolSupport; +import org.jetlinks.core.defaults.CompositeProtocolSupport; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.DeviceConfigScope; +import org.jetlinks.core.metadata.types.EnumType; +import org.jetlinks.core.metadata.types.PasswordType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.spi.ProtocolSupportProvider; +import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; +import reactor.core.publisher.Mono; + +/** + * 物联协议 + * Flyrise-MQTT + * @author zhangqiang + * @since 0.1 + */ +public class PaiProtocolSupportProvider implements ProtocolSupportProvider { + + + @Override + public void dispose() { + //协议卸载时执行 + } + + /** + * MQTT 配置信息 + */ + private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata( + "MQTT认证配置" + , "MQTT客户端clentId为设备id") + .add("username", "username", "MQTT用户名", StringType.GLOBAL) + .add("password", "password", "MQTT密码", PasswordType.GLOBAL) + .add("clientid", "clientid", "MQTT客户端ID", StringType.GLOBAL) + .add("host", "host", "设备host地址", StringType.GLOBAL); + + + /** + * COAP 配置信息 + */ + private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata( + "CoAP认证配置", + "使用CoAP进行数据上报时,需要对数据进行加密:" + + "encrypt(payload,secureKey);") + .add("encAlg", "加密算法", "加密算法", new EnumType() + .addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product) + .add("secureKey", "密钥", "16位密钥KEY", new PasswordType()); + + /** + * COAPDTLS 配置信息 + */ + private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata( + "CoAP DTLS配置", + "使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" + + "之后上报数据需要在Option中携带token信息. \n" + + "自定义Option: 2110,sign ; 2111,token ") + .add("secureKey", "密钥", "认证签名密钥", new PasswordType()); + + + /** + * MQTT 配置信息 + */ + private static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata( + "HTTP认证配置" + , "设备请求Authorization为设备id") + .add("authorization", "authorization", "授权信息", StringType.GLOBAL); + + + @Override + public Mono create(ServiceContext context) { + CompositeProtocolSupport support = new CompositeProtocolSupport(); + + // 编解码协议基础信息 + support.setId("pai-official-protocol"); + support.setName("pai-official-protocol"); + support.setDescription("pai-official-protocol Version 1.0"); + // 固定为JetLinksDeviceMetadataCodec + support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); + + // 如下配置支持的协议(agree目录下开发的协议) 支持协议: MQTT 、COAP 、COAPDTLS 、HTTP + + // MQTT =========================================================================================================== + { + // MQTT认证策略 + support.addAuthenticator(DefaultTransport.MQTT, new PaiMqttAuthenticator()); + support.addAuthenticator(DefaultTransport.MQTT_TLS, new PaiMqttAuthenticator()); + // MQTT需要的配置信息 + support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig); + support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig); + // MQTT消息解码器 + support.addMessageCodecSupport(new PaiMqttDeviceMessageCodec(DefaultTransport.MQTT)); + support.addMessageCodecSupport(new PaiMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS)); + } + + // COAP =========================================================================================================== + { + // COAP认证策略 (COAP使用数据加解密认证,CoAP DTLS使用签名认证) + // COAP需要的配置信息 + support.addConfigMetadata(DefaultTransport.CoAP, coapConfig); + support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig); + // COAP消息解码器 + support.addMessageCodecSupport(new PaiCoapDeviceMessageCodec()); + support.addMessageCodecSupport(new PaiCoapDTLSDeviceMessageCodec()); + } + + // HTTP =========================================================================================================== + { + // HTTP认证策略 + support.addAuthenticator(DefaultTransport.HTTP, new PaiHttpAuthenticator()); + support.addAuthenticator(DefaultTransport.HTTPS, new PaiHttpAuthenticator()); + // HTTP需要的配置信息 + support.addConfigMetadata(DefaultTransport.HTTP, httpConfig); + // HTTP消息解码器 + support.addMessageCodecSupport(new PaiHttpDeviceMessageCodec(DefaultTransport.HTTP)); + support.addMessageCodecSupport(new PaiHttpDeviceMessageCodec(DefaultTransport.HTTPS)); + } + + // TCP =========================================================================================================== + { + // TCP认证策略 + // TCP需要的配置信息 + // TCP消息解码器 + } + + // UDP =========================================================================================================== + { + // UDP认证策略 + // UDP需要的配置信息 + // UDP消息解码器 + } + + // WebSocket ====================================================================================================== + { + // WebSocket认证策略 + // WebSocket需要的配置信息 + // WebSocket消息解码器 + support.addMessageCodecSupport(new PaiWebsocketDeviceMessageCodec()); + } + + return Mono.just(support); + } +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/coap/AbstractCoapDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/coap/AbstractCoapDeviceMessageCodec.java new file mode 100644 index 0000000..cb3c835 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/coap/AbstractCoapDeviceMessageCodec.java @@ -0,0 +1,97 @@ +package cn.flyrise.iot.protocol.agree.coap; + +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.codec.*; +import org.reactivestreams.Publisher; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * COAP 默认编解码器 + * + * 目前COAP只支持上行,所以只需要解码处理 + */ +@Slf4j +public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec { + + protected abstract Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response); + + protected String getPath(CoapMessage message){ + String path = message.getPath(); + if (!path.startsWith("/")) { + path = "/" + path; + } + return path; + } + + protected String getDeviceId(CoapMessage message){ + String deviceId = message.getStringOption(2100).orElse(null); + String[] paths = TopicMessageCodec.removeProductPath(getPath(message)); + if (StringUtils.isEmpty(deviceId) && paths.length > 1) { + deviceId = paths[1]; + } + return deviceId; + } + + /** + * 解码 + * @param context + * @return + */ + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + if (context.getMessage() instanceof CoapExchangeMessage) { + CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage()); + AtomicBoolean alreadyReply = new AtomicBoolean(); + Consumer responseHandler = (resp) -> { + if (alreadyReply.compareAndSet(false, true)) { + if (resp instanceof CoAP.ResponseCode) { + exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp)); + } + if (resp instanceof String) { + exchangeMessage.getExchange().respond(((String) resp)); + } + if (resp instanceof byte[]) { + exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp)); + } + } + }; + + return this + .decode(exchangeMessage, context, responseHandler) + .doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED)) + .doOnError(error -> { + log.error("decode coap message error", error); + responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST); + }) + .switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST))); + } + if (context.getMessage() instanceof CoapMessage) { + return decode(((CoapMessage) context.getMessage()), context, resp -> { + log.info("skip response coap request:{}", resp); + }); + } + + return Flux.empty(); + } + + /** + * 编码 + * @param context + * @return + */ + @Nonnull + @Override + public Publisher encode(@Nonnull MessageEncodeContext context) { + return Mono.empty(); + } +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDTLSDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDTLSDeviceMessageCodec.java new file mode 100644 index 0000000..35c7ca8 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDTLSDeviceMessageCodec.java @@ -0,0 +1,137 @@ +package cn.flyrise.iot.protocol.agree.coap; + +import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import cn.flyrise.iot.protocol.config.ObjectMappers; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.OptionNumberRegistry; +import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.codec.CoapMessage; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.message.codec.MessageDecodeContext; +import org.jetlinks.core.message.codec.Transport; +import org.springframework.http.MediaType; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Arrays; +import java.util.function.Consumer; + +@Slf4j +public class PaiCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { + + @Override + public Transport getSupportTransport() { + return DefaultTransport.CoAP_DTLS; + } + + + /** + * 解码 + * @param message + * @param context + * @param response + * @return + */ + public Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response) { + if (context.getDevice() == null) { + return Flux.empty(); + } + return Flux.defer(() -> { + String path = getPath(message); + String deviceId = getDeviceId(message); + String sign = message.getStringOption(2110).orElse(null); + String token = message.getStringOption(2111).orElse(null); + byte[] payload = message.payloadAsBytes(); + boolean cbor = message + .getStringOption(OptionNumberRegistry.CONTENT_FORMAT) + .map(MediaType::valueOf) + .map(MediaType.APPLICATION_CBOR::includes) + .orElse(false); + ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER; + + if (StringUtils.isEmpty(deviceId)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + // TODO: 2021/7/30 移到 FunctionalTopicHandlers + if (path.endsWith("/request-token")) { + //认证 + return context + .getDevice(deviceId) + .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED))) + .flatMap(device -> device + .getConfig("secureKey") + .flatMap(sk -> { + String secureKey = sk.asString(); + if (!verifySign(secureKey, deviceId, payload, sign)) { + response.accept(CoAP.ResponseCode.BAD_REQUEST); + return Mono.empty(); + } + String newToken = IDGenerator.MD5.generate(); + return device + .setConfig("coap-token", newToken) + .doOnSuccess(success -> { + JSONObject json = new JSONObject(); + json.put("token", newToken); + response.accept(json.toJSONString()); + }); + })) + .then(Mono.empty()); + } + if (StringUtils.isEmpty(token)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + return context + .getDevice(deviceId) + .flatMapMany(device -> device + .getSelfConfig("coap-token") + .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED))) + .flatMapMany(value -> { + String tk = value.asString(); + if (!token.equals(tk)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + return TopicMessageCodec + .decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload) + //如果不能直接解码,可能是其他设备功能 + .switchIfEmpty(FunctionalTopicHandlers + .handle(device, + path.split("/"), + payload, + objectMapper, + reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload())))); + })) + + .doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED)) + .doOnError(error -> { + log.error("decode coap message error", error); + response.accept(CoAP.ResponseCode.BAD_REQUEST); + }); + }); + + } + + + protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) { + //验证签名 + byte[] secureKeyBytes = secureKey.getBytes(); + byte[] signPayload = Arrays.copyOf(payload, payload.length + secureKeyBytes.length); + System.arraycopy(secureKeyBytes, 0, signPayload, 0, secureKeyBytes.length); + if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(signPayload).equalsIgnoreCase(sign)) { + log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload); + return false; + } + return true; + } + + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDeviceMessageCodec.java new file mode 100644 index 0000000..3749704 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDeviceMessageCodec.java @@ -0,0 +1,72 @@ +package cn.flyrise.iot.protocol.agree.coap; + +import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import cn.flyrise.iot.protocol.cipher.Ciphers; +import cn.flyrise.iot.protocol.config.ObjectMappers; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.OptionNumberRegistry; +import org.jetlinks.core.Value; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.codec.CoapMessage; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.message.codec.MessageDecodeContext; +import org.jetlinks.core.message.codec.Transport; +import org.springframework.http.MediaType; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Consumer; + +@Slf4j +public class PaiCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { + + @Override + public Transport getSupportTransport() { + return DefaultTransport.CoAP; + } + + /** + * 解码 + * @param message + * @param context + * @param response + * @return + */ + protected Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response) { + String path = getPath(message); + String deviceId = getDeviceId(message); + boolean cbor = message + .getStringOption(OptionNumberRegistry.CONTENT_FORMAT) + .map(MediaType::valueOf) + .map(MediaType.APPLICATION_CBOR::includes) + .orElse(false); + ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER; + return context + .getDevice(deviceId) + .flatMapMany(device -> device + .getConfigs("encAlg", "secureKey") + .flatMapMany(configs -> { + Ciphers ciphers = configs + .getValue("encAlg") + .map(Value::asString) + .flatMap(Ciphers::of) + .orElse(Ciphers.AES); + String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null); + byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey); + //解码 + return TopicMessageCodec + .decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload) + //如果不能直接解码,可能是其他设备功能 + .switchIfEmpty(FunctionalTopicHandlers + .handle(device, + path.split("/"), + payload, + objectMapper, + reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload())))); + })); + } + + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpAuthenticator.java b/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpAuthenticator.java new file mode 100644 index 0000000..5c4af9f --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpAuthenticator.java @@ -0,0 +1,77 @@ +package cn.flyrise.iot.protocol.agree.http; + +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import org.jetlinks.core.Value; +import org.jetlinks.core.defaults.Authenticator; +import org.jetlinks.core.device.*; +import org.jetlinks.core.message.codec.http.Header; +import org.jetlinks.core.message.codec.http.HttpExchangeMessage; +import org.jetlinks.pro.network.http.HttpAuthenticationRequest; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * HTTP 认证授权 + * @author zhangqiang + * @since 0.1 + */ +public class PaiHttpAuthenticator implements Authenticator { + + /** + * 在网络连接建立的时候,可能无法获取设备的标识(如:http,websocket等),则会调用此方法来进行认证. + * 注意: 认证通过后,需要设置设备ID.{@link AuthenticationResponse#success(String)} + * @param request 认证请求 + * @param registry 设备注册中心 + * @return 认证结果 + */ + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) { + if (request instanceof HttpAuthenticationRequest) { + HttpAuthenticationRequest http = ((HttpAuthenticationRequest) request); + HttpExchangeMessage message = http.getHttpExchangeMessage(); + String[] topics = TopicMessageCodec.removeProductPath(message.getPath()); + + return registry + .getDevice(topics[1]) + .flatMap(device -> authenticate(request, device)); + } + return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request)); + + } + + /** + * 对指定对设备进行认证 + * + * @param request 认证请求 + * @param deviceOperation 设备 + * @return 认证结果 + */ + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) { + if (request instanceof HttpAuthenticationRequest) { + HttpAuthenticationRequest http = ((HttpAuthenticationRequest) request); + HttpExchangeMessage message = http.getHttpExchangeMessage(); + + String[] topics = TopicMessageCodec.removeProductPath(message.getPath()); + if(!deviceOperation.getDeviceId().equals(topics[1]))return Mono.just(AuthenticationResponse.error(400, "请求设备deviceId不存在")); + + return deviceOperation.getConfigs("authorization") + .flatMap(values -> { + Header header_authorization = message.getHeader("Authorization").orElse(null); + if(header_authorization != null){ + String[] authorizationValues = header_authorization.getValue(); + String auth = values.getValue("authorization").map(Value::asString).orElse(null); + if(authorizationValues!=null + && authorizationValues.length>0 + && authorizationValues[0].equals(auth))return Mono.just(AuthenticationResponse.success()); + } + return Mono.just(AuthenticationResponse.error(400, "Authorization错误")); + }); + } + return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request)); + } +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpDeviceMessageCodec.java new file mode 100644 index 0000000..700c9ef --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpDeviceMessageCodec.java @@ -0,0 +1,87 @@ +package cn.flyrise.iot.protocol.agree.http; + +import cn.flyrise.iot.protocol.config.ObjectMappers; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.Unpooled; +import lombok.AllArgsConstructor; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.RepayableDeviceMessage; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.codec.http.HttpExchangeMessage; +import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage; +import org.springframework.http.MediaType; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + +@AllArgsConstructor +public class PaiHttpDeviceMessageCodec implements DeviceMessageCodec { + + private final Transport transport; + + private final ObjectMapper mapper; + + public PaiHttpDeviceMessageCodec(Transport transport) { + this.transport = transport; + this.mapper = ObjectMappers.JSON_MAPPER; + } + + public PaiHttpDeviceMessageCodec() { + this(DefaultTransport.HTTP); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + /** + * 解码 + * @param context + * @return + */ + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); + + // 消息解码 + return TopicMessageCodec + .decode(mapper, TopicMessageCodec.removeProductPath(message.getPath()), message.payloadAsBytes()) + .switchIfEmpty(Mono.defer(() -> { + //未转换成功,响应404 + return message.response(SimpleHttpResponseMessage + .builder() + .status(404) + .contentType(MediaType.APPLICATION_JSON) + .payload(Unpooled.wrappedBuffer("{\"success\":false}".getBytes())) + .build()).then(Mono.empty()); + })) + .flatMap(msg -> { + //响应成功 + return message.response(SimpleHttpResponseMessage + .builder() + .status(200) + .contentType(MediaType.APPLICATION_JSON) + .payload(Unpooled.wrappedBuffer("{\"success\":true}".getBytes())) + .build()) + .thenReturn(msg); + }); + } + + /** + * 编码 + * @param context + * @return + */ + public Mono encode(MessageEncodeContext context) { + + return context + .reply(((RepayableDeviceMessage) context.getMessage()).newReply().success()) + .then(Mono.empty()); + } + + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttAuthenticator.java b/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttAuthenticator.java new file mode 100644 index 0000000..a37758a --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttAuthenticator.java @@ -0,0 +1,66 @@ +package cn.flyrise.iot.protocol.agree.mqtt; + +import org.jetlinks.core.Value; +import org.jetlinks.core.defaults.Authenticator; +import org.jetlinks.core.device.*; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + +/** + * MQTT 认证授权 + * @author zhangqiang + * @since 0.1 + */ +public class PaiMqttAuthenticator implements Authenticator { + + /** + * 在MQTT服务网关中指定了认证协议时,将调用此方法进行认证。 + * 注意: 认证通过后,需要设置设备ID.{@link AuthenticationResponse#success(String)} + * @param request 认证请求 + * @param registry 设备注册中心 + * @return 认证结果 + */ + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) { + if (request instanceof MqttAuthenticationRequest) { + MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request); + + return registry + .getDevice(mqtt.getClientId()) + .flatMap(device -> authenticate(request, device)); + } + return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request)); + + } + + /** + * 对指定对设备进行认证 + * + * @param request 认证请求 + * @param deviceOperation 设备 + * @return 认证结果 + */ + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) { + if (request instanceof MqttAuthenticationRequest) { + MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request); + + if(!mqtt.getClientId().equals(deviceOperation.getDeviceId())) return Mono.just(AuthenticationResponse.error(400, "客户端clientId与设备deviceId不一致")); + return deviceOperation.getConfigs("username", "password") + .flatMap(values -> { + String username = values.getValue("username").map(Value::asString).orElse(null); + String password = values.getValue("password").map(Value::asString).orElse(null); + if (mqtt.getUsername().equals(username) && mqtt + .getPassword() + .equals(password)) { + return Mono.just(AuthenticationResponse.success()); + } else { + return Mono.just(AuthenticationResponse.error(400, "账号或密码错误")); + } + + }); + } + return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request)); + } +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttDeviceMessageCodec.java new file mode 100644 index 0000000..5edaebb --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttDeviceMessageCodec.java @@ -0,0 +1,136 @@ +package cn.flyrise.iot.protocol.agree.mqtt; + +import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import cn.flyrise.iot.protocol.topic.TopicPayload; +import cn.flyrise.iot.protocol.config.ObjectMappers; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.device.DeviceConfigKey; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + +/** + * MQTT 编解码器 + * @author zhangqiang + * @since 0.1 + */ +@Slf4j +public class PaiMqttDeviceMessageCodec implements DeviceMessageCodec { + + private final Transport transport; + + private final ObjectMapper mapper; + + public PaiMqttDeviceMessageCodec(Transport transport) { + this.transport = transport; + this.mapper = ObjectMappers.JSON_MAPPER; + } + + public PaiMqttDeviceMessageCodec() { + this(DefaultTransport.MQTT); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + /** + * 编码 + * @param context + * @return + */ + @Nonnull + public Mono encode(@Nonnull MessageEncodeContext context) { + return Mono.defer(() -> { + Message message = context.getMessage(); + + if (message instanceof DisconnectDeviceMessage) { + return ((ToDeviceMessageContext) context) + .disconnect() + .then(Mono.empty()); + } + + if (message instanceof DeviceMessage) { + DeviceMessage deviceMessage = ((DeviceMessage) message); + + TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage); + if (convertResult == null) { + return Mono.empty(); + } + return Mono + .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)) + .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId()) + .flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId)) + ) + .defaultIfEmpty("null") + .map(productId -> SimpleMqttMessage + .builder() + .clientId(deviceMessage.getDeviceId()) + .topic("/".concat(productId).concat(convertResult.getTopic())) + .payloadType(MessagePayloadType.JSON) + .payload(Unpooled.wrappedBuffer(convertResult.getPayload())) + .build()); + } else { + return Mono.empty(); + } + }); + } + + /** + * 解码 + * @param context + * @return + */ + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + MqttMessage message = (MqttMessage) context.getMessage(); + byte[] payload = message.payloadAsBytes(); + + return TopicMessageCodec + .decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload) + //如果不能直接解码,可能是其他设备功能 + .switchIfEmpty(FunctionalTopicHandlers + .handle(context.getDevice(), + message.getTopic().split("/"), + payload, + mapper, + reply -> doReply(context, reply))) + ; + + } + + private Mono doReply(MessageCodecContext context, TopicPayload reply) { + + if (context instanceof FromDeviceMessageContext) { + return ((FromDeviceMessageContext) context) + .getSession() + .send(SimpleMqttMessage + .builder() + .topic(reply.getTopic()) + .payload(reply.getPayload()) + .build()) + .then(); + } else if (context instanceof ToDeviceMessageContext) { + return ((ToDeviceMessageContext) context) + .sendToDevice(SimpleMqttMessage + .builder() + .topic(reply.getTopic()) + .payload(reply.getPayload()) + .build()) + .then(); + } + return Mono.empty(); + + } + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/agree/websocket/PaiWebsocketDeviceMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/agree/websocket/PaiWebsocketDeviceMessageCodec.java new file mode 100644 index 0000000..febacda --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/agree/websocket/PaiWebsocketDeviceMessageCodec.java @@ -0,0 +1,95 @@ +package cn.flyrise.iot.protocol.agree.websocket; + +import cn.flyrise.iot.protocol.config.ObjectMappers; +import cn.flyrise.iot.protocol.topic.TopicMessageCodec; +import cn.flyrise.iot.protocol.topic.TopicPayload; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.Unpooled; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage; +import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage; +import org.jetlinks.core.message.codec.http.websocket.WebSocketSession; +import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + + +public class PaiWebsocketDeviceMessageCodec implements DeviceMessageCodec { + + private final Transport transport; + + private final ObjectMapper mapper; + + public PaiWebsocketDeviceMessageCodec(Transport transport) { + this.transport = transport; + this.mapper = ObjectMappers.JSON_MAPPER; + } + + public PaiWebsocketDeviceMessageCodec() { + this(DefaultTransport.WebSocket); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + /** + * 解码 + * @param context + * @return + */ + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + WebSocketSessionMessage mqttMessage = (WebSocketSessionMessage) context.getMessage(); + WebSocketSession session = mqttMessage.getWebSocketSession(); + + byte[] payload = mqttMessage.payloadAsBytes(); + + return TopicMessageCodec + .decode(mapper, TopicMessageCodec.removeProductPath(session.getUri()), payload) + .switchIfEmpty(Mono.defer(() -> { + //未转换成功,响应404 + return session + .send(session.textMessage("{\"status\":404}")) + .then(Mono.empty()); + })); + } + + + public Mono encode(MessageEncodeContext context) { + return Mono.defer(() -> { + Message message = context.getMessage(); + + if (message instanceof DisconnectDeviceMessage) { + return ((ToDeviceMessageContext) context) + .disconnect() + .then(Mono.empty()); + } + + if (message instanceof DeviceMessage) { + DeviceMessage deviceMessage = ((DeviceMessage) message); + TopicPayload msg = TopicMessageCodec.encode(mapper, deviceMessage); + if (null == msg) { + return Mono.empty(); + } + JSONObject data = new JSONObject(); + data.put("topic", msg.getTopic()); + data.put("message", msg.getPayload()); + + return Mono.just(DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, Unpooled.wrappedBuffer(data.toJSONString().getBytes()))); + } + return Mono.empty(); + + }); + } + + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/cipher/Ciphers.java b/src/main/java/cn/flyrise/iot/protocol/cipher/Ciphers.java new file mode 100644 index 0000000..af8a4d2 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/cipher/Ciphers.java @@ -0,0 +1,56 @@ +package cn.flyrise.iot.protocol.cipher; + +import lombok.SneakyThrows; +import org.apache.commons.codec.binary.Base64; + +import javax.crypto.Cipher; +import javax.crypto.spec.SecretKeySpec; +import java.util.Optional; + +public enum Ciphers { + AES { + @SneakyThrows + public byte[] encrypt(byte[] src, String key) { + if (key == null || key.length() != 16) { + throw new IllegalArgumentException("illegal key"); + } + SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES"); + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); + cipher.init(Cipher.ENCRYPT_MODE, skeySpec); + return cipher.doFinal(src); + } + + @SneakyThrows + public byte[] decrypt(byte[] src, String key) { + if (key == null || key.length() != 16) { + throw new IllegalArgumentException("illegal key"); + } + SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES"); + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); + cipher.init(Cipher.DECRYPT_MODE, skeySpec); + return cipher.doFinal(src); + } + }; + + + public static Optional of(String name) { + try { + return Optional.of(valueOf(name.toUpperCase())); + } catch (Exception e) { + return Optional.empty(); + } + } + + public abstract byte[] encrypt(byte[] src, String key); + + public abstract byte[] decrypt(byte[] src, String key); + + String encryptBase64(String src, String key) { + return Base64.encodeBase64String(encrypt(src.getBytes(), key)); + } + + byte[] decryptBase64(String src, String key) { + return decrypt(Base64.decodeBase64(src), key); + } + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/config/MessageCodecConfig.java b/src/main/java/cn/flyrise/iot/protocol/config/MessageCodecConfig.java new file mode 100644 index 0000000..c287e93 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/config/MessageCodecConfig.java @@ -0,0 +1,17 @@ +package cn.flyrise.iot.protocol.config; + +/** + * 默认 + * 基础配置 + * @author zhangqiang + * @since 0.1 + */ +public class MessageCodecConfig { + + // 编码数据结构 true默认 false自定义 + public static final boolean ENCODE_MOLD = true; + + // 解码数据结构模 true默认 false自定义 + public static final boolean DECODE_MOLD = true; + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/config/ObjectMappers.java b/src/main/java/cn/flyrise/iot/protocol/config/ObjectMappers.java new file mode 100644 index 0000000..41d764a --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/config/ObjectMappers.java @@ -0,0 +1,27 @@ +package cn.flyrise.iot.protocol.config; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; + +public class ObjectMappers { + + public static final ObjectMapper JSON_MAPPER; + public static final ObjectMapper CBOR_MAPPER; + + static { + JSON_MAPPER = Jackson2ObjectMapperBuilder + .json() + .build() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + ; + CBOR_MAPPER = Jackson2ObjectMapperBuilder + .cbor() + .build() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + } + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/functional/FunctionalTopicHandlers.java b/src/main/java/cn/flyrise/iot/protocol/functional/FunctionalTopicHandlers.java new file mode 100644 index 0000000..a482f92 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/functional/FunctionalTopicHandlers.java @@ -0,0 +1,72 @@ +package cn.flyrise.iot.protocol.functional; + +import cn.flyrise.iot.protocol.topic.TopicPayload; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.utils.TopicUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.function.Function; + +/** + * 功能性的topic,不和平台交互 + */ +public enum FunctionalTopicHandlers { + + //同步时间 + timeSync("/*/*/time-sync") { + @SneakyThrows + @SuppressWarnings("all") + Mono doHandle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender) { + TopicPayload topicPayload = new TopicPayload(); + topicPayload.setTopic(String.join("/", topic) + "/reply"); + TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class); + TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis()); + topicPayload.setPayload(mapper.writeValueAsBytes(response)); + //直接回复给设备 + return sender + .apply(topicPayload) + .then(Mono.empty()); + } + }; + + FunctionalTopicHandlers(String topic) { + this.pattern = topic.split("/"); + } + + private final String[] pattern; + + abstract Publisher doHandle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender); + + + public static Publisher handle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender) { + return Mono + .justOrEmpty(fromTopic(topic)) + .flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender)); + } + + static Optional fromTopic(String[] topic) { + for (FunctionalTopicHandlers value : values()) { + if (TopicUtils.match(value.pattern, topic)) { + return Optional.of(value); + } + } + return Optional.empty(); + } +} diff --git a/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncRequest.java b/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncRequest.java new file mode 100644 index 0000000..4e29bec --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncRequest.java @@ -0,0 +1,10 @@ +package cn.flyrise.iot.protocol.functional; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class TimeSyncRequest { + private String messageId; +} diff --git a/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncResponse.java b/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncResponse.java new file mode 100644 index 0000000..94f001d --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncResponse.java @@ -0,0 +1,15 @@ +package cn.flyrise.iot.protocol.functional; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +public class TimeSyncResponse { + private String messageId; + private long timestamp; +} diff --git a/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessage.java b/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessage.java new file mode 100644 index 0000000..323da8e --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessage.java @@ -0,0 +1,20 @@ +package cn.flyrise.iot.protocol.topic; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +/** + * 消息实体 + * @author zhangqiang + * @since 0.1 + */ +@Getter +@Setter +@AllArgsConstructor +public class TopicMessage { + + private String topic; + + private Object message; +} diff --git a/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessageCodec.java b/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessageCodec.java new file mode 100644 index 0000000..f27ab46 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/topic/TopicMessageCodec.java @@ -0,0 +1,370 @@ +package cn.flyrise.iot.protocol.topic; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.core.message.*; +import org.jetlinks.core.message.event.EventMessage; +import org.jetlinks.core.message.firmware.*; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.*; +import org.jetlinks.core.message.state.DeviceStateCheckMessage; +import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; +import org.jetlinks.core.utils.TopicUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +/** + * 标准编解码器 + * + * 包含:码库、编码、解码 + * 码库:产品标识+设备标识+码库特定义 + * 如:属性上报: 产品标识+设备标识+ /properties/report + * + * 注释: + * A.默认为标准物模型数据格式,这直接 payload.toJavaObject(type) 转换 + * B.非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 + * + * + *
+ *     下行Topic:
+ *          读取设备属性: /{productId}/{deviceId}/properties/read
+ *          修改设备属性: /{productId}/{deviceId}/properties/write
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke
+ *
+ *          //网关设备
+ *          读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
+ *          修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
+ *          调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
+ *
+ *      上行Topic:
+ *          读取属性回复: /{productId}/{deviceId}/properties/read/reply
+ *          修改属性回复: /{productId}/{deviceId}/properties/write/reply
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke/reply
+ *          上报设备事件: /{productId}/{deviceId}/event/{eventId}
+ *          上报设备属性: /{productId}/{deviceId}/properties/report
+ *          上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
+ *
+ *          //网关设备
+ *          子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
+ *          子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
+ *          读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
+ *          修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
+ *          调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
+ *          上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
+ *          上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
+ *
+ * 
+ * 基于jet links 的消息编解码器 + * @author zhangqiang + * @since 0.1 + */ +public enum TopicMessageCodec { + + //上报属性数据 + reportProperty("/*/properties/report", ReportPropertyMessage.class), + //事件上报 + event("/*/event/*", EventMessage.class) { + @Override + Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + String event = topic[topic.length - 1]; + + return Mono.from(super.doDecode(mapper, topic, payload)) + .cast(EventMessage.class) + .doOnNext(e -> e.setEvent(event)) + .cast(DeviceMessage.class); + } + + @Override + void refactorTopic(String[] topics, DeviceMessage message) { + super.refactorTopic(topics, message); + EventMessage event = ((EventMessage) message); + topics[topics.length - 1] = String.valueOf(event.getEvent()); + } + }, + //读取属性 + readProperty("/*/properties/read", ReadPropertyMessage.class), + //读取属性回复 + readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class), + //修改属性 + writeProperty("/*/properties/write", WritePropertyMessage.class), + //修改属性回复 + writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class), + //调用功能 + functionInvoke("/*/function/invoke", FunctionInvokeMessage.class), + //调用功能回复 + functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class), + //子设备消息 + child("/*/child/*/**", ChildDeviceMessage.class) { + @Override + public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); + _topic[0] = "";// topic以/开头所有第一位是空白 + return TopicMessageCodec + .decode(mapper, _topic, payload) + .map(childMsg -> { + ChildDeviceMessage msg = new ChildDeviceMessage(); + msg.setDeviceId(topic[1]); + msg.setChildDeviceMessage(childMsg); + msg.setTimestamp(childMsg.getTimestamp()); + msg.setMessageId(childMsg.getMessageId()); + return msg; + }); + } + + @Override + protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) { + ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message); + + DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage()); + + TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage); + String[] childTopic = payload.getTopic().split("/"); + String[] topic = new String[topics.length + childTopic.length - 3]; + //合并topic + System.arraycopy(topics, 0, topic, 0, topics.length - 1); + System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1); + + refactorTopic(topic, message); + payload.setTopic(String.join("/", topic)); + return payload; + + } + }, //子设备消息回复 + childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) { + @Override + public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); + _topic[0] = "";// topic以/开头所有第一位是空白 + return TopicMessageCodec + .decode(mapper, _topic, payload) + .map(childMsg -> { + ChildDeviceMessageReply msg = new ChildDeviceMessageReply(); + msg.setDeviceId(topic[1]); + msg.setChildDeviceMessage(childMsg); + msg.setTimestamp(childMsg.getTimestamp()); + msg.setMessageId(childMsg.getMessageId()); + return msg; + }); + } + + @Override + protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) { + ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message); + + DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage()); + + TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage); + String[] childTopic = payload.getTopic().split("/"); + String[] topic = new String[topics.length + childTopic.length - 3]; + //合并topic + System.arraycopy(topics, 0, topic, 0, topics.length - 1); + System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1); + + refactorTopic(topic, message); + payload.setTopic(String.join("/", topic)); + return payload; + + } + }, + //更新标签 + updateTag("/*/tags", UpdateTagMessage.class), + //注册 + register("/*/register", DeviceRegisterMessage.class), + //注销 + unregister("/*/unregister", DeviceUnRegisterMessage.class), + //更新固件消息 + upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class), + //更新固件升级进度消息 + upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class), + //拉取固件 + requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class), + //拉取固件更新回复 + requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class), + //上报固件版本 + reportFirmware("/*/firmware/report", ReportFirmwareMessage.class), + //读取固件回复 + readFirmware("/*/firmware/read", ReadFirmwareMessage.class), + //读取固件回复 + readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class), + //派生物模型上报 + derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class), + //透传设备消息 + direct("/*/direct", DirectDeviceMessage.class) { + @Override + public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + DirectDeviceMessage message = new DirectDeviceMessage(); + message.setDeviceId(topic[1]); + message.setPayload(payload); + return Mono.just(message); + } + }, + //断开连接消息 + disconnect("/*/disconnect", DisconnectDeviceMessage.class), + //断开连接回复 + disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class), + //上线 + connect("/*/online", DeviceOnlineMessage.class), + //离线 + offline("/*/offline", DeviceOfflineMessage.class), + //日志 + log("/*/log", DeviceLogMessage.class), + //状态检查 + stateCheck("/*/state-check", DeviceStateCheckMessage.class), + stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class), + ; + + /** + * 构造函数 + * @param topic + * @param type + */ + TopicMessageCodec(String topic, Class type) { + this.pattern = topic.split("/"); + this.type = type; + } + + // 主题 + private final String[] pattern; + // DeviceMessage 类型 + private final Class type; + + /** + * 标准解码 + * @param mapper + * @param topics + * @param payload + * @return + */ + public static Flux decode(ObjectMapper mapper, String[] topics, byte[] payload) { + return Mono + .justOrEmpty(fromTopic(topics)) + .flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload)); + } + + /** + * 标准解码 + * @param mapper + * @param topic + * @param payload + * @return + */ + public static Flux decode(ObjectMapper mapper, String topic, byte[] payload) { + return decode(mapper, topic.split("/"), payload); + } + + /** + * 标准编码 + * @param mapper + * @param message + * @return + */ + public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) { + + return fromMessage(message) + .orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType())) + .doEncode(mapper, message); + } + + /** + * 根据主题topic确认 + * 是否为合法码库类型 + * @param topic + * @return + */ + static Optional fromTopic(String[] topic) { + for (TopicMessageCodec value : values()) { + if (TopicUtils.match(value.pattern, topic)) { + return Optional.of(value); + } + } + return Optional.empty(); + } + + /** + * 根据消息类型型确认 + * 是否为合法码库类型 + * @param message + * @return + */ + static Optional fromMessage(DeviceMessage message) { + for (TopicMessageCodec value : values()) { + if (value.type == message.getClass()) { + return Optional.of(value); + } + } + return Optional.empty(); + } + + /** + * 标准解码 + * @param mapper + * @param topic + * @param payload + * @return + */ + Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + return Mono + .fromCallable(() -> { + DeviceMessage message = mapper.readValue(payload, type); + FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message); + + return message; + }); + } + + /** + * 标准编码 + * + * 默认数据结构 无需转换 平台什么数据格式就按什么数据格式编码 + * 如需自定义编码器则在重写doEncode方法即可 + * @param mapper + * @param topics + * @param message + * @return + */ + @SneakyThrows + TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) { + refactorTopic(topics, message); + return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message)); + } + + /** + * 标准编码 + * @param mapper + * @param message + * @return + */ + @SneakyThrows + TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) { + String[] topics = Arrays.copyOf(pattern, pattern.length); + return doEncode(mapper, topics, message); + } + + void refactorTopic(String[] topics, DeviceMessage message) { + topics[1] = message.getDeviceId(); + } + + /** + * 移除topic中的产品信息,topic第一个层为产品ID,在解码时,不需要此信息,所以需要移除之. + * + * @param topic topic + * @return 移除后的topic + */ + public static String[] removeProductPath(String topic) { + if (!topic.startsWith("/")) { + topic = "/" + topic; + } + String[] topicArr = topic.split("/"); + String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length); + topics[0] = ""; + return topics; + } + +} diff --git a/src/main/java/cn/flyrise/iot/protocol/topic/TopicPayload.java b/src/main/java/cn/flyrise/iot/protocol/topic/TopicPayload.java new file mode 100644 index 0000000..b793912 --- /dev/null +++ b/src/main/java/cn/flyrise/iot/protocol/topic/TopicPayload.java @@ -0,0 +1,17 @@ +package cn.flyrise.iot.protocol.topic; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +public class TopicPayload { + + private String topic; + + private byte[] payload; +} diff --git a/src/main/java/org/jetlinks/pro/network/http/HttpAuthenticationRequest.java b/src/main/java/org/jetlinks/pro/network/http/HttpAuthenticationRequest.java new file mode 100644 index 0000000..5d5f31f --- /dev/null +++ b/src/main/java/org/jetlinks/pro/network/http/HttpAuthenticationRequest.java @@ -0,0 +1,24 @@ +package org.jetlinks.pro.network.http; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.jetlinks.core.device.AuthenticationRequest; +import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.message.codec.http.HttpExchangeMessage; + +/** + * @author 张强 + * @since 1.0.0 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class HttpAuthenticationRequest implements AuthenticationRequest { + + private HttpExchangeMessage httpExchangeMessage; + + private Transport transport; +} diff --git a/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider b/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider new file mode 100644 index 0000000..6c56f51 --- /dev/null +++ b/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider @@ -0,0 +1 @@ +cn.flyrise.iot.protocol.PaiProtocolSupportProvider \ No newline at end of file