Browse Source

初始化

master
accpt27chen/zhangq 4 years ago
commit
319e00b1ce
  1. 200
      README.md
  2. 9
      pai-subsystem-protocol.iml
  3. 116
      pom.xml
  4. 153
      src/main/java/cn/flyrise/iot/protocol/PaiProtocolSupportProvider.java
  5. 97
      src/main/java/cn/flyrise/iot/protocol/agree/coap/AbstractCoapDeviceMessageCodec.java
  6. 137
      src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDTLSDeviceMessageCodec.java
  7. 72
      src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDeviceMessageCodec.java
  8. 77
      src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpAuthenticator.java
  9. 87
      src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpDeviceMessageCodec.java
  10. 66
      src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttAuthenticator.java
  11. 136
      src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttDeviceMessageCodec.java
  12. 95
      src/main/java/cn/flyrise/iot/protocol/agree/websocket/PaiWebsocketDeviceMessageCodec.java
  13. 56
      src/main/java/cn/flyrise/iot/protocol/cipher/Ciphers.java
  14. 17
      src/main/java/cn/flyrise/iot/protocol/config/MessageCodecConfig.java
  15. 27
      src/main/java/cn/flyrise/iot/protocol/config/ObjectMappers.java
  16. 72
      src/main/java/cn/flyrise/iot/protocol/functional/FunctionalTopicHandlers.java
  17. 10
      src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncRequest.java
  18. 15
      src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncResponse.java
  19. 20
      src/main/java/cn/flyrise/iot/protocol/topic/TopicMessage.java
  20. 370
      src/main/java/cn/flyrise/iot/protocol/topic/TopicMessageCodec.java
  21. 17
      src/main/java/cn/flyrise/iot/protocol/topic/TopicPayload.java
  22. 24
      src/main/java/org/jetlinks/pro/network/http/HttpAuthenticationRequest.java
  23. 1
      src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider

200
README.md

