Browse Source

MQTT配置,告警初步新增接口

hanzhengyi 1 year ago
parent
commit
6b7f924b4f

+ 18 - 1
service-iot/service-iot-biz/pom.xml

@@ -37,7 +37,24 @@
             <artifactId>aliyun-java-sdk-dysmsapi</artifactId>
             <version>1.1.0</version>
         </dependency>
-        
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+            <version>2.5.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-stream</artifactId>
+            <version>5.5.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+            <version>5.5.5</version>
+        </dependency>
 
     </dependencies>
 

+ 11 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/controller/web/BaseAlarmController.java

@@ -4,6 +4,7 @@ package com.usky.iot.controller.web;
 import com.usky.common.core.bean.ApiResult;
 import com.usky.common.core.bean.CommonPage;
 import com.usky.iot.domain.BaseAlarm;
+import com.usky.iot.domain.BaseGgpFacility;
 import com.usky.iot.service.BaseAlarmService;
 import com.usky.iot.service.vo.BaseAlarmListVO;
 import com.usky.iot.service.vo.BaseAlarmRequestVO;
@@ -58,5 +59,15 @@ public class BaseAlarmController {
         return ApiResult.success();
     }
 
+    /**
+     * 新增
+     * @param baseAlarm
+     * @return
+     */
+    @PostMapping("/alarmInfo")
+    public ApiResult<Void> add(@RequestBody BaseAlarm baseAlarm){
+        baseAlarmService.add(baseAlarm);
+        return ApiResult.success();
+    }
 }
 

+ 3 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/BaseAlarmService.java

@@ -3,6 +3,7 @@ package com.usky.iot.service;
 import com.usky.common.core.bean.CommonPage;
 import com.usky.iot.domain.BaseAlarm;
 import com.usky.common.mybatis.core.CrudService;
+import com.usky.iot.domain.BaseGgpFacility;
 import com.usky.iot.service.vo.BaseAlarmListVO;
 import com.usky.iot.service.vo.BaseAlarmRequestVO;
 import com.usky.iot.service.vo.BaseAlarmResponeVO;
@@ -23,4 +24,6 @@ public interface BaseAlarmService extends CrudService<BaseAlarm> {
     CommonPage<BaseAlarm> page(BaseAlarmListVO baseAlarmListVO);
 
     void update(BaseAlarm baseAlarm);
+
+    boolean add(BaseAlarm baseAlarm);
 }

+ 50 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.iot.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 84 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,84 @@
+package com.usky.iot.service.config.mqtt;
+
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-backend-mqtt-out-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 86 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/enums/TopListener.java

@@ -0,0 +1,86 @@
+package com.usky.iot.service.enums;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/12/31 11:11
+ */
+public enum TopListener {
+
+    /**
+     * 城运对接
+     */
+    MH_WATER_INFO("fireInfo","mh/water/info",1),
+    MH_WATER_ALERT("fireInfo","mh/water/alert",1),
+    MH_WATER_STATISTICS("fireInfo","mh/water/statistics",1),
+    DEVICE_INFO("fireInfo","device/info",1),
+    DEVICE_ALERT("fireInfo","device/alert",1),
+    DEVICE_DETAIL("fireInfo","device/detail",1),
+    DEVICE_AJ("fireInfo","device/aj",1),
+
+    /**
+     * 全部设备对接
+     */
+    FIRE_INFO("fireInfo","/usky/ytDP0001/+/+/info",1),
+    FIRE_ALERT("fireAlarm","/usky/ytDP0001/+/+/alarm",1),
+    WATER_INFO("waterInfo","/usky/ytDP0002/+/+/info",1),
+    WATER_ALERT("waterAlert","/usky/ytDP0002/+/+/alarm",1),
+    SMOKE_INFO("smokeInfo","/usky/ytDP0003/+/+/info",1),
+    SMOKE_ALERT("smokeAlarm","/usky/ytDP0003/+/+/alarm",1),
+    LIQUID_INFO("waterInfo","/usky/ytDP0005/+/+/info",1),
+    LIQUID_ALERT("waterAlert","/usky/ytDP0005/+/+/alarm",1),
+    RTU_INFO("rtuinfo","/usky/ytDP0006/+/+/info",1),
+    RTU_ALERT("rtuAlert","/usky/ytDP0006/+/+/alarm",1),
+    ELECTRICAL_INFO("electricalInfo","/usky/ytDP0007/+/+/info",1),
+    ELECTRICAL_ALERT("electricalAlarm","/usky/ytDP0007/+/+/alarm",1),
+    MH_COVER_INFO("waterInfo","/usky/ytDP0008/+/+/info",1),
+    MH_COVER_ALERT("waterAlert","/usky/ytDP0008/+/+/alarm",1),
+    VIDEO_ALERT("videoAlert","/usky/ytCamCore/+/+/alarm",1),
+    LR_SMOKE_INFO("smokeInfo","/usky/ytDP00033/+/+/info",1),
+    LR_SMOKE_ALERT("smokeAlarm","/usky/ytDP00033/+/+/alarm",1);
+
+
+
+
+    private String name;
+    private String code;
+    //0发送队列,1监听队列2都是
+    private Integer type;
+
+    TopListener(String name, String code, Integer type){
+        this.name = name;
+        this.code = code;
+        this.type = type;
+    }
+
+    public static TopListener parse(String code){
+        TopListener topListener = null;
+        for (TopListener t:TopListener.values()) {
+            if (t.getCode().equals(code)){
+                topListener = t;
+                break;
+            }
+        }
+        return topListener;
+    }
+    public static List<TopListener> parse(Integer type){
+        List<TopListener> listeners = new ArrayList<>();
+        for (TopListener t:TopListener.values()) {
+            if (t.getType().equals(type)){
+                listeners.add(t);
+            }
+        }
+        return listeners;
+    }
+    public String getCode(){
+        return code;
+    }
+    public String getName(){
+        return name;
+    }
+    public Integer getType(){
+        return type;
+    }
+}

+ 9 - 13
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/impl/BaseAlarmServiceImpl.java

@@ -1,35 +1,27 @@
 package com.usky.iot.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.usky.common.core.bean.CommonPage;
 import com.usky.common.security.utils.SecurityUtils;
-import com.usky.iot.domain.BaseAlarm;
-import com.usky.iot.domain.BaseAlarmType;
-import com.usky.iot.domain.DmpDeviceInfo;
-import com.usky.iot.domain.DmpProductInfo;
+import com.usky.iot.domain.*;
 import com.usky.iot.mapper.BaseAlarmMapper;
 import com.usky.iot.service.BaseAlarmService;
 import com.usky.common.mybatis.core.AbstractCrudService;
 import com.usky.iot.service.DmpDeviceInfoService;
 import com.usky.iot.service.DmpProductInfoService;
+import com.usky.iot.service.config.mqtt.MqttOutConfig;
 import com.usky.iot.service.vo.BaseAlarmListVO;
 import com.usky.iot.service.vo.BaseAlarmRequestVO;
 import com.usky.iot.service.vo.BaseAlarmResponeVO;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.web.bind.annotation.RequestBody;
 
+import javax.annotation.Resource;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -214,7 +206,11 @@ public class BaseAlarmServiceImpl extends AbstractCrudService<BaseAlarmMapper, B
             this.updateById(baseAlarm);
         }
     }
-
+    @Override
+    public boolean add(BaseAlarm baseAlarm) {
+        baseAlarm.setTenantId(SecurityUtils.getTenantId());
+        return this.save(baseAlarm);
+    }
 }
 
 

+ 25 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/mqtt/MqttStrategy.java

@@ -0,0 +1,25 @@
+package com.usky.iot.service.mqtt;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.usky.iot.service.vo.MqttBaseVO;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO);
+
+}

+ 21 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.iot.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:32
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+      */
+    private Object data;
+}

+ 35 - 0
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/vo/MqttDataVO.java

@@ -0,0 +1,35 @@
+package com.usky.iot.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author zyj
+ * @date 2023-01-11
+ */
+@Data
+public class MqttDataVO {
+    /**
+     * 设备编号
+     */
+    private String devId;
+
+    /**
+     * 设备类型
+     */
+    private Integer deviceType;
+
+    /**
+     * 单位编号
+     */
+    private String companyId;
+
+    /**
+     * 系统类型 (1.火灾自动报警系统 2.自动喷水灭火系统 3.消防给水及消火栓系统 4.防排烟系统 5.电气火灾系统 6.消防视频监控系统)
+     */
+    private Integer systemType;
+
+    /**
+     * 单位名称
+     */
+    private String companyName;
+}