package com.usky.sas.mqtt; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.IdUtil; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.usky.sas.domain.*; import com.usky.sas.mqtt.dto.*; import com.usky.sas.service.*; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @Slf4j @Service public class MqttService { /** clientId、topics 仍从配置文件读取,连接参数 host/port/username/password 从 sas_config 表获取 */ @Value("${mqtt.client-id:sas-client-${random.uuid}}") private String clientId; @Value("${mqtt.topics:}") private String topics; private MqttClient client; /** 是否处理消息:false 时收到消息不进入业务处理,用于启停监听 */ private volatile boolean isListening = true; private final ObjectMapper mapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private SasUsbEventService usbEventService; @Autowired private SasSnapEventService snapEventService; @Autowired private SasEntranceEventService entranceEventService; @Autowired private SasParkingEventService parkingEventService; @Autowired private SasRoadblockEventService roadblockEventService; @Autowired private SasAlarsasEventService alarsasEventService; @Autowired private SasPatrolEventService patrolEventService; @Autowired private SasAcquisitionEventService acquisitionEventService; @Autowired private SasPerceptionEventService perceptionEventService; @Autowired private SasCollectionEventService collectionEventService; @Autowired private SasConfigService sasConfigService; @PostConstruct public void init() { SasConfig config = sasConfigService.getConfig(); if (config == null || config.getHost() == null || config.getHost().isEmpty()) { log.warn("sas_config 中无 MQTT 配置或 host 为空,跳过 MQTT 连接"); isListening = false; return; } try { String host = config.getHost(); int port = parsePort(config.getPort(), 1883); String username = config.getUsername() != null ? config.getUsername() : ""; String password = config.getPassword() != null ? config.getPassword() : ""; String brokerUrl = "tcp://" + host + ":" + port; client = new MqttClient(brokerUrl, clientId); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); if (!username.isEmpty()) { options.setUserName(username); options.setPassword(password.toCharArray()); } options.setConnectionTimeout(60); options.setKeepAliveInterval(60); options.setAutomaticReconnect(true); client.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT连接成功, reconnect={}", reconnect); parseTopics().forEach(MqttService.this::subscribe); isListening = true; } @Override public void connectionLost(Throwable cause) { log.error("MQTT连接丢失: {}", cause.getMessage()); isListening = false; scheduleReconnect(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (isListening) { processMessage(topic, message); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); client.connect(options); log.info("MQTT客户端初始化完成, clientId={}", clientId); isListening = true; } catch (MqttException e) { log.error("MQTT初始化失败: {}", e.getMessage(), e); isListening = false; } } private static int parsePort(String portStr, int defaultPort) { if (portStr == null || portStr.isEmpty()) { return defaultPort; } try { return Integer.parseInt(portStr.trim()); } catch (NumberFormatException e) { return defaultPort; } } /** * 连接丢失后延迟尝试主动重连(与自动重连互补) */ private void scheduleReconnect() { Thread t = new Thread(() -> { try { Thread.sleep(5000L); if (client != null && !client.isConnected()) { log.info("尝试主动重连MQTT..."); client.reconnect(); } } catch (Exception e) { log.error("MQTT主动重连失败", e); } }, "mqtt-reconnect"); t.setDaemon(true); t.start(); } private List parseTopics() { return Arrays.asList(topics.split(",")); } private void processMessage(String topic, MqttMessage message) { String payload = new String(message.getPayload()); log.debug("收到MQTT消息, topic={}, payload={}", topic, payload); try { // 从topic中提取事件类型,支持多级路径如 event/patrol/xxx 或 event/patrol String[] parts = topic.split("/"); if (parts.length < 2) { log.warn("未知的topic格式: {}", topic); return; } // 查找事件类型关键字在topic中的位置 String eventType = null; for (int i = 0; i < parts.length - 1; i++) { if ("event".equals(parts[i]) && i + 1 < parts.length) { eventType = parts[i + 1]; break; } } if (eventType == null) { log.warn("无法从topic解析事件类型: {}", topic); return; } switch (eventType) { case "usbalarm": handleUsbEvent(payload, topic); break; case "snap": handleSnapEvent(payload, topic); break; case "entrance": handleEntranceEvent(payload, topic); break; case "parking": handleParkingEvent(payload, topic); break; case "roadblock": handleRoadblockEvent(payload, topic); break; case "alarm": handleAlarsasEvent(payload, topic); break; case "patrol": handlePatrolEvent(payload, topic); break; case "acquisition": handleAcquisitionEvent(payload, topic); break; case "perception": handlePerceptionEvent(payload, topic); break; case "collection": handleCollectionEvent(payload, topic); break; default: log.warn("未知的事件类型: {}, topic={}", eventType, topic); } } catch (Exception e) { log.error("处理MQTT消息失败, topic={}", topic, e); } } private void handleUsbEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class); SasUsbEvent event = new SasUsbEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setCardId(message.getIcId()); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); usbEventService.save(event); log.info("USB事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到USB设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理USB事件失败: {}", e.getMessage(), e); } } private void handleSnapEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { SnapEventMessage message = mapper.readValue(payload, SnapEventMessage.class); SasSnapEvent event = new SasSnapEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过 if (snapEventService.getById(eventId) != null) { log.info("抓拍事件已存在, 跳过保存, eventId={}", eventId); return; } event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); snapEventService.save(event); log.info("抓拍事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到抓拍设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理抓拍事件失败: {}", e.getMessage(), e); } } private void handleEntranceEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class); SasEntranceEvent event = new SasEntranceEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setCardId(message.getIcId()); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); entranceEventService.save(event); log.info("门禁事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到门禁设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理门禁事件失败: {}", e.getMessage(), e); } } private void handleParkingEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { ParkingEventMessage message = mapper.readValue(payload, ParkingEventMessage.class); SasParkingEvent event = new SasParkingEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); parkingEventService.save(event); log.info("停车场事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到停车场设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理停车场事件失败: {}", e.getMessage(), e); } } private void handleRoadblockEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class); SasRoadblockEvent event = new SasRoadblockEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); roadblockEventService.save(event); log.info("阻车路障事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到阻车路障设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理阻车路障事件失败: {}", e.getMessage(), e); } } private void handleAlarsasEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { AlarsasEventMessage message = mapper.readValue(payload, AlarsasEventMessage.class); SasAlarsasEvent event = new SasAlarsasEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); alarsasEventService.save(event); log.info("入侵报警事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到入侵报警设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理入侵报警事件失败: {}", e.getMessage(), e); } } private void handlePatrolEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { PatrolEventMessage message = mapper.readValue(payload, PatrolEventMessage.class); SasPatrolEvent event = new SasPatrolEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); patrolEventService.save(event); log.info("巡检事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到巡检设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理巡检事件失败: {}", e.getMessage(), e); } } private void handleAcquisitionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class); SasAcquisitionEvent event = new SasAcquisitionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); acquisitionEventService.save(event); log.info("数据采集事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到数据采集设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理数据采集事件失败", e); } } private void handlePerceptionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class); SasPerceptionEvent event = new SasPerceptionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); perceptionEventService.save(event); log.info("状态感知事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到状态感知设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理状态感知事件失败", e); } } private void handleCollectionEvent(String payload, String topic) { try { JsonNode root = mapper.readTree(payload); String method = root.path("method").asText(); if ("event".equals(method)) { CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class); SasCollectionEvent event = new SasCollectionEvent(); String eventId = message.getId() != null && !message.getId().isEmpty() ? message.getId() : IdUtil.randomUUID(); event.setEventId(eventId); BeanUtil.copyProperties(message, event, "eventId"); event.setTriggerTime(parseTime(message.getTriggerTime())); event.setCreateTime(LocalDateTime.now()); event.setUpdateTime(LocalDateTime.now()); collectionEventService.save(event); log.info("状态采集事件保存成功, eventId={}", event.getEventId()); } else if ("heart".equals(method)) { log.debug("收到状态采集设备心跳, deviceId={}", extractDeviceId(topic)); } } catch (Exception e) { log.error("处理状态采集事件失败", e); } } private String extractDeviceId(String topic) { String[] parts = topic.split("/"); return parts.length > 0 ? parts[parts.length - 1] : "unknown"; } private LocalDateTime parseTime(String timeStr) { if (timeStr == null || timeStr.isEmpty()) { return LocalDateTime.now(); } try { return LocalDateTime.parse(timeStr, formatter); } catch (Exception e) { log.warn("时间解析失败: {}, 使用当前时间", timeStr); return LocalDateTime.now(); } } public void subscribe(String topic) { try { if (client != null && client.isConnected()) { client.subscribe(topic); log.info("订阅主题成功: {}", topic); } } catch (MqttException e) { log.error("订阅主题失败: {}", topic, e); } } /** * 暂停监听:不再处理新到达的 MQTT 消息,连接保持 */ public void pauseListening() { this.isListening = false; log.info("MQTT监听已暂停"); } /** * 恢复监听:继续处理 MQTT 消息 */ public void resumeListening() { this.isListening = true; log.info("MQTT监听已恢复"); } /** * 查询连接状态:是否已连接 Broker */ public boolean getConnectionStatus() { return client != null && client.isConnected(); } /** * 查询监听状态:是否正在处理消息(未暂停) */ public boolean getListeningStatus() { return isListening; } /** * 主动重连:先断开并关闭旧连接,再重新初始化连接并订阅 */ public void reconnect() { try { if (client != null) { if (client.isConnected()) { client.disconnect(); } client.close(); client = null; } isListening = true; init(); } catch (Exception e) { log.error("MQTT重连失败", e); isListening = false; } } @PreDestroy public void disconnect() { try { if (client != null) { if (client.isConnected()) { client.disconnect(); log.info("MQTT连接已断开"); } client.close(); client = null; } } catch (MqttException e) { log.error("断开MQTT连接失败", e); } } }