@ -0,0 +1,200 @@
# 标准消息编解码协议
终端设备可以根据此标准消息解码协议进行对接物联平台
## 标准协议Topic
```
* 标准编解码器
*
* 包含:码库、编码、解码
* 码库:产品标识+设备标识+码库特定义
* 如:属性上报: 产品标识+设备标识+ /properties/report
*
* 注释:
* A.默认为标准物模型数据格式,这直接 payload.toJavaObject(type) 转换
* B.非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换
*
*
* <pre>
* 下行Topic:
* 读取设备属性: /{productId}/{deviceId}/properties/read
* 修改设备属性: /{productId}/{deviceId}/properties/write
* 调用设备功能: /{productId}/{deviceId}/function/invoke
*
* //网关设备
* 读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
* 修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
* 调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
*
* 上行Topic:
* 读取属性回复: /{productId}/{deviceId}/properties/read/reply
* 修改属性回复: /{productId}/{deviceId}/properties/write/reply
* 调用设备功能: /{productId}/{deviceId}/function/invoke/reply
* 上报设备事件: /{productId}/{deviceId}/event/{eventId}
* 上报设备属性: /{productId}/{deviceId}/properties/report
* 上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
*
* //网关设备
* 子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
* 子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
* 读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
* 修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
* 调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
* 上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
* 上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
*
* </pre>
* 基于jet links 的消息编解码器
```
## 支持协议
* V1.0 支持MQTT
## 使用
使用命名`mvn package` 打包后, 通过jetlinks管理界面`设备管理-协议管理`中上传jar包.
类名填写: `cn.flyrise.iot.protocol.PaiProtocolSupportProvider`
## 扩展开发
pai-official-protocol工程也可以当作是标准协议骨架,可进行自定义编解码协议的开发
### Topic主题不变-只是进行数据转换
* 非标准物模型数据格式,则重写相应的doEncode、doDecode方法,进行标准物模型数据格式转换 (TopicMessageCodec)
```
//事件上报 - 转行标准数据
event("/*/event/*", EventMessage.class){
@Override
DeviceMessage doDecode(DeviceOperator device, String[] topic, JSONObject payload){
EventMessage eventMessage = payload.toJavaObject(EventMessage.class);
eventMessage.setEvent(topic[topic.length - 1]);
return eventMessage;
}
},
```
* 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec)
```
package cn.flyrise.iot.protocol.agree.mqtt;
import cn.flyrise.iot.protocol.topic.TopicMessage;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
/**
* MQTT 编解码器
* @author zhangqiang
* @since 0.1
*/
@Slf4j
public class FlyriseMqttDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
public FlyriseMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public FlyriseMqttDeviceMessageCodec() {
this(DefaultTransport.MQTT);
}
@Override
public Transport getSupportTransport() {
return transport;
}
/**
* MQTT 编码器
* @param context
* @return
*/
@Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
Message message = context.getMessage();
return Mono.defer(() -> {
if (message instanceof DeviceMessage) {
// 断开连接操作
if (message instanceof DisconnectDeviceMessage) {
return ((ToDeviceMessageContext) context)
.disconnect()
.then(Mono.empty());
}
DeviceMessage deviceMessage = (DeviceMessage) message;
TopicMessage msg = TopicMessageCodec.encode(deviceMessage);
if (null == msg) {
return Mono.empty();
}
log.info("pai-official-protocol TopicMessage:{}",JSONObject.toJSONString(msg));
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)) //是否有产品信息
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage
.builder()
.clientId(deviceMessage.getDeviceId())
.topic("/".concat(productId).concat(msg.getTopic()))
.payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(msg.getMessage())))
.build());
}
return Mono.empty();
});
}
/**
* MQTT 解码器
* @param context
* @return
*/
@Nonnull
@Override
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
return Mono.fromSupplier(() -> {
// 转为mqtt消息
MqttMessage mqttMessage = (MqttMessage) context.getMessage();
// 消息主题
String topic = mqttMessage.getTopic();
// 消息体转为json
JSONObject payload = JSON.parseObject(mqttMessage.getPayload().toString(StandardCharsets.UTF_8));
DeviceOperator deviceOperator = context.getDevice();
return TopicMessageCodec.decode(deviceOperator,topic, payload);
});
}
}
```
### Topic主题变更
* 自己定义的Topoc 需要对应 标准协议Topic对应的物模型数据结构
```
//上报属性数据
reportProperty("/*/properties/report", ReportPropertyMessage.class){
// to do 自定义解码器 doDecode(DeviceOperator device, String topic, JSONObject payload)
},
```
* 对应协议编解码器也需要进行相应调整,如MQTT(FlyriseMqttDeviceMessageCodec)

9
pai-subsystem-protocol.iml

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

116
pom.xml

@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.flyrise.iot.protocol</groupId>
<artifactId>pai-official-protocol</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<netty.version>4.1.51.Final</netty.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${project.build.jdk}</source>
<target>${project.build.jdk}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-core</artifactId>
<version>1.1.9</version>
</dependency>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>2.2.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.8.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.8.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.2.5.RELEASE</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>http://nexus.hsweb.me/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
<repository>
<id>aliyun-nexus</id>
<name>aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>

153
src/main/java/cn/flyrise/iot/protocol/PaiProtocolSupportProvider.java

