Parcourir la source

开发处理消费事件mqtt消息推送websocket服务逻辑,同时处理心跳mqtt消息发生时间同步到设备信息表对应字段

james il y a 5 heures
Parent
commit
ca2a6b644b

+ 327 - 89
service-sas/service-sas-biz/src/main/java/com/usky/sas/mqtt/MqttService.java

@@ -1,11 +1,17 @@
 package com.usky.sas.mqtt;
 
 import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.StrUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.usky.sas.common.enums.SystemTypeCodeEnum;
+import com.usky.sas.common.global.SasWebSocket;
 import com.usky.sas.domain.*;
+import com.usky.sas.mapper.*;
 import com.usky.sas.mqtt.dto.*;
 import com.usky.sas.service.*;
 import lombok.extern.slf4j.Slf4j;
@@ -13,6 +19,7 @@ import org.eclipse.paho.client.mqttv3.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -20,12 +27,12 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
 public class MqttService {
 
-    /** clientId、topics 仍从配置文件读取,连接参数 host/port/username/password 从 sas_config 表获取 */
     @Value("${mqtt.client-id:sas-client-${random.uuid}}")
     private String clientId;
 
@@ -33,7 +40,6 @@ public class MqttService {
     private String topics;
 
     private MqttClient client;
-    /** 是否处理消息:false 时收到消息不进入业务处理,用于启停监听 */
     private volatile boolean isListening = true;
 
     private final ObjectMapper mapper = new ObjectMapper()
@@ -63,6 +69,33 @@ public class MqttService {
 
     @Autowired
     private SasConfigService sasConfigService;
+    @Autowired
+    private SasDeviceService deviceService;
+    @Autowired
+    private SasGisMapper gisMapper;
+
+    // Event Code Mappers
+    @Autowired
+    private SasUsbEventCodeMapper usbEventCodeMapper;
+    @Autowired
+    private SasSnapTypeCodeMapper snapTypeCodeMapper;
+    @Autowired
+    private SasEntranceEventCodeMapper entranceEventCodeMapper;
+    @Autowired
+    private SasParkingEventCodeMapper parkingEventCodeMapper;
+    @Autowired
+    private SasRoadblockEventCodeMapper roadblockEventCodeMapper;
+    @Autowired
+    private SasAlarsasEventCodeMapper alarsasEventCodeMapper;
+    @Autowired
+    private SasPatrolEventCodeMapper patrolEventCodeMapper;
+    @Autowired
+    private SasAcquisitionEventCodeMapper acquisitionEventCodeMapper;
+    @Autowired
+    private SasPerceptionEventCodeMapper perceptionEventCodeMapper;
+    @Autowired
+    private SasCollectionEventCodeMapper collectionEventCodeMapper;
+
 
     @PostConstruct
     public void init() {
@@ -139,9 +172,6 @@ public class MqttService {
         }
     }
 
-    /**
-     * 连接丢失后延迟尝试主动重连(与自动重连互补)
-     */
     private void scheduleReconnect() {
         Thread t = new Thread(() -> {
             try {
@@ -167,14 +197,12 @@ public class MqttService {
         log.debug("收到MQTT消息, topic={}, payload={}", topic, payload);
 
         try {
-            // 从topic中提取事件类型,支持多级路径如 event/patrol/xxx 或 event/patrol
             String[] parts = topic.split("/");
             if (parts.length < 2) {
                 log.warn("未知的topic格式: {}", topic);
                 return;
             }
 
-            // 查找事件类型关键字在topic中的位置
             String eventType = null;
             for (int i = 0; i < parts.length - 1; i++) {
                 if ("event".equals(parts[i]) && i + 1 < parts.length) {
@@ -228,7 +256,8 @@ public class MqttService {
         }
     }
 
-    private void handleUsbEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleUsbEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -236,29 +265,47 @@ public class MqttService {
             if ("event".equals(method)) {
                 UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class);
                 SasUsbEvent event = new SasUsbEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (usbEventService.getById(eventId) != null) {
+                    log.info("USB事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setCardId(message.getIcId());
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 usbEventService.save(event);
-                log.info("USB事件保存成功, eventId={}", event.getEventId());
+
+                // WebSocket 推送
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasUsbEventCode code = usbEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.usb.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.usb.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("USB事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到USB设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.usb.getCode());
             }
         } catch (Exception e) {
-            log.error("处理USB事件失败: {}", e.getMessage(), e);
+            log.error("处理USB事件失败", e);
         }
     }
 
-    private void handleSnapEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleSnapEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -276,23 +323,36 @@ public class MqttService {
                     return;
                 }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 snapEventService.save(event);
-                log.info("抓拍事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasSnapTypeCode code = snapTypeCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.snap.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.snap.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("抓拍事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到抓拍设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.snap.getCode());
             }
         } catch (Exception e) {
-            log.error("处理抓拍事件失败: {}", e.getMessage(), e);
+            log.error("处理抓拍事件失败", e);
         }
     }
 
-    private void handleEntranceEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleEntranceEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -300,29 +360,46 @@ public class MqttService {
             if ("event".equals(method)) {
                 EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class);
                 SasEntranceEvent event = new SasEntranceEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (entranceEventService.getById(eventId) != null) {
+                    log.info("门禁事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setCardId(message.getIcId());
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 entranceEventService.save(event);
-                log.info("门禁事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasEntranceEventCode code = entranceEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.entrance.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.entrance.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("门禁事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到门禁设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.entrance.getCode());
             }
         } catch (Exception e) {
-            log.error("处理门禁事件失败: {}", e.getMessage(), e);
+            log.error("处理门禁事件失败", e);
         }
     }
 
-    private void handleParkingEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleParkingEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -330,28 +407,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 ParkingEventMessage message = mapper.readValue(payload, ParkingEventMessage.class);
                 SasParkingEvent event = new SasParkingEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (parkingEventService.getById(eventId) != null) {
+                    log.info("停车场事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 parkingEventService.save(event);
-                log.info("停车场事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasParkingEventCode code = parkingEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.parking.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.parking.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("停车场事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到停车场设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.parking.getCode());
             }
         } catch (Exception e) {
-            log.error("处理停车场事件失败: {}", e.getMessage(), e);
+            log.error("处理停车场事件失败", e);
         }
     }
 
-    private void handleRoadblockEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleRoadblockEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -359,28 +453,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class);
                 SasRoadblockEvent event = new SasRoadblockEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (roadblockEventService.getById(eventId) != null) {
+                    log.info("阻车路障事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 roadblockEventService.save(event);
-                log.info("阻车路障事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasRoadblockEventCode code = roadblockEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.roadblock.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.roadblock.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("阻车路障事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到阻车路障设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.roadblock.getCode());
             }
         } catch (Exception e) {
-            log.error("处理阻车路障事件失败: {}", e.getMessage(), e);
+            log.error("处理阻车路障事件失败", e);
         }
     }
 
-    private void handleAlarsasEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleAlarsasEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -388,28 +499,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 AlarsasEventMessage message = mapper.readValue(payload, AlarsasEventMessage.class);
                 SasAlarsasEvent event = new SasAlarsasEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (alarsasEventService.getById(eventId) != null) {
+                    log.info("入侵报警事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 alarsasEventService.save(event);
-                log.info("入侵报警事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasAlarsasEventCode code = alarsasEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.alarm.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.alarm.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("入侵报警事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到入侵报警设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.alarm.getCode());
             }
         } catch (Exception e) {
-            log.error("处理入侵报警事件失败: {}", e.getMessage(), e);
+            log.error("处理入侵报警事件失败", e);
         }
     }
 
-    private void handlePatrolEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handlePatrolEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -417,28 +545,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 PatrolEventMessage message = mapper.readValue(payload, PatrolEventMessage.class);
                 SasPatrolEvent event = new SasPatrolEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (patrolEventService.getById(eventId) != null) {
+                    log.info("巡检事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 patrolEventService.save(event);
-                log.info("巡检事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasPatrolEventCode code = patrolEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.patrol.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.patrol.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("巡检事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到巡检设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.patrol.getCode());
             }
         } catch (Exception e) {
-            log.error("处理巡检事件失败: {}", e.getMessage(), e);
+            log.error("处理巡检事件失败", e);
         }
     }
 
-    private void handleAcquisitionEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleAcquisitionEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -446,28 +591,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class);
                 SasAcquisitionEvent event = new SasAcquisitionEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (acquisitionEventService.getById(eventId) != null) {
+                    log.info("数据采集事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 acquisitionEventService.save(event);
-                log.info("数据采集事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasAcquisitionEventCode code = acquisitionEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.acquisition.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.acquisition.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("数据采集事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到数据采集设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.acquisition.getCode());
             }
         } catch (Exception e) {
             log.error("处理数据采集事件失败", e);
         }
     }
 
-    private void handlePerceptionEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handlePerceptionEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -475,28 +637,45 @@ public class MqttService {
             if ("event".equals(method)) {
                 PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class);
                 SasPerceptionEvent event = new SasPerceptionEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (perceptionEventService.getById(eventId) != null) {
+                    log.info("状态感知事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 perceptionEventService.save(event);
-                log.info("状态感知事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasPerceptionEventCode code = perceptionEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.perception.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.perception.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("状态感知事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到状态感知设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.perception.getCode());
             }
         } catch (Exception e) {
             log.error("处理状态感知事件失败", e);
         }
     }
 
-    private void handleCollectionEvent(String payload, String topic) {
+    @Transactional(rollbackFor = Exception.class)
+    public void handleCollectionEvent(String payload, String topic) {
         try {
             JsonNode root = mapper.readTree(payload);
             String method = root.path("method").asText();
@@ -504,27 +683,101 @@ public class MqttService {
             if ("event".equals(method)) {
                 CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class);
                 SasCollectionEvent event = new SasCollectionEvent();
-
                 String eventId = message.getId() != null && !message.getId().isEmpty()
                         ? message.getId()
                         : IdUtil.randomUUID();
+                // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
+                if (collectionEventService.getById(eventId) != null) {
+                    log.info("状态采集事件已存在, 跳过保存, eventId={}", eventId);
+                    return;
+                }
                 event.setEventId(eventId);
-
                 BeanUtil.copyProperties(message, event, "eventId");
                 event.setTriggerTime(parseTime(message.getTriggerTime()));
                 event.setCreateTime(LocalDateTime.now());
                 event.setUpdateTime(LocalDateTime.now());
-
                 collectionEventService.save(event);
-                log.info("状态采集事件保存成功, eventId={}", event.getEventId());
+
+                BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
+                info.setTriggerTime(message.getTriggerTime());
+                info.setCreateTime(LocalDateTime.now().format(formatter));
+                
+                SasCollectionEventCode code = collectionEventCodeMapper.selectById(event.getEventCode());
+                if (code != null) {
+                    info.setEventTypeName(code.getName());
+                }
+
+                enrichDeviceInfo(info, event.getDeviceId(), event.getChannel(), SystemTypeCodeEnum.collection.getCode());
+                info.setDeviceType(SystemTypeCodeEnum.collection.getCode());
+                SasWebSocket.sendAll(info);
+
+                log.info("状态采集事件保存并推送成功, eventId={}", event.getEventId());
             } else if ("heart".equals(method)) {
-                log.debug("收到状态采集设备心跳, deviceId={}", extractDeviceId(topic));
+                handleHeartbeat(payload, SystemTypeCodeEnum.collection.getCode());
             }
         } catch (Exception e) {
             log.error("处理状态采集事件失败", e);
         }
     }
 
+    /**
+     * 统一处理心跳逻辑
+     */
+    private void handleHeartbeat(String payload, int deviceTypeCode) {
+        try {
+            GlobalDeviceHeartInfo heartInfo = mapper.readValue(payload, GlobalDeviceHeartInfo.class);
+            List<SasDevice> devices = deviceService.list(new LambdaQueryWrapper<SasDevice>()
+                    .eq(SasDevice::getDeviceId, heartInfo.getDeviceId())
+                    .eq(SasDevice::getDeviceType, deviceTypeCode));
+
+            if (CollUtil.isNotEmpty(devices)) {
+                GisHeartInfo heartInfoGis = heartInfo.getGis();
+                List<SasDevice> updateList = devices.stream().map(device -> {
+                    device.setTriggerTime(LocalDateTime.now());
+                    device.setUpdateTime(LocalDateTime.now());
+
+                    // 更新 GIS 信息
+                    if (heartInfoGis != null && StrUtil.isNotBlank(device.getGisId())) {
+                        SasGis gis = gisMapper.selectById(device.getGisId());
+                        if (gis != null) {
+                            // GisHeartInfo: longitude, latitude, altitude
+                            // SasGis: lon, lat, alt
+                            if (heartInfoGis.getLongitude() != null) gis.setLon(heartInfoGis.getLongitude());
+                            if (heartInfoGis.getLatitude() != null) gis.setLat(heartInfoGis.getLatitude());
+                            if (heartInfoGis.getAltitude() != null) gis.setAlt(heartInfoGis.getAltitude());
+                            
+                            gisMapper.updateById(gis);
+                        }
+                    }
+                    return device;
+                }).collect(Collectors.toList());
+                
+                deviceService.updateBatchById(updateList);
+                log.debug("更新设备心跳成功, deviceId={}, count={}", heartInfo.getDeviceId(), updateList.size());
+            }
+        } catch (Exception e) {
+            log.error("处理设备心跳失败", e);
+        }
+    }
+
+    /**
+     * 补充设备信息到 BrieflyEventInfo
+     */
+    private void enrichDeviceInfo(BrieflyEventInfo info, String deviceId, Integer channel, Integer deviceTypeCode) {
+        SasDevice device = deviceService.getOne(new LambdaQueryWrapper<SasDevice>()
+                .eq(SasDevice::getDeviceId, deviceId)
+                .eq(SasDevice::getChannel, channel)
+                .eq(SasDevice::getDeviceType, deviceTypeCode)
+                .last("limit 1")); // 确保只查一条
+        
+        if (device != null) {
+            info.setAddress(device.getAddress());
+            if (StrUtil.isBlank(info.getNote())) {
+                info.setNote(device.getNote());
+            }
+        }
+    }
+
     private String extractDeviceId(String topic) {
         String[] parts = topic.split("/");
         return parts.length > 0 ? parts[parts.length - 1] : "unknown";
@@ -553,39 +806,24 @@ public class MqttService {
         }
     }
 
-    /**
-     * 暂停监听:不再处理新到达的 MQTT 消息,连接保持
-     */
     public void pauseListening() {
         this.isListening = false;
         log.info("MQTT监听已暂停");
     }
 
-    /**
-     * 恢复监听:继续处理 MQTT 消息
-     */
     public void resumeListening() {
         this.isListening = true;
         log.info("MQTT监听已恢复");
     }
 
-    /**
-     * 查询连接状态:是否已连接 Broker
-     */
     public boolean getConnectionStatus() {
         return client != null && client.isConnected();
     }
 
-    /**
-     * 查询监听状态:是否正在处理消息(未暂停)
-     */
     public boolean getListeningStatus() {
         return isListening;
     }
 
-    /**
-     * 主动重连:先断开并关闭旧连接,再重新初始化连接并订阅
-     */
     public void reconnect() {
         try {
             if (client != null) {
@@ -618,4 +856,4 @@ public class MqttService {
             log.error("断开MQTT连接失败", e);
         }
     }
-}
+}

+ 79 - 0
service-sas/service-sas-biz/src/main/java/com/usky/sas/mqtt/dto/BrieflyEventInfo.java

@@ -0,0 +1,79 @@
+package com.usky.sas.mqtt.dto;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ * 简要事件信息 (WebSocket 推送)
+ */
+@Data
+public class BrieflyEventInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 事件ID
+     */
+    private String eventId;
+
+    /**
+     * 设备ID
+     */
+    private String deviceId;
+
+    /**
+     * 事件类型名称
+     */
+    private String eventTypeName;
+
+    /**
+     * 触发时间
+     */
+    private String triggerTime;
+
+    /**
+     * 创建时间 (接收时间)
+     */
+    private String createTime;
+
+    /**
+     * 事件图片 URL
+     */
+    private String eventUrl;
+
+    /**
+     * 场景图片 URL
+     */
+    private String sceneUrl;
+
+    /**
+     * 设备地址
+     */
+    private String address;
+
+    /**
+     * 备注
+     */
+    private String note;
+
+    /**
+     * 设备类型 (SystemTypeCodeEnum.code)
+     */
+    private Integer deviceType;
+    
+    /**
+     * 门禁出入类型(进/出)
+     */
+    private Integer accessType;
+    
+    /**
+     * 车辆类型
+     */
+    private String carType;
+    
+    /**
+     * 车牌类型
+     */
+    private String plateType;
+}

+ 39 - 0
service-sas/service-sas-biz/src/main/java/com/usky/sas/mqtt/dto/GisHeartInfo.java

@@ -0,0 +1,39 @@
+package com.usky.sas.mqtt.dto;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+/**
+ * GIS 心跳信息
+ */
+@Data
+public class GisHeartInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 经度
+     */
+    private BigDecimal longitude;
+
+    /**
+     * 纬度
+     */
+    private BigDecimal latitude;
+
+    /**
+     * 高度
+     */
+    private BigDecimal altitude;
+
+    /**
+     * 坐标系类型
+     */
+    private Integer gisType;
+
+    /**
+     * 坐标轴框架
+     */
+    private Integer frameOfAxes;
+}

+ 28 - 0
service-sas/service-sas-biz/src/main/java/com/usky/sas/mqtt/dto/GlobalDeviceHeartInfo.java

@@ -0,0 +1,28 @@
+package com.usky.sas.mqtt.dto;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * 全局设备心跳信息
+ */
+@Data
+public class GlobalDeviceHeartInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 设备ID
+     */
+    private String deviceId;
+
+    /**
+     * GIS 信息
+     */
+    private GisHeartInfo gis;
+
+    /**
+     * 设备状态
+     */
+    private Integer status;
+}