package com.usky.sas.mqtt; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.usky.sas.common.enums.SystemTypeCodeEnum; import com.usky.sas.common.global.SasWebSocket; import com.usky.sas.domain.*; import com.usky.sas.mapper.*; import com.usky.sas.mqtt.dto.*; import com.usky.sas.service.*; import com.usky.sas.service.dto.agbox.JsonRpcRequest; import com.usky.sas.service.vo.*; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @Service public class MqttService { @Value("${mqtt.client-id:sas-client-${random.uuid}}") private String clientId; @Value("${mqtt.topics:}") private String topics; private MqttClient client; private volatile boolean isListening = true; private final ObjectMapper mapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private SasUsbEventService usbEventService; @Autowired private SasSnapEventService snapEventService; @Autowired private SasEntranceEventService entranceEventService; @Autowired private SasParkingEventService parkingEventService; @Autowired private SasRoadblockEventService roadblockEventService; @Autowired private SasAlarsasEventService alarsasEventService; @Autowired private SasPatrolEventService patrolEventService; @Autowired private SasAcquisitionEventService acquisitionEventService; @Autowired private SasPerceptionEventService perceptionEventService; @Autowired private SasCollectionEventService collectionEventService; @Autowired private SasConfigService sasConfigService; @Autowired private SasDeviceService deviceService; @Autowired private SasEventTypeGroupService eventTypeGroupService; @Autowired private SasPatrolUserParamService sasPatrolUserParamService; @Autowired private SasGisMapper gisMapper; @Autowired private SasPicMapper sasPicMapper; // Event Code Mappers @Autowired private SasUsbEventCodeMapper usbEventCodeMapper; @Autowired private SasSnapTypeCodeMapper snapTypeCodeMapper; @Autowired private SasEntranceEventCodeMapper entranceEventCodeMapper; @Autowired private SasParkingEventCodeMapper parkingEventCodeMapper; @Autowired private SasRoadblockEventCodeMapper roadblockEventCodeMapper; @Autowired private SasAlarsasEventCodeMapper alarsasEventCodeMapper; @Autowired private SasPatrolEventCodeMapper patrolEventCodeMapper; @Autowired private SasAcquisitionEventCodeMapper acquisitionEventCodeMapper; @Autowired private SasPerceptionEventCodeMapper perceptionEventCodeMapper; @Autowired private SasCollectionEventCodeMapper collectionEventCodeMapper; @PostConstruct public void init() { SasConfig config = sasConfigService.getConfig(); if (config == null || config.getHost() == null || config.getHost().isEmpty()) { log.warn("sas_config 中无 MQTT 配置或 host 为空,跳过 MQTT 连接"); isListening = false; return; } try { String host = config.getHost(); int port = parsePort(config.getPort(), 1883); String username = config.getUsername() != null ? config.getUsername() : ""; String password = config.getPassword() != null ? config.getPassword() : ""; String brokerUrl = "tcp://" + host + ":" + port; client = new MqttClient(brokerUrl, clientId); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (!username.isEmpty()) { options.setUserName(username); options.setPassword(password.toCharArray()); } options.setConnectionTimeout(60); options.setKeepAliveInterval(60); options.setAutomaticReconnect(true); client.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT连接成功, reconnect={}", reconnect); parseTopics().forEach(MqttService.this::subscribe); isListening = true; } @Override public void connectionLost(Throwable cause) { log.error("MQTT连接丢失: {}", cause.getMessage()); isListening = false; scheduleReconnect(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (isListening) { processMessage(topic, message); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); client.connect(options); log.info("MQTT客户端初始化完成, clientId={}", clientId); isListening = true; } catch (MqttException e) { log.error("MQTT初始化失败: {}", e.getMessage(), e); isListening = false; } } private static int parsePort(String portStr, int defaultPort) { if (portStr == null || portStr.isEmpty()) { return defaultPort; } try { return Integer.parseInt(portStr.trim()); } catch (NumberFormatException e) { return defaultPort; } } private void scheduleReconnect() { Thread t = new Thread(() -> { try { Thread.sleep(5000L); if (client != null && !client.isConnected()) { log.info("尝试主动重连MQTT..."); client.reconnect(); } } catch (Exception e) { log.error("MQTT主动重连失败", e); } }, "mqtt-reconnect"); t.setDaemon(true); t.start(); } private List parseTopics() { return Arrays.asList(topics.split(",")); } private void processMessage(String topic, MqttMessage message) { String payload = new String(message.getPayload()); log.debug("收到MQTT消息, topic={}, payload={}", topic, payload); try { String[] parts = topic.split("/"); if (parts.length < 2) { log.warn("未知的topic格式: {}", topic); return; } String eventType = null; for (int i = 0; i < parts.length - 1; i++) { if ("event".equals(parts[i]) && i + 1 < parts.length) { eventType = parts[i + 1]; break; } } if (eventType == null) { log.warn("无法从topic解析事件类型: {}", topic); return; } switch (eventType) { case "usbalarm": handleUsbEvent(payload, topic); break; case "snap": handleSnapEvent(payload, topic); break; case "entrance": handleEntranceEvent(payload, topic); break; case "parking": handleParkingEvent(payload, topic); break; case "roadblock": handleRoadblockEvent(payload, topic); break; case "alarm": handleAlarsasEvent(payload, topic); break; case "patrol": handlePatrolEvent(payload, topic); break; case "acquisition": handleAcquisitionEvent(payload, topic); break; case "perception": handlePerceptionEvent(payload, topic); break; case "collection": handleCollectionEvent(payload, topic); break; default: log.warn("未知的事件类型: {}, topic={}", eventType, topic); } } catch (Exception e) { log.error("处理MQTT消息失败, topic={}", topic, e); } } /** * 查询设备信息,如果设备不存在返回 null */ private SasDevice findDevice(String deviceId, Integer channel, Integer deviceTypeCode) { return deviceService.getOne(new LambdaQueryWrapper() .eq(SasDevice::getDeviceId, deviceId) .eq(SasDevice::getChannel, channel) .eq(SasDevice::getDeviceType, deviceTypeCode) .last("limit 1")); } /** * 查询事件类型分组信息 */ private SasEventTypeGroup findEventTypeGroup(Integer deviceTypeCode) { return eventTypeGroupService.getOne(new LambdaQueryWrapper() .eq(SasEventTypeGroup::getDeviceType, deviceTypeCode) .last("limit 1")); } @Transactional(rollbackFor = Exception.class) public void handleUsbEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.usb.getCode()); if (device == null) { log.warn("设备不存在,丢弃USB事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasUsbEvent event = new SasUsbEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (usbEventService.getById(eventId) != null) { log.info("USB事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setCardId(message.getIcId()); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getEventPic() != null) { UsbEventMessage.PicInfo picInfo = message.getEventPic(); if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) { SasPic pic = new SasPic(); pic.setId(IdUtil.randomUUID()); pic.setUrl(picInfo.getUrl()); pic.setPath(picInfo.getPath()); pic.setCreateTime(LocalDateTime.now()); pic.setUpdateTime(LocalDateTime.now()); sasPicMapper.insert(pic); // 设置事件图片ID event.setPicId(pic.getId()); } } usbEventService.save(event); // WebSocket 推送 BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasUsbEventCode code = usbEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.usb.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.usb.getCode()); SasWebSocket.sendAll(info); log.info("USB事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.usb.getCode()); } } catch (Exception e) { log.error("处理USB事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleSnapEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { SnapEventResult eventResult = (SnapEventResult)this.mapper.readValue(payload, SnapEventResult.class); log.info("实时智能分析"); Map map = new HashMap(); map.put("eventId", eventResult.getId()); JsonRpcRequest getEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null); Map getEventInfo = new HashMap(); getEventInfo.put("key", this.sasConfigService.getConfig().getKeyds()); getEventInfo.put("json", getEventInfoJson.toString()); log.info("请求AG报文:{}", JSONUtil.toJsonStr(getEventInfo)); String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/snap", getEventInfo); log.info("请求AG响应:{}", resultEvent); SnapEventInfoVo eventInfoVo = (SnapEventInfoVo)this.mapper.readValue(resultEvent, SnapEventInfoVo.class); SnapEventInfoResult result = eventInfoVo.getResult(); SnapEventInfo eventInfo = result.getEventInfo(); SasSnapEvent bean = (SasSnapEvent)BeanUtil.toBean(eventInfo, SasSnapEvent.class); BrieflyEventInfo info = (BrieflyEventInfo)BeanUtil.toBeanIgnoreError(bean, BrieflyEventInfo.class); if (eventInfo.getEventPic() != null) { SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); bean.setEventPicId(pic.getId()); info.setEventUrl(pic.getUrl() + pic.getPath()); } if (eventInfo.getScenePic() != null) { SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getScenePic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); bean.setScenePicId(pic.getId()); info.setSceneUrl(pic.getUrl() + pic.getPath()); } if(eventInfo.getEventCode() == null && root.get("code") != null) { eventInfo.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.snap.getCode()); if (device == null) { log.warn("设备不存在,丢弃抓拍事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel()); return; } String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty() ? eventResult.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (snapEventService.getById(eventId) != null) { log.info("抓拍事件已存在, 跳过保存, eventId={}", eventId); return; } bean.setEventId(eventId); BeanUtil.copyProperties(eventInfo, bean, "eventId"); bean.setTriggerTime(parseTime(eventInfo.getTriggerTime())); bean.setCreateTime(LocalDateTime.now()); bean.setUpdateTime(LocalDateTime.now()); snapEventService.save(bean); info.setTriggerTime(eventInfo.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); SasSnapTypeCode code = snapTypeCodeMapper.selectById(bean.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.snap.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.snap.getCode()); SasWebSocket.sendAll(info); log.info("抓拍事件保存并推送成功, eventId={}", bean.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.snap.getCode()); } } catch (Exception e) { log.error("处理抓拍事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleEntranceEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.entrance.getCode()); if (device == null) { log.warn("设备不存在,丢弃门禁事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasEntranceEvent event = new SasEntranceEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (entranceEventService.getById(eventId) != null) { log.info("门禁事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setCardId(message.getIcId()); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getEntrancePic() != null) { EntranceEventMessage.PicInfo picInfo = message.getEntrancePic(); if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) { SasPic pic = new SasPic(); pic.setId(IdUtil.randomUUID()); pic.setUrl(picInfo.getUrl()); pic.setPath(picInfo.getPath()); pic.setCreateTime(LocalDateTime.now()); pic.setUpdateTime(LocalDateTime.now()); sasPicMapper.insert(pic); // 设置事件图片ID event.setPicId(pic.getId()); } } entranceEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasEntranceEventCode code = entranceEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.entrance.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.entrance.getCode()); SasWebSocket.sendAll(info); log.info("门禁事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.entrance.getCode()); } } catch (Exception e) { log.error("处理门禁事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleParkingEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { ParkingEventResult eventResult = (ParkingEventResult)this.mapper.readValue(payload, ParkingEventResult.class); Map map = new HashMap(); map.put("eventId", eventResult.getId()); JsonRpcRequest getParkingEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null); Map getParkingEventInfo = new HashMap(); getParkingEventInfo.put("key", this.sasConfigService.getConfig().getKeyds()); getParkingEventInfo.put("json", getParkingEventInfoJson.toString()); log.info("请求AG报文:{}", JSONUtil.toJsonStr(getParkingEventInfo)); String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/parking", getParkingEventInfo); log.info("请求AG响应:{}", resultEvent); ParkingEventInfoVo parkingEventInfoVo = (ParkingEventInfoVo)this.mapper.readValue(resultEvent, ParkingEventInfoVo.class); ParkingEventInfoResult result = parkingEventInfoVo.getResult(); ParkingEventInfo eventInfo = result.getEventInfo(); SasParkingEvent bean = (SasParkingEvent)BeanUtil.toBean(eventInfo, SasParkingEvent.class); bean.setCreateTime(LocalDateTime.parse(eventInfo.getReceivingTime(), this.formatter)); bean.setTriggerTime(LocalDateTime.parse(eventInfo.getTriggerTime(), this.formatter)); bean.setEventId(eventResult.getId()); bean.setAccessType(eventInfo.getIo()); bean.setCarType(eventInfo.getCarCode()); bean.setPlateType(eventInfo.getPlateCode()); if (eventInfo.getEventPic() != null) { SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); bean.setEventPicId(pic.getId()); } if (eventInfo.getPlacePic() != null) { SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getPlacePic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); bean.setPlatePicId(pic.getId()); } if(eventInfo.getEventCode() == null && root.get("code") != null) { eventInfo.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.parking.getCode()); if (device == null) { log.warn("设备不存在,丢弃停车场事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel()); return; } SasParkingEvent event = new SasParkingEvent(); String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty() ? eventResult.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (parkingEventService.getById(eventId) != null) { log.info("停车场事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(eventInfo, event, "eventId"); event.setTriggerTime(parseTime(eventInfo.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); parkingEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(eventInfo.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (eventInfo.getEventPic() != null) { info.setEventUrl(eventInfo.getEventPic().getUrl() + eventInfo.getEventPic().getPath()); } if (eventInfo.getPlacePic() != null) { info.setEventUrl(eventInfo.getPlacePic().getUrl() + eventInfo.getPlacePic().getPath()); } SasParkingEventCode code = parkingEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.parking.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.parking.getCode()); SasWebSocket.sendAll(info); log.info("停车场事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.parking.getCode()); } } catch (Exception e) { log.error("处理停车场事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleRoadblockEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.roadblock.getCode()); if (device == null) { log.warn("设备不存在,丢弃阻车路障事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasRoadblockEvent event = new SasRoadblockEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (roadblockEventService.getById(eventId) != null) { log.info("阻车路障事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getRoadblockPic() != null) { RoadblockEventMessage.PicInfo picInfo = message.getRoadblockPic(); if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) { SasPic pic = new SasPic(); pic.setId(IdUtil.randomUUID()); pic.setUrl(picInfo.getUrl()); pic.setPath(picInfo.getPath()); pic.setCreateTime(LocalDateTime.now()); pic.setUpdateTime(LocalDateTime.now()); sasPicMapper.insert(pic); // 设置事件图片ID event.setPicId(pic.getId()); } } roadblockEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasRoadblockEventCode code = roadblockEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.roadblock.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.roadblock.getCode()); SasWebSocket.sendAll(info); log.info("阻车路障事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.roadblock.getCode()); } } catch (Exception e) { log.error("处理阻车路障事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleAlarsasEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { AlarmEventResult eventResult = (AlarmEventResult)this.mapper.readValue(payload, AlarmEventResult.class); Map map = new HashMap(); map.put("eventId", eventResult.getId()); JsonRpcRequest getAlarmEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null); Map getAlarmEventInfo = new HashMap(); getAlarmEventInfo.put("key", this.sasConfigService.getConfig().getKeyds()); getAlarmEventInfo.put("json", getAlarmEventInfoJson.toString()); log.info("请求AG报文:{}", JSONUtil.toJsonStr(getAlarmEventInfo)); String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/alarm", getAlarmEventInfo); log.info("请求AG响应:{}", resultEvent); AlarmEventInfoVo alarmEventInfoVo = (AlarmEventInfoVo)this.mapper.readValue(resultEvent, AlarmEventInfoVo.class); AlarmEventInfoResult result = alarmEventInfoVo.getResult(); AlarmEventInfo eventInfo = result.getEventInfo(); SasAlarsasEvent event = new SasAlarsasEvent(); if (eventInfo.getEventPic() != null) { SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); event.setPicId(pic.getId()); } if (eventInfo.getGis() != null) { SasGis gis = (SasGis)BeanUtil.toBean(eventInfo.getGis(), SasGis.class); gis.setId(IdUtil.randomUUID()); this.gisMapper.insert(gis); event.setGisId(gis.getId()); } if(eventInfo.getEventCode() == null && root.get("code") != null) { eventInfo.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.alarm.getCode()); if (device == null) { log.warn("设备不存在,丢弃入侵报警事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel()); return; } String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty() ? eventResult.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 // if (alarsasEventService.getById(eventId) != null) { // log.info("入侵报警事件已存在, 跳过保存, eventId={}", eventId); // return; // } event.setEventId(eventId); BeanUtil.copyProperties(eventResult, event, "eventId"); event.setTriggerTime(parseTime(eventInfo.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); alarsasEventService.saveOrUpdate(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(eventInfo.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasAlarsasEventCode code = alarsasEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.alarm.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.alarm.getCode()); SasWebSocket.sendAll(info); log.info("入侵报警事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.alarm.getCode()); } } catch (Exception e) { log.error("处理入侵报警事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handlePatrolEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { PatrolEventResult eventResult = (PatrolEventResult)this.mapper.readValue(payload, PatrolEventResult.class); Map map = new HashMap(); map.put("eventId", eventResult.getId()); JsonRpcRequest getPatrolEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null); Map getPatrolEventInfo = new HashMap(); getPatrolEventInfo.put("key", this.sasConfigService.getConfig().getKeyds()); getPatrolEventInfo.put("json", getPatrolEventInfoJson.toString()); log.info("请求AG报文:{}", JSONUtil.toJsonStr(getPatrolEventInfo)); String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/patrol", getPatrolEventInfo); log.info("请求AG响应:{}", resultEvent); PatrolEventInfoVo patrolEventInfoVo = (PatrolEventInfoVo)this.mapper.readValue(resultEvent, PatrolEventInfoVo.class); PatrolEventInfoResult result = patrolEventInfoVo.getResult(); PatrolEventInfo eventInfo = result.getEventInfo(); // 1. 检查设备是否存在 SasDevice device = findDevice(eventInfo.getDeviceId(), 0, SystemTypeCodeEnum.patrol.getCode()); if (device == null) { log.warn("设备不存在,丢弃巡检事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel()); return; } SasPatrolEvent event = new SasPatrolEvent(); String eventId = patrolEventInfoVo.getId() != null && !patrolEventInfoVo.getId().isEmpty() ? patrolEventInfoVo.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (patrolEventService.getById(eventId) != null) { log.info("巡检事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(eventInfo, event, "eventId"); // 关键:清洗空格 + 指定格式化器解析 String triggerTimeStr = eventInfo.getTriggerTime().replaceAll("\\s", " "); event.setTriggerTime(LocalDateTime.parse(triggerTimeStr, formatter)); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (eventInfo.getEventPic() != null) { SasPic pic = (SasPic) BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class); pic.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(pic); event.setPicId(pic.getId()); } if (eventInfo.getUserParams() != null) { SasPatrolUserParam userParam = (SasPatrolUserParam)BeanUtil.toBean(eventInfo.getUserParams(), SasPatrolUserParam.class); userParam.setId(IdUtil.randomUUID()); this.sasPatrolUserParamService.save(userParam); event.setUserParasasId(userParam.getId()); } patrolEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(eventInfo.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasPatrolEventCode code = patrolEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.patrol.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.patrol.getCode()); SasWebSocket.sendAll(info); log.info("巡检事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.patrol.getCode()); } } catch (Exception e) { log.error("处理巡检事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleAcquisitionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.acquisition.getCode()); if (device == null) { log.warn("设备不存在,丢弃数据采集事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasAcquisitionEvent event = new SasAcquisitionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (acquisitionEventService.getById(eventId) != null) { log.info("数据采集事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getAcquisitionPic() != null) { AcquisitionEventMessage.PicInfo picInfo = message.getAcquisitionPic(); if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) { SasPic pic = new SasPic(); pic.setId(IdUtil.randomUUID()); pic.setUrl(picInfo.getUrl()); pic.setPath(picInfo.getPath()); pic.setCreateTime(LocalDateTime.now()); pic.setUpdateTime(LocalDateTime.now()); sasPicMapper.insert(pic); // 设置事件图片ID event.setPicId(pic.getId()); } } acquisitionEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasAcquisitionEventCode code = acquisitionEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.acquisition.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.acquisition.getCode()); SasWebSocket.sendAll(info); log.info("数据采集事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.acquisition.getCode()); } } catch (Exception e) { log.error("处理数据采集事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handlePerceptionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.perception.getCode()); if (device == null) { log.warn("设备不存在,丢弃状态感知事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasPerceptionEvent event = new SasPerceptionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (perceptionEventService.getById(eventId) != null) { log.info("状态感知事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getPerceptionPic() != null) { PerceptionEventMessage.PicInfo picInfo = message.getPerceptionPic(); if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) { SasPic pic = new SasPic(); pic.setId(IdUtil.randomUUID()); pic.setUrl(picInfo.getUrl()); pic.setPath(picInfo.getPath()); pic.setCreateTime(LocalDateTime.now()); pic.setUpdateTime(LocalDateTime.now()); sasPicMapper.insert(pic); // 设置事件图片ID event.setPicId(pic.getId()); } } perceptionEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (event.getPicId() != null) { SasPic pic = sasPicMapper.selectById(event.getPicId()); if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) { info.setEventUrl(pic.getUrl() + pic.getPath()); } } SasPerceptionEventCode code = perceptionEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.perception.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.perception.getCode()); SasWebSocket.sendAll(info); log.info("状态感知事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.perception.getCode()); } } catch (Exception e) { log.error("处理状态感知事件失败", e); } } @Transactional(rollbackFor = Exception.class) public void handleCollectionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class); if(message.getEventCode() == null && root.get("code") != null) { message.setEventCode(root.get("code").asInt()); } // 1. 检查设备是否存在 SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.collection.getCode()); if (device == null) { log.warn("设备不存在,丢弃状态采集事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel()); return; } SasCollectionEvent event = new SasCollectionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (collectionEventService.getById(eventId) != null) { log.info("状态采集事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); // 处理图片信息 if (message.getScenePicInfo() != null) { SasPic bean = (SasPic)BeanUtil.toBean(message.getScenePicInfo(), SasPic.class); bean.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(bean); event.setScenePicId(bean.getId()); } if (message.getEventPicInfo() != null) { SasPic bean = (SasPic)BeanUtil.toBean(message.getEventPicInfo(), SasPic.class); bean.setId(IdUtil.randomUUID()); this.sasPicMapper.insert(bean); event.setPicId(bean.getId()); } collectionEventService.save(event); BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class); info.setTriggerTime(message.getTriggerTime()); info.setCreateTime(LocalDateTime.now().format(formatter)); // 设置图片URL(如果有) if (message.getEventPicInfo() != null) { info.setEventUrl(message.getEventPicInfo().getUrl() + message.getEventPicInfo().getPath()); } if (message.getScenePicInfo() != null) { info.setEventUrl(message.getScenePicInfo().getUrl() + message.getScenePicInfo().getPath()); } SasCollectionEventCode code = collectionEventCodeMapper.selectById(event.getEventCode()); if (code != null) { info.setEventTypeName(code.getName()); } enrichDeviceInfo(info, device); SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.collection.getCode()); if (eventTypeGroup != null) { info.setEventLevelId(eventTypeGroup.getEventLevel()); } info.setDeviceType(SystemTypeCodeEnum.collection.getCode()); SasWebSocket.sendAll(info); log.info("状态采集事件保存并推送成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { handleHeartbeat(payload, SystemTypeCodeEnum.collection.getCode()); } } catch (Exception e) { log.error("处理状态采集事件失败", e); } } /** * 统一处理心跳逻辑 */ private void handleHeartbeat(String payload, int deviceTypeCode) { try { GlobalDeviceHeartInfo heartInfo = mapper.readValue(payload, GlobalDeviceHeartInfo.class); List devices = deviceService.list(new LambdaQueryWrapper() .eq(SasDevice::getDeviceId, heartInfo.getDeviceId()) .eq(SasDevice::getDeviceType, deviceTypeCode)); if (CollUtil.isNotEmpty(devices)) { GisHeartInfo heartInfoGis = heartInfo.getGis(); List updateList = devices.stream().map(device -> { device.setTriggerTime(LocalDateTime.now()); device.setUpdateTime(LocalDateTime.now()); // 更新 GIS 信息 if (heartInfoGis != null && StrUtil.isNotBlank(device.getGisId())) { SasGis gis = gisMapper.selectById(device.getGisId()); if (gis != null) { // GisHeartInfo: longitude, latitude, altitude // SasGis: lon, lat, alt if (heartInfoGis.getLongitude() != null) gis.setLon(heartInfoGis.getLongitude()); if (heartInfoGis.getLatitude() != null) gis.setLat(heartInfoGis.getLatitude()); if (heartInfoGis.getAltitude() != null) gis.setAlt(heartInfoGis.getAltitude()); gisMapper.updateById(gis); } } return device; }).collect(Collectors.toList()); deviceService.updateBatchById(updateList); log.debug("更新设备心跳成功, deviceId={}, count={}", heartInfo.getDeviceId(), updateList.size()); } } catch (Exception e) { log.error("处理设备心跳失败", e); } } /** * 补充设备信息到 BrieflyEventInfo */ private void enrichDeviceInfo(BrieflyEventInfo info, SasDevice device) { if (device != null) { info.setAddress(device.getAddress()); if (StrUtil.isBlank(info.getNote())) { info.setNote(device.getNote()); } } } /** * 补充设备信息到 BrieflyEventInfo (重载,用于兼容) */ private void enrichDeviceInfo(BrieflyEventInfo info, String deviceId, Integer channel, Integer deviceTypeCode) { SasDevice device = findDevice(deviceId, channel, deviceTypeCode); enrichDeviceInfo(info, device); } private String extractDeviceId(String topic) { String[] parts = topic.split("/"); return parts.length > 0 ? parts[parts.length - 1] : "unknown"; } private LocalDateTime parseTime(String timeStr) { if (timeStr == null || timeStr.isEmpty()) { return LocalDateTime.now(); } try { return LocalDateTime.parse(timeStr, formatter); } catch (Exception e) { log.warn("时间解析失败: {}, 使用当前时间", timeStr); return LocalDateTime.now(); } } public void subscribe(String topic) { try { if (client != null && client.isConnected()) { client.subscribe(topic); log.info("订阅主题成功: {}", topic); } } catch (MqttException e) { log.error("订阅主题失败: {}", topic, e); } } public void pauseListening() { this.isListening = false; log.info("MQTT监听已暂停"); } public void resumeListening() { this.isListening = true; log.info("MQTT监听已恢复"); } public boolean getConnectionStatus() { return client != null && client.isConnected(); } public boolean getListeningStatus() { return isListening; } public void reconnect() { try { if (client != null) { if (client.isConnected()) { client.disconnect(); } client.close(); client = null; } isListening = true; init(); } catch (Exception e) { log.error("MQTT重连失败", e); isListening = false; } } @PreDestroy public void disconnect() { try { if (client != null) { if (client.isConnected()) { client.disconnect(); log.info("MQTT连接已断开"); } client.close(); client = null; } } catch (MqttException e) { log.error("断开MQTT连接失败", e); } } }