@ -0,0 +1,153 @@
package cn.flyrise.iot.protocol;
import cn.flyrise.iot.protocol.agree.coap.PaiCoapDTLSDeviceMessageCodec;
import cn.flyrise.iot.protocol.agree.coap.PaiCoapDeviceMessageCodec;
import cn.flyrise.iot.protocol.agree.http.PaiHttpAuthenticator;
import cn.flyrise.iot.protocol.agree.http.PaiHttpDeviceMessageCodec;
import cn.flyrise.iot.protocol.agree.mqtt.PaiMqttAuthenticator;
import cn.flyrise.iot.protocol.agree.mqtt.PaiMqttDeviceMessageCodec;
import cn.flyrise.iot.protocol.agree.websocket.PaiWebsocketDeviceMessageCodec;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import reactor.core.publisher.Mono;
/**
* 物联协议
* Flyrise-MQTT
* @author zhangqiang
* @since 0.1
*/
public class PaiProtocolSupportProvider implements ProtocolSupportProvider {
@Override
public void dispose() {
//协议卸载时执行
}
/**
* MQTT 配置信息
*/
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT客户端clentId为设备id")
.add("username", "username", "MQTT用户名", StringType.GLOBAL)
.add("password", "password", "MQTT密码", PasswordType.GLOBAL)
.add("clientid", "clientid", "MQTT客户端ID", StringType.GLOBAL)
.add("host", "host", "设备host地址", StringType.GLOBAL);
/**
* COAP 配置信息
*/
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
/**
* COAPDTLS 配置信息
*/
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
/**
* MQTT 配置信息
*/
private static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "设备请求Authorization为设备id")
.add("authorization", "authorization", "授权信息", StringType.GLOBAL);
@Override
public Mono<? extends ProtocolSupport> create(ServiceContext context) {
CompositeProtocolSupport support = new CompositeProtocolSupport();
// 编解码协议基础信息
support.setId("pai-official-protocol");
support.setName("pai-official-protocol");
support.setDescription("pai-official-protocol Version 1.0");
// 固定为JetLinksDeviceMetadataCodec
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
// 如下配置支持的协议(agree目录下开发的协议) 支持协议: MQTT 、COAP 、COAPDTLS 、HTTP
// MQTT ===========================================================================================================
{
// MQTT认证策略
support.addAuthenticator(DefaultTransport.MQTT, new PaiMqttAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new PaiMqttAuthenticator());
// MQTT需要的配置信息
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig);
// MQTT消息解码器
support.addMessageCodecSupport(new PaiMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new PaiMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
}
// COAP ===========================================================================================================
{
// COAP认证策略 (COAP使用数据加解密认证,CoAP DTLS使用签名认证)
// COAP需要的配置信息
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig);
// COAP消息解码器
support.addMessageCodecSupport(new PaiCoapDeviceMessageCodec());
support.addMessageCodecSupport(new PaiCoapDTLSDeviceMessageCodec());
}
// HTTP ===========================================================================================================
{
// HTTP认证策略
support.addAuthenticator(DefaultTransport.HTTP, new PaiHttpAuthenticator());
support.addAuthenticator(DefaultTransport.HTTPS, new PaiHttpAuthenticator());
// HTTP需要的配置信息
support.addConfigMetadata(DefaultTransport.HTTP, httpConfig);
// HTTP消息解码器
support.addMessageCodecSupport(new PaiHttpDeviceMessageCodec(DefaultTransport.HTTP));
support.addMessageCodecSupport(new PaiHttpDeviceMessageCodec(DefaultTransport.HTTPS));
}
// TCP ===========================================================================================================
{
// TCP认证策略
// TCP需要的配置信息
// TCP消息解码器
}
// UDP ===========================================================================================================
{
// UDP认证策略
// UDP需要的配置信息
// UDP消息解码器
}
// WebSocket ======================================================================================================
{
// WebSocket认证策略
// WebSocket需要的配置信息
// WebSocket消息解码器
support.addMessageCodecSupport(new PaiWebsocketDeviceMessageCodec());
}
return Mono.just(support);
}
}

97
src/main/java/cn/flyrise/iot/protocol/agree/coap/AbstractCoapDeviceMessageCodec.java

@ -0,0 +1,97 @@
package cn.flyrise.iot.protocol.agree.coap;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* COAP 默认编解码器
*
* 目前COAP只支持上行所以只需要解码处理
*/
@Slf4j
public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec {
protected abstract Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response);
protected String getPath(CoapMessage message){
String path = message.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}
protected String getDeviceId(CoapMessage message){
String deviceId = message.getStringOption(2100).orElse(null);
String[] paths = TopicMessageCodec.removeProductPath(getPath(message));
if (StringUtils.isEmpty(deviceId) && paths.length > 1) {
deviceId = paths[1];
}
return deviceId;
}
/**
* 解码
* @param context
* @return
*/
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
AtomicBoolean alreadyReply = new AtomicBoolean();
Consumer<Object> responseHandler = (resp) -> {
if (alreadyReply.compareAndSet(false, true)) {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
if (resp instanceof byte[]) {
exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp));
}
}
};
return this
.decode(exchangeMessage, context, responseHandler)
.doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST);
})
.switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST)));
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
/**
* 编码
* @param context
* @return
*/
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

137
src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDTLSDeviceMessageCodec.java

@ -0,0 +1,137 @@
package cn.flyrise.iot.protocol.agree.coap;
import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import cn.flyrise.iot.protocol.config.ObjectMappers;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public class PaiCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP_DTLS;
}
/**
* 解码
* @param message
* @param context
* @param response
* @return
*/
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
if (context.getDevice() == null) {
return Flux.empty();
}
return Flux.defer(() -> {
String path = getPath(message);
String deviceId = getDeviceId(message);
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
byte[] payload = message.payloadAsBytes();
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
if (StringUtils.isEmpty(deviceId)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
// TODO: 2021/7/30 移到 FunctionalTopicHandlers
if (path.endsWith("/request-token")) {
//认证
return context
.getDevice(deviceId)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMap(device -> device
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, deviceId, payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return device
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
}))
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getSelfConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}))
.doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
});
});
}
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
//验证签名
byte[] secureKeyBytes = secureKey.getBytes();
byte[] signPayload = Arrays.copyOf(payload, payload.length + secureKeyBytes.length);
System.arraycopy(secureKeyBytes, 0, signPayload, 0, secureKeyBytes.length);
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(signPayload).equalsIgnoreCase(sign)) {
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
return false;
}
return true;
}
}

72
src/main/java/cn/flyrise/iot/protocol/agree/coap/PaiCoapDeviceMessageCodec.java

@ -0,0 +1,72 @@
package cn.flyrise.iot.protocol.agree.coap;
import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import cn.flyrise.iot.protocol.cipher.Ciphers;
import cn.flyrise.iot.protocol.config.ObjectMappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
@Slf4j
public class PaiCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
/**
* 解码
* @param message
* @param context
* @param response
* @return
*/
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
String path = getPath(message);
String deviceId = getDeviceId(message);
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey);
//解码
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}));
}
}

77
src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpAuthenticator.java

@ -0,0 +1,77 @@
package cn.flyrise.iot.protocol.agree.http;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import org.jetlinks.core.Value;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.pro.network.http.HttpAuthenticationRequest;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* HTTP 认证授权
* @author zhangqiang
* @since 0.1
*/
public class PaiHttpAuthenticator implements Authenticator {
/**
* 在网络连接建立的时候,可能无法获取设备的标识(:http,websocket等),则会调用此方法来进行认证.
* 注意: 认证通过后,需要设置设备ID.{@link AuthenticationResponse#success(String)}
* @param request 认证请求
* @param registry 设备注册中心
* @return 认证结果
*/
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (request instanceof HttpAuthenticationRequest) {
HttpAuthenticationRequest http = ((HttpAuthenticationRequest) request);
HttpExchangeMessage message = http.getHttpExchangeMessage();
String[] topics = TopicMessageCodec.removeProductPath(message.getPath());
return registry
.getDevice(topics[1])
.flatMap(device -> authenticate(request, device));
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
/**
* 对指定对设备进行认证
*
* @param request 认证请求
* @param deviceOperation 设备
* @return 认证结果
*/
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
if (request instanceof HttpAuthenticationRequest) {
HttpAuthenticationRequest http = ((HttpAuthenticationRequest) request);
HttpExchangeMessage message = http.getHttpExchangeMessage();
String[] topics = TopicMessageCodec.removeProductPath(message.getPath());
if(!deviceOperation.getDeviceId().equals(topics[1]))return Mono.just(AuthenticationResponse.error(400, "请求设备deviceId不存在"));
return deviceOperation.getConfigs("authorization")
.flatMap(values -> {
Header header_authorization = message.getHeader("Authorization").orElse(null);
if(header_authorization != null){
String[] authorizationValues = header_authorization.getValue();
String auth = values.getValue("authorization").map(Value::asString).orElse(null);
if(authorizationValues!=null
&& authorizationValues.length>0
&& authorizationValues[0].equals(auth))return Mono.just(AuthenticationResponse.success());
}
return Mono.just(AuthenticationResponse.error(400, "Authorization错误"));
});
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
}

87
src/main/java/cn/flyrise/iot/protocol/agree/http/PaiHttpDeviceMessageCodec.java

@ -0,0 +1,87 @@
package cn.flyrise.iot.protocol.agree.http;
import cn.flyrise.iot.protocol.config.ObjectMappers;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.AllArgsConstructor;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
@AllArgsConstructor
public class PaiHttpDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public PaiHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public PaiHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
/**
* 解码
* @param context
* @return
*/
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
// 消息解码
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getPath()), message.payloadAsBytes())
.switchIfEmpty(Mono.defer(() -> {
//未转换成功,响应404
return message.response(SimpleHttpResponseMessage
.builder()
.status(404)
.contentType(MediaType.APPLICATION_JSON)
.payload(Unpooled.wrappedBuffer("{\"success\":false}".getBytes()))
.build()).then(Mono.empty());
}))
.flatMap(msg -> {
//响应成功
return message.response(SimpleHttpResponseMessage
.builder()
.status(200)
.contentType(MediaType.APPLICATION_JSON)
.payload(Unpooled.wrappedBuffer("{\"success\":true}".getBytes()))
.build())
.thenReturn(msg);
});
}
/**
* 编码
* @param context
* @return
*/
public Mono<EncodedMessage> encode(MessageEncodeContext context) {
return context
.reply(((RepayableDeviceMessage<?>) context.getMessage()).newReply().success())
.then(Mono.empty());
}
}

66
src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttAuthenticator.java

@ -0,0 +1,66 @@
package cn.flyrise.iot.protocol.agree.mqtt;
import org.jetlinks.core.Value;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
/**
* MQTT 认证授权
* @author zhangqiang
* @since 0.1
*/
public class PaiMqttAuthenticator implements Authenticator {
/**
* 在MQTT服务网关中指定了认证协议时,将调用此方法进行认证
* 注意: 认证通过后,需要设置设备ID.{@link AuthenticationResponse#success(String)}
* @param request 认证请求
* @param registry 设备注册中心
* @return 认证结果
*/
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (request instanceof MqttAuthenticationRequest) {
MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
return registry
.getDevice(mqtt.getClientId())
.flatMap(device -> authenticate(request, device));
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
/**
* 对指定对设备进行认证
*
* @param request 认证请求
* @param deviceOperation 设备
* @return 认证结果
*/
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
if (request instanceof MqttAuthenticationRequest) {
MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
if(!mqtt.getClientId().equals(deviceOperation.getDeviceId())) return Mono.just(AuthenticationResponse.error(400, "客户端clientId与设备deviceId不一致"));
return deviceOperation.getConfigs("username", "password")
.flatMap(values -> {
String username = values.getValue("username").map(Value::asString).orElse(null);
String password = values.getValue("password").map(Value::asString).orElse(null);
if (mqtt.getUsername().equals(username) && mqtt
.getPassword()
.equals(password)) {
return Mono.just(AuthenticationResponse.success());
} else {
return Mono.just(AuthenticationResponse.error(400, "账号或密码错误"));
}
});
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
}

136
src/main/java/cn/flyrise/iot/protocol/agree/mqtt/PaiMqttDeviceMessageCodec.java

@ -0,0 +1,136 @@
package cn.flyrise.iot.protocol.agree.mqtt;
import cn.flyrise.iot.protocol.functional.FunctionalTopicHandlers;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import cn.flyrise.iot.protocol.topic.TopicPayload;
import cn.flyrise.iot.protocol.config.ObjectMappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
/**
* MQTT 编解码器
* @author zhangqiang
* @since 0.1
*/
@Slf4j
public class PaiMqttDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public PaiMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public PaiMqttDeviceMessageCodec() {
this(DefaultTransport.MQTT);
}
@Override
public Transport getSupportTransport() {
return transport;
}
/**
* 编码
* @param context
* @return
*/
@Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
if (message instanceof DisconnectDeviceMessage) {
return ((ToDeviceMessageContext) context)
.disconnect()
.then(Mono.empty());
}
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage
.builder()
.clientId(deviceMessage.getDeviceId())
.topic("/".concat(productId).concat(convertResult.getTopic()))
.payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(convertResult.getPayload()))
.build());
} else {
return Mono.empty();
}
});
}
/**
* 解码
* @param context
* @return
*/
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(context.getDevice(),
message.getTopic().split("/"),
payload,
mapper,
reply -> doReply(context, reply)))
;
}
private Mono<Void> doReply(MessageCodecContext context, TopicPayload reply) {
if (context instanceof FromDeviceMessageContext) {
return ((FromDeviceMessageContext) context)
.getSession()
.send(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
} else if (context instanceof ToDeviceMessageContext) {
return ((ToDeviceMessageContext) context)
.sendToDevice(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
}
return Mono.empty();
}
}

95
src/main/java/cn/flyrise/iot/protocol/agree/websocket/PaiWebsocketDeviceMessageCodec.java

@ -0,0 +1,95 @@
package cn.flyrise.iot.protocol.agree.websocket;
import cn.flyrise.iot.protocol.config.ObjectMappers;
import cn.flyrise.iot.protocol.topic.TopicMessageCodec;
import cn.flyrise.iot.protocol.topic.TopicPayload;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSession;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
public class PaiWebsocketDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public PaiWebsocketDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public PaiWebsocketDeviceMessageCodec() {
this(DefaultTransport.WebSocket);
}
@Override
public Transport getSupportTransport() {
return transport;
}
/**
* 解码
* @param context
* @return
*/
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
WebSocketSessionMessage mqttMessage = (WebSocketSessionMessage) context.getMessage();
WebSocketSession session = mqttMessage.getWebSocketSession();
byte[] payload = mqttMessage.payloadAsBytes();
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(session.getUri()), payload)
.switchIfEmpty(Mono.defer(() -> {
//未转换成功,响应404
return session
.send(session.textMessage("{\"status\":404}"))
.then(Mono.empty());
}));
}
public Mono<EncodedMessage> encode(MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
if (message instanceof DisconnectDeviceMessage) {
return ((ToDeviceMessageContext) context)
.disconnect()
.then(Mono.empty());
}
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload msg = TopicMessageCodec.encode(mapper, deviceMessage);
if (null == msg) {
return Mono.empty();
}
JSONObject data = new JSONObject();
data.put("topic", msg.getTopic());
data.put("message", msg.getPayload());
return Mono.just(DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, Unpooled.wrappedBuffer(data.toJSONString().getBytes())));
}
return Mono.empty();
});
}
}

56
src/main/java/cn/flyrise/iot/protocol/cipher/Ciphers.java

@ -0,0 +1,56 @@
package cn.flyrise.iot.protocol.cipher;
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Optional;
public enum Ciphers {
AES {
@SneakyThrows
public byte[] encrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
@SneakyThrows
public byte[] decrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
};
public static Optional<Ciphers> of(String name) {
try {
return Optional.of(valueOf(name.toUpperCase()));
} catch (Exception e) {
return Optional.empty();
}
}
public abstract byte[] encrypt(byte[] src, String key);
public abstract byte[] decrypt(byte[] src, String key);
String encryptBase64(String src, String key) {
return Base64.encodeBase64String(encrypt(src.getBytes(), key));
}
byte[] decryptBase64(String src, String key) {
return decrypt(Base64.decodeBase64(src), key);
}
}

17
src/main/java/cn/flyrise/iot/protocol/config/MessageCodecConfig.java

@ -0,0 +1,17 @@
package cn.flyrise.iot.protocol.config;
/**
* 默认
* 基础配置
* @author zhangqiang
* @since 0.1
*/
public class MessageCodecConfig {
// 编码数据结构 true默认 false自定义
public static final boolean ENCODE_MOLD = true;
// 解码数据结构模 true默认 false自定义
public static final boolean DECODE_MOLD = true;
}

27
src/main/java/cn/flyrise/iot/protocol/config/ObjectMappers.java

@ -0,0 +1,27 @@
package cn.flyrise.iot.protocol.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
public class ObjectMappers {
public static final ObjectMapper JSON_MAPPER;
public static final ObjectMapper CBOR_MAPPER;
static {
JSON_MAPPER = Jackson2ObjectMapperBuilder
.json()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
;
CBOR_MAPPER = Jackson2ObjectMapperBuilder
.cbor()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
}

72
src/main/java/cn/flyrise/iot/protocol/functional/FunctionalTopicHandlers.java

@ -0,0 +1,72 @@
package cn.flyrise.iot.protocol.functional;
import cn.flyrise.iot.protocol.topic.TopicPayload;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Function;
/**
* 功能性的topic,不和平台交互
*/
public enum FunctionalTopicHandlers {
//同步时间
timeSync("/*/*/time-sync") {
@SneakyThrows
@SuppressWarnings("all")
Mono<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
TopicPayload topicPayload = new TopicPayload();
topicPayload.setTopic(String.join("/", topic) + "/reply");
TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class);
TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis());
topicPayload.setPayload(mapper.writeValueAsBytes(response));
//直接回复给设备
return sender
.apply(topicPayload)
.then(Mono.empty());
}
};
FunctionalTopicHandlers(String topic) {
this.pattern = topic.split("/");
}
private final String[] pattern;
abstract Publisher<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender);
public static Publisher<DeviceMessage> handle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
return Mono
.justOrEmpty(fromTopic(topic))
.flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender));
}
static Optional<FunctionalTopicHandlers> fromTopic(String[] topic) {
for (FunctionalTopicHandlers value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
}

10
src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncRequest.java

@ -0,0 +1,10 @@
package cn.flyrise.iot.protocol.functional;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimeSyncRequest {
private String messageId;
}

15
src/main/java/cn/flyrise/iot/protocol/functional/TimeSyncResponse.java

@ -0,0 +1,15 @@
package cn.flyrise.iot.protocol.functional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TimeSyncResponse {
private String messageId;
private long timestamp;
}

20
src/main/java/cn/flyrise/iot/protocol/topic/TopicMessage.java

@ -0,0 +1,20 @@
package cn.flyrise.iot.protocol.topic;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
/**
* 消息实体
* @author zhangqiang
* @since 0.1
*/
@Getter
@Setter
@AllArgsConstructor
public class TopicMessage {
private String topic;
private Object message;
}

370
src/main/java/cn/flyrise/iot/protocol/topic/TopicMessageCodec.java

@ -0,0 +1,370 @@
package cn.flyrise.iot.protocol.topic;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
/**
* 标准编解码器
*
* 包含码库编码解码
* 码库产品标识+设备标识+码库特定义
* 属性上报 产品标识+设备标识+ /properties/report
*
* 注释
* A.默认为标准物模型数据格式这直接 payload.toJavaObject(type) 转换
* B.非标准物模型数据格式则重写相应的doEncodedoDecode方法进行标准物模型数据格式转换
*
*
* <pre>
* 下行Topic:
* 读取设备属性: /{productId}/{deviceId}/properties/read
* 修改设备属性: /{productId}/{deviceId}/properties/write
* 调用设备功能: /{productId}/{deviceId}/function/invoke
*
* //网关设备
* 读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
* 修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
* 调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
*
* 上行Topic:
* 读取属性回复: /{productId}/{deviceId}/properties/read/reply
* 修改属性回复: /{productId}/{deviceId}/properties/write/reply
* 调用设备功能: /{productId}/{deviceId}/function/invoke/reply
* 上报设备事件: /{productId}/{deviceId}/event/{eventId}
* 上报设备属性: /{productId}/{deviceId}/properties/report
* 上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
*
* //网关设备
* 子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
* 子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
* 读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
* 修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
* 调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
* 上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
* 上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
*
* </pre>
* 基于jet links 的消息编解码器
* @author zhangqiang
* @since 0.1
*/
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report", ReportPropertyMessage.class),
//事件上报
event("/*/event/*", EventMessage.class) {
@Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1];
return Mono.from(super.doDecode(mapper, topic, payload))
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
}
@Override
void refactorTopic(String[] topics, DeviceMessage message) {
super.refactorTopic(topics, message);
EventMessage event = ((EventMessage) message);
topics[topics.length - 1] = String.valueOf(event.getEvent());
}
},
//读取属性
readProperty("/*/properties/read", ReadPropertyMessage.class),
//读取属性回复
readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
//修改属性
writeProperty("/*/properties/write", WritePropertyMessage.class),
//修改属性回复
writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
//调用功能
functionInvoke("/*/function/invoke", FunctionInvokeMessage.class),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class),
//子设备消息
child("/*/child/*/**", ChildDeviceMessage.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessage msg = new ChildDeviceMessage();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
}, //子设备消息回复
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
},
//更新标签
updateTag("/*/tags", UpdateTagMessage.class),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
//更新固件升级进度消息
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
//拉取固件
requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class),
//拉取固件更新回复
requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class),
//上报固件版本
reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
//读取固件回复
readFirmware("/*/firmware/read", ReadFirmwareMessage.class),
//读取固件回复
readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
//派生物模型上报
derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
//透传设备消息
direct("/*/direct", DirectDeviceMessage.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
DirectDeviceMessage message = new DirectDeviceMessage();
message.setDeviceId(topic[1]);
message.setPayload(payload);
return Mono.just(message);
}
},
//断开连接消息
disconnect("/*/disconnect", DisconnectDeviceMessage.class),
//断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线
connect("/*/online", DeviceOnlineMessage.class),
//离线
offline("/*/offline", DeviceOfflineMessage.class),
//日志
log("/*/log", DeviceLogMessage.class),
//状态检查
stateCheck("/*/state-check", DeviceStateCheckMessage.class),
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
;
/**
* 构造函数
* @param topic
* @param type
*/
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
}
// 主题
private final String[] pattern;
// DeviceMessage 类型
private final Class<? extends DeviceMessage> type;
/**
* 标准解码
* @param mapper
* @param topics
* @param payload
* @return
*/
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono
.justOrEmpty(fromTopic(topics))
.flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload));
}
/**
* 标准解码
* @param mapper
* @param topic
* @param payload
* @return
*/
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String topic, byte[] payload) {
return decode(mapper, topic.split("/"), payload);
}
/**
* 标准编码
* @param mapper
* @param message
* @return
*/
public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) {
return fromMessage(message)
.orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType()))
.doEncode(mapper, message);
}
/**
* 根据主题topic确认
* 是否为合法码库类型
* @param topic
* @return
*/
static Optional<TopicMessageCodec> fromTopic(String[] topic) {
for (TopicMessageCodec value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
/**
* 根据消息类型型确认
* 是否为合法码库类型
* @param message
* @return
*/
static Optional<TopicMessageCodec> fromMessage(DeviceMessage message) {
for (TopicMessageCodec value : values()) {
if (value.type == message.getClass()) {
return Optional.of(value);
}
}
return Optional.empty();
}
/**
* 标准解码
* @param mapper
* @param topic
* @param payload
* @return
*/
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
return Mono
.fromCallable(() -> {
DeviceMessage message = mapper.readValue(payload, type);
FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message);
return message;
});
}
/**
* 标准编码
*
* 默认数据结构 无需转换 平台什么数据格式就按什么数据格式编码
* 如需自定义编码器则在重写doEncode方法即可
* @param mapper
* @param topics
* @param message
* @return
*/
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
refactorTopic(topics, message);
return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
}
/**
* 标准编码
* @param mapper
* @param message
* @return
*/
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
String[] topics = Arrays.copyOf(pattern, pattern.length);
return doEncode(mapper, topics, message);
}
void refactorTopic(String[] topics, DeviceMessage message) {
topics[1] = message.getDeviceId();
}
/**
* 移除topic中的产品信息,topic第一个层为产品ID在解码时,不需要此信息,所以需要移除之.
*
* @param topic topic
* @return 移除后的topic
*/
public static String[] removeProductPath(String topic) {
if (!topic.startsWith("/")) {
topic = "/" + topic;
}
String[] topicArr = topic.split("/");
String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length);
topics[0] = "";
return topics;
}
}

17
src/main/java/cn/flyrise/iot/protocol/topic/TopicPayload.java

@ -0,0 +1,17 @@
package cn.flyrise.iot.protocol.topic;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TopicPayload {
private String topic;
private byte[] payload;
}

24
src/main/java/org/jetlinks/pro/network/http/HttpAuthenticationRequest.java

@ -0,0 +1,24 @@
package org.jetlinks.pro.network.http;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.jetlinks.core.device.AuthenticationRequest;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
/**
* @author 张强
* @since 1.0.0
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class HttpAuthenticationRequest implements AuthenticationRequest {
private HttpExchangeMessage httpExchangeMessage;
private Transport transport;
}

1
src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider

@ -0,0 +1 @@
cn.flyrise.iot.protocol.PaiProtocolSupportProvider
Loading…
Cancel
Save