Browse Source

1、在service-agbox网关服务模块中增加mqtt配置,以及巡检设备事件和心跳mqtt处理逻辑;
2、将service-iot巡检设备心跳和service-fire巡检事件上报的推送方式改为mqtt推送;

james 4 months ago
parent
commit
4d06f09636
17 changed files with 601 additions and 92 deletions
  1. 50 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttBaseConfig.java
  2. 52 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttInConfig.java
  3. 84 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttOutConfig.java
  4. 86 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/enums/TopListener.java
  5. 57 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/listener/MqttListener.java
  6. 56 56
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/listener/RabbitMQListener.java
  7. 21 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/MqttStrategy.java
  8. 26 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/SimpleContext.java
  9. 54 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/event/event.java
  10. 41 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/info/Info.java
  11. 21 0
      service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/vo/MqttBaseVO.java
  12. 1 1
      service-fire/service-fire-biz/src/main/java/com/usky/fire/RuoYiSystemApplication.java
  13. 13 5
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionAttendanceServiceImpl.java
  14. 10 5
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionEventServiceImpl.java
  15. 19 19
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/listener/RabbitMQListener.java
  16. 1 1
      service-iot/service-iot-biz/src/main/java/com/usky/iot/RuoYiSystemApplication.java
  17. 9 5
      service-iot/service-iot-biz/src/main/java/com/usky/iot/service/impl/BaseAppInfoServiceImpl.java

+ 50 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.agbox.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 52 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,52 @@
+package com.usky.agbox.service.config.mqtt;
+
+import com.usky.agbox.service.enums.TopListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        String[] tops = mqttBaseConfig.getMsgTopic().split(",");
+        String clientId = "h-agbox-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 84 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,84 @@
+package com.usky.agbox.service.config.mqtt;
+
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 86 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/enums/TopListener.java

@@ -0,0 +1,86 @@
+package com.usky.agbox.service.enums;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/12/31 11:11
+ */
+public enum TopListener {
+
+    /**
+     * 城运对接
+     */
+    MH_WATER_INFO("fireInfo","mh/water/info",1),
+    MH_WATER_ALERT("fireInfo","mh/water/alert",1),
+    MH_WATER_STATISTICS("fireInfo","mh/water/statistics",1),
+    DEVICE_INFO("fireInfo","device/info",1),
+    DEVICE_ALERT("fireInfo","device/alert",1),
+    DEVICE_DETAIL("fireInfo","device/detail",1),
+    DEVICE_AJ("fireInfo","device/aj",1),
+
+    /**
+     * 全部设备对接
+     */
+    FIRE_INFO("fireInfo","/usky/ytDP0001/+/+/info",1),
+    FIRE_ALERT("fireAlarm","/usky/ytDP0001/+/+/alarm",1),
+    WATER_INFO("waterInfo","/usky/ytDP0002/+/+/info",1),
+    WATER_ALERT("waterAlert","/usky/ytDP0002/+/+/alarm",1),
+    SMOKE_INFO("smokeInfo","/usky/ytDP0003/+/+/info",1),
+    SMOKE_ALERT("smokeAlarm","/usky/ytDP0003/+/+/alarm",1),
+    LIQUID_INFO("waterInfo","/usky/ytDP0005/+/+/info",1),
+    LIQUID_ALERT("waterAlert","/usky/ytDP0005/+/+/alarm",1),
+    RTU_INFO("rtuinfo","/usky/ytDP0006/+/+/info",1),
+    RTU_ALERT("rtuAlert","/usky/ytDP0006/+/+/alarm",1),
+    ELECTRICAL_INFO("electricalInfo","/usky/ytDP0007/+/+/info",1),
+    ELECTRICAL_ALERT("electricalAlarm","/usky/ytDP0007/+/+/alarm",1),
+    MH_COVER_INFO("waterInfo","/usky/ytDP0008/+/+/info",1),
+    MH_COVER_ALERT("waterAlert","/usky/ytDP0008/+/+/alarm",1),
+    VIDEO_ALERT("videoAlert","/usky/ytCamCore/+/+/alarm",1),
+    LR_SMOKE_INFO("smokeInfo","/usky/ytDP00033/+/+/info",1),
+    LR_SMOKE_ALERT("smokeAlarm","/usky/ytDP00033/+/+/alarm",1);
+
+
+
+
+    private String name;
+    private String code;
+    //0发送队列,1监听队列2都是
+    private Integer type;
+
+    TopListener(String name, String code, Integer type){
+        this.name = name;
+        this.code = code;
+        this.type = type;
+    }
+
+    public static TopListener parse(String code){
+        TopListener topListener = null;
+        for (TopListener t:TopListener.values()) {
+            if (t.getCode().equals(code)){
+                topListener = t;
+                break;
+            }
+        }
+        return topListener;
+    }
+    public static List<TopListener> parse(Integer type){
+        List<TopListener> listeners = new ArrayList<>();
+        for (TopListener t:TopListener.values()) {
+            if (t.getType().equals(type)){
+                listeners.add(t);
+            }
+        }
+        return listeners;
+    }
+    public String getCode(){
+        return code;
+    }
+    public String getName(){
+        return name;
+    }
+    public Integer getType(){
+        return type;
+    }
+}

+ 57 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/listener/MqttListener.java

@@ -0,0 +1,57 @@
+package com.usky.agbox.service.listener;
+
+
+import com.usky.agbox.service.config.mqtt.MqttInConfig;
+import com.usky.agbox.service.mqtt.SimpleContext;
+import com.usky.agbox.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:13
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Slf4j
+@Component
+public class MqttListener {
+
+    public static final String MESSAGE_NAME = "messageInput";
+
+    @Autowired
+    private SimpleContext simpleContext;
+
+    /**
+     * 处理消息-消费者
+     *
+     * @return
+     */
+    @Bean(MESSAGE_NAME)
+    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+    public MessageHandler handler() {
+        return message -> {
+            String payload = message.getPayload().toString();
+            //进行接口推送
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic) {
+                String topic = mqttReceivedTopic.toString();
+                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+                mqttBaseVO.setTopic(topic);
+                if (topic.indexOf("info") != -1 ) {
+                    mqttBaseVO.setDescribe("info");
+                    mqttBaseVO.setData(payload);
+                }else if(topic.indexOf("event") != -1 ) {
+                    mqttBaseVO.setDescribe("event");
+                    mqttBaseVO.setData(payload);
+                }
+                //统一处理数据
+                simpleContext.getResource(mqttBaseVO);
+            }
+        };
+    }
+}

+ 56 - 56
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/listener/RabbitMQListener.java

@@ -14,60 +14,60 @@ import org.springframework.stereotype.Component;
 @Component
 public class RabbitMQListener {
 
-    @Autowired
-    private patrolAgbox PatrolAgbox;
-    @RabbitHandler
-    @RabbitListener(bindings = @QueueBinding(
-            value = @Queue(),
-            exchange = @Exchange(value = "Patrol_EEvent"),
-            key = {"${agBox.routeKey}"}
-    ))
-    public void getData(Message message){
-        try {
-            String str = new String(message.getBody(),"utf-8");
-            JSONObject eventVO1 = JSONObject.parseObject(str);
-            JSONObject eventVO = new JSONObject();
-            if (eventVO1.get("eventType").equals(31)){
-                eventVO1.put("eventCode",1);
-                PatrolAgbox.addEvent(eventVO1.toJSONString());
-            }else if (eventVO1.get("eventType").equals(32)){
-                eventVO1.put("eventCode",2);
-                PatrolAgbox.addEvent(eventVO1.toJSONString());
-            }else {
-                if (eventVO1.get("eventType").equals(33)||eventVO1.get("eventType").equals(34)){
-                    eventVO.put("eventCode",5);
-                }else {
-                    eventVO.put("eventCode",16);
-                }
-                String timeWithT = eventVO1.get("createTime").toString();
-                eventVO.put("deviceId",eventVO1.get("deviceId"));
-                eventVO.put("triggerTime",timeWithT.replace("T", " "));
-                eventVO.put("name",eventVO1.get("createBy"));
-                eventVO.put("certifiedNo","");
-                PatrolAgbox.addEvent(eventVO.toJSONString());
-            }
-            System.out.println("FEventReceiver消费者收到消息: " + str);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-    }
-
-    @RabbitHandler
-    @RabbitListener(bindings = @QueueBinding(
-            value = @Queue(),
-            exchange = @Exchange(value = "Patrol_EInfo"),
-            key = {"${agBox.routeKey}"}
-    ))
-    public void getDataInfo(Message message){
-        try {
-            String str = new String(message.getBody(),"utf-8");
-            JSONObject eventVO1 = JSONObject.parseObject(str);
-            JSONObject eventVO = new JSONObject();
-            eventVO.put("deviceId",eventVO1.get("deviceId"));
-            PatrolAgbox.updateHeart(eventVO.toJSONString());
-            System.out.println("FInfoReceiver消费者收到消息: " + str);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-    }
+//    @Autowired
+//    private patrolAgbox PatrolAgbox;
+//    @RabbitHandler
+//    @RabbitListener(bindings = @QueueBinding(
+//            value = @Queue(),
+//            exchange = @Exchange(value = "Patrol_EEvent"),
+//            key = {"${agBox.routeKey}"}
+//    ))
+//    public void getData(Message message){
+//        try {
+//            String str = new String(message.getBody(),"utf-8");
+//            JSONObject eventVO1 = JSONObject.parseObject(str);
+//            JSONObject eventVO = new JSONObject();
+//            if (eventVO1.get("eventType").equals(31)){
+//                eventVO1.put("eventCode",1);
+//                PatrolAgbox.addEvent(eventVO1.toJSONString());
+//            }else if (eventVO1.get("eventType").equals(32)){
+//                eventVO1.put("eventCode",2);
+//                PatrolAgbox.addEvent(eventVO1.toJSONString());
+//            }else {
+//                if (eventVO1.get("eventType").equals(33)||eventVO1.get("eventType").equals(34)){
+//                    eventVO.put("eventCode",5);
+//                }else {
+//                    eventVO.put("eventCode",16);
+//                }
+//                String timeWithT = eventVO1.get("createTime").toString();
+//                eventVO.put("deviceId",eventVO1.get("deviceId"));
+//                eventVO.put("triggerTime",timeWithT.replace("T", " "));
+//                eventVO.put("name",eventVO1.get("createBy"));
+//                eventVO.put("certifiedNo","");
+//                PatrolAgbox.addEvent(eventVO.toJSONString());
+//            }
+//            System.out.println("FEventReceiver消费者收到消息: " + str);
+//        } catch (Exception e){
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    @RabbitHandler
+//    @RabbitListener(bindings = @QueueBinding(
+//            value = @Queue(),
+//            exchange = @Exchange(value = "Patrol_EInfo"),
+//            key = {"${agBox.routeKey}"}
+//    ))
+//    public void getDataInfo(Message message){
+//        try {
+//            String str = new String(message.getBody(),"utf-8");
+//            JSONObject eventVO1 = JSONObject.parseObject(str);
+//            JSONObject eventVO = new JSONObject();
+//            eventVO.put("deviceId",eventVO1.get("deviceId"));
+//            PatrolAgbox.updateHeart(eventVO.toJSONString());
+//            System.out.println("FInfoReceiver消费者收到消息: " + str);
+//        } catch (Exception e){
+//            e.printStackTrace();
+//        }
+//    }
 }

+ 21 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/MqttStrategy.java

@@ -0,0 +1,21 @@
+package com.usky.agbox.service.mqtt;
+
+
+import com.usky.agbox.service.vo.MqttBaseVO;
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO);
+
+}

+ 26 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/SimpleContext.java

@@ -0,0 +1,26 @@
+package com.usky.agbox.service.mqtt;
+
+
+import com.usky.agbox.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 54 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/event/event.java

@@ -0,0 +1,54 @@
+package com.usky.agbox.service.mqtt.event;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.agbox.service.job.patrolAgbox;
+import com.usky.agbox.service.mqtt.MqttStrategy;
+import com.usky.agbox.service.vo.MqttBaseVO;
+import com.usky.common.core.util.JsonUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+
+@Service("event")
+public class event implements MqttStrategy {
+    @Autowired
+    private patrolAgbox PatrolAgbox;
+
+    //处理下发命令响应消息
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+
+        try {
+            JSONObject eventVO1 = JSONObject.parseObject(mqttBaseVO.getData().toString());
+            JSONObject eventVO = new JSONObject();
+            if (eventVO1.get("eventType").equals(31)){
+                eventVO1.put("eventCode",1);
+                PatrolAgbox.addEvent(eventVO1.toString());
+            }else if (eventVO1.get("eventType").equals(32)){
+                eventVO1.put("eventCode",2);
+                PatrolAgbox.addEvent(eventVO1.toString());
+            }else {
+                if (eventVO1.get("eventType").equals(33)||eventVO1.get("eventType").equals(34)){
+                    eventVO.put("eventCode",5);
+                }else {
+                    eventVO.put("eventCode",16);
+                }
+                String timeWithT = eventVO1.get("createTime").toString();
+                eventVO.put("deviceId",eventVO1.get("deviceId"));
+                eventVO.put("triggerTime",timeWithT.replace("T", " "));
+                eventVO.put("name",eventVO1.get("createBy"));
+                eventVO.put("certifiedNo","");
+                PatrolAgbox.addEvent(eventVO.toJSONString());
+            }
+            System.out.println("FEventReceiver消费者收到消息: " + mqttBaseVO.getData().toString());
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+
+
+}

+ 41 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/mqtt/info/Info.java

@@ -0,0 +1,41 @@
+package com.usky.agbox.service.mqtt.info;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.agbox.service.job.patrolAgbox;
+import com.usky.agbox.service.mqtt.MqttStrategy;
+import com.usky.agbox.service.vo.MqttBaseVO;
+import com.usky.common.core.util.JsonUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author zyj
+ * @date 2022/12/6 15:07
+ */
+@Service("info")
+public class Info implements MqttStrategy {
+    @Autowired
+    private patrolAgbox PatrolAgbox;
+
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+
+        try {
+            JSONObject map_data = JSONObject.parseObject(mqttBaseVO.getData().toString());
+            JSONObject eventVO = new JSONObject();
+            eventVO.put("deviceId",map_data.get("deviceId"));
+            PatrolAgbox.updateHeart(eventVO.toJSONString());
+            System.out.println("FInfoReceiver消费者收到消息: " + mqttBaseVO.getData().toString());
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+
+
+}

+ 21 - 0
service-agbox/service-agbox-biz/src/main/java/com/usky/agbox/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.agbox.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:32
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+      */
+    private Object data;
+}

+ 1 - 1
service-fire/service-fire-biz/src/main/java/com/usky/fire/RuoYiSystemApplication.java

@@ -14,7 +14,7 @@ import org.springframework.context.annotation.ComponentScan;
  * @author ruoyi
  */
 
-@EnableRabbit
+//@EnableRabbit
 @EnableFeignClients(basePackages = "com.usky")
 @MapperScan(value = "com.usky.fire.mapper")
 @ComponentScan("com.usky")

+ 13 - 5
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionAttendanceServiceImpl.java

@@ -1,5 +1,7 @@
 package com.usky.fire.service.impl;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -19,6 +21,7 @@ import com.usky.common.mybatis.core.AbstractCrudService;
 import com.usky.fire.service.PatrolInspectionPersonnelService;
 import com.usky.fire.service.PatrolInspectionTypeService;
 
+import com.usky.fire.service.config.mqtt.MqttOutConfig;
 import com.usky.fire.service.config.rabbitmq.RabbitMQConfig;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -60,11 +63,14 @@ public class PatrolInspectionAttendanceServiceImpl extends AbstractCrudService<P
     @Value("${agBox.push}")
     private Integer pushFlag;
 
-    @Resource
-    private RabbitTemplate rabbitTemplate;
+//    @Resource
+//    private RabbitTemplate rabbitTemplate;
+//
+//    @Autowired
+//    private RabbitMQConfig rabbitMQConfig;
 
-    @Autowired
-    private RabbitMQConfig rabbitMQConfig;
+    @Resource
+    private MqttOutConfig.MqttGateway mqttGateway;
 
     @Override
     public IPage<Map<String, Object>> pageList(Integer pageNum, Integer pageSize, String operateCode, String operator, LocalDateTime startTime, LocalDateTime endTime) {
@@ -176,7 +182,9 @@ public class PatrolInspectionAttendanceServiceImpl extends AbstractCrudService<P
         jsonObj.put("name", SecurityUtils.getUsername());
         jsonObj.put("certifiedNo", patrolInspectionAttendance.getIdentificationNumber());
 
-        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEventExchange, SecurityUtils.getTenantId().toString(), jsonObj.toJSONString());
+//        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEventExchange, SecurityUtils.getTenantId().toString(), jsonObj.toJSONString());
+        mqttGateway.sendToMqtt("/patrolEEvent/"+SecurityUtils.getTenantId()+"/event", jsonObj.toJSONString());
+
 //        if (pushFlag.equals(1)){
 //            JSONObject a = remotePatrolAgboxService.addEvent(jsonObj.toJSONString());
 //        }

+ 10 - 5
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionEventServiceImpl.java

@@ -18,6 +18,7 @@ import com.usky.fire.service.PatrolInspectionEventService;
 import com.usky.common.mybatis.core.AbstractCrudService;
 import com.usky.fire.service.PatrolInspectionPersonnelService;
 import com.usky.fire.service.PatrolInspectionPlanScheduleService;
+import com.usky.fire.service.config.mqtt.MqttOutConfig;
 import com.usky.fire.service.config.rabbitmq.RabbitMQConfig;
 import com.usky.fire.service.vo.PatrolInspectionPlanRequestVO;
 import com.usky.system.RemoteDictService;
@@ -32,6 +33,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.HashMap;
@@ -52,10 +54,12 @@ public class PatrolInspectionEventServiceImpl extends AbstractCrudService<Patrol
     private PatrolInspectionPlanScheduleService patrolInspectionPlanScheduleService;
     @Autowired
     private PatrolInspectionPersonnelService patrolInspectionPersonnelService;
-    @Autowired
-    private RabbitTemplate rabbitTemplate;
-    @Autowired
-    private RabbitMQConfig rabbitMQConfig;
+//    @Autowired
+//    private RabbitTemplate rabbitTemplate;
+//    @Autowired
+//    private RabbitMQConfig rabbitMQConfig;
+    @Resource
+    private MqttOutConfig.MqttGateway mqttGateway;
     @Autowired
     private RemoteMceService remoteMceService;
     @Autowired
@@ -126,7 +130,8 @@ public class PatrolInspectionEventServiceImpl extends AbstractCrudService<Patrol
         map.put("createBy",patrolInspectionEvent.getCreateBy());
         map.put("createTime",patrolInspectionEvent.getCreateTime());
 
-        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEventExchange,SecurityUtils.getTenantId().toString(),map);
+//        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEventExchange,SecurityUtils.getTenantId().toString(),map);
+        mqttGateway.sendToMqtt("/patrolEEvent/"+SecurityUtils.getTenantId()+"/event",JSONObject.toJSONString(map));
 //      rabbitTemplate.convertAndSend(rabbitMQConfig.patrolFEventExchange,"", JSONObject.toJSONString(map));
 
         this.save(patrolInspectionEvent);

+ 19 - 19
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/listener/RabbitMQListener.java

@@ -13,23 +13,23 @@ import java.util.Map;
 @Component
 public class RabbitMQListener {
 
-    @RabbitHandler
-    @RabbitListener(queues = "Patrol_QEvent")  //监听名称为Patrol_QEvent队列消息
-    public void process(Map testMessage){
-        System.out.println("DirectReceiver消费者收到消息: " + testMessage.toString());
-    }
-
-    @RabbitHandler
-    @RabbitListener(bindings = @QueueBinding(
-            value = @Queue(),
-            exchange = @Exchange(value = "Patrol_FEvent",type = ExchangeTypes.FANOUT)
-    ))
-    public void getData(Message message){
-        try {
-            String str = new String(message.getBody(),"utf-8");
-            System.out.println("FanoutReceiver消费者收到消息: " + str);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-    }
+//    @RabbitHandler
+//    @RabbitListener(queues = "Patrol_QEvent")  //监听名称为Patrol_QEvent队列消息
+//    public void process(Map testMessage){
+//        System.out.println("DirectReceiver消费者收到消息: " + testMessage.toString());
+//    }
+//
+//    @RabbitHandler
+//    @RabbitListener(bindings = @QueueBinding(
+//            value = @Queue(),
+//            exchange = @Exchange(value = "Patrol_FEvent",type = ExchangeTypes.FANOUT)
+//    ))
+//    public void getData(Message message){
+//        try {
+//            String str = new String(message.getBody(),"utf-8");
+//            System.out.println("FanoutReceiver消费者收到消息: " + str);
+//        } catch (Exception e){
+//            e.printStackTrace();
+//        }
+//    }
 }

+ 1 - 1
service-iot/service-iot-biz/src/main/java/com/usky/iot/RuoYiSystemApplication.java

@@ -34,7 +34,7 @@ import java.net.UnknownHostException;
 @MapperScan(value = "com.usky.iot.mapper")
 @ComponentScan("com.usky")
 @SpringBootApplication
-@EnableRabbit
+//@EnableRabbit
 public class RuoYiSystemApplication
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(RuoYiSystemApplication.class);

+ 9 - 5
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/impl/BaseAppInfoServiceImpl.java

@@ -15,6 +15,7 @@ import com.usky.iot.domain.BaseGgpFacility;
 import com.usky.iot.mapper.BaseAppInfoMapper;
 import com.usky.iot.service.BaseAppInfoService;
 import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.iot.service.config.mqtt.MqttOutConfig;
 import com.usky.iot.service.config.rabbitmq.RabbitMQConfig;
 import com.usky.iot.service.vo.AppInfoRequest;
 import com.usky.iot.service.vo.BaseGgpFacilityRequest;
@@ -57,11 +58,13 @@ public class BaseAppInfoServiceImpl extends AbstractCrudService<BaseAppInfoMappe
     @Value("${agBox.push}")
     private Integer pushFlag;
 
+//    @Resource
+//    private RabbitTemplate rabbitTemplate;
+//
+//    @Autowired
+//    private RabbitMQConfig rabbitMQConfig;
     @Resource
-    private RabbitTemplate rabbitTemplate;
-
-    @Autowired
-    private RabbitMQConfig rabbitMQConfig;
+    private MqttOutConfig.MqttGateway mqttGateway;
 
     public String getIpAddress(HttpServletRequest request) {
         String ip = request.getHeader("X-Forwarded-For");
@@ -102,7 +105,8 @@ public class BaseAppInfoServiceImpl extends AbstractCrudService<BaseAppInfoMappe
         map.put("deviceId",baseAppInfo.getDeviceId());
         map.put("userName",baseAppInfo.getUserName());
         map.put("createTime",baseAppInfo.getCreateTime());
-        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEInfoExchange,SecurityUtils.getTenantId().toString(),JSONObject.toJSONString(map));
+//        rabbitTemplate.convertAndSend(rabbitMQConfig.patrolEInfoExchange,SecurityUtils.getTenantId().toString(),JSONObject.toJSONString(map));
+        mqttGateway.sendToMqtt("/patrolEInfo/"+SecurityUtils.getTenantId()+"/info",JSONObject.toJSONString(map));
 
         this.save(baseAppInfo);
 //        if (pushFlag.equals(1)){