| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621 |
- package com.usky.sas.mqtt;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.util.IdUtil;
- import com.fasterxml.jackson.databind.DeserializationFeature;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.usky.sas.domain.*;
- import com.usky.sas.mqtt.dto.*;
- import com.usky.sas.service.*;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.Arrays;
- import java.util.List;
- @Slf4j
- @Service
- public class MqttService {
- /** clientId、topics 仍从配置文件读取,连接参数 host/port/username/password 从 sas_config 表获取 */
- @Value("${mqtt.client-id:sas-client-${random.uuid}}")
- private String clientId;
- @Value("${mqtt.topics:}")
- private String topics;
- private MqttClient client;
- /** 是否处理消息:false 时收到消息不进入业务处理,用于启停监听 */
- private volatile boolean isListening = true;
- private final ObjectMapper mapper = new ObjectMapper()
- .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- @Autowired
- private SasUsbEventService usbEventService;
- @Autowired
- private SasSnapEventService snapEventService;
- @Autowired
- private SasEntranceEventService entranceEventService;
- @Autowired
- private SasParkingEventService parkingEventService;
- @Autowired
- private SasRoadblockEventService roadblockEventService;
- @Autowired
- private SasAlarsasEventService alarsasEventService;
- @Autowired
- private SasPatrolEventService patrolEventService;
- @Autowired
- private SasAcquisitionEventService acquisitionEventService;
- @Autowired
- private SasPerceptionEventService perceptionEventService;
- @Autowired
- private SasCollectionEventService collectionEventService;
- @Autowired
- private SasConfigService sasConfigService;
- @PostConstruct
- public void init() {
- SasConfig config = sasConfigService.getConfig();
- if (config == null || config.getHost() == null || config.getHost().isEmpty()) {
- log.warn("sas_config 中无 MQTT 配置或 host 为空,跳过 MQTT 连接");
- isListening = false;
- return;
- }
- try {
- String host = config.getHost();
- int port = parsePort(config.getPort(), 1883);
- String username = config.getUsername() != null ? config.getUsername() : "";
- String password = config.getPassword() != null ? config.getPassword() : "";
- String brokerUrl = "tcp://" + host + ":" + port;
- client = new MqttClient(brokerUrl, clientId);
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);
- if (!username.isEmpty()) {
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- }
- options.setConnectionTimeout(60);
- options.setKeepAliveInterval(60);
- options.setAutomaticReconnect(true);
- client.setCallback(new MqttCallbackExtended() {
- @Override
- public void connectComplete(boolean reconnect, String serverURI) {
- log.info("MQTT连接成功, reconnect={}", reconnect);
- parseTopics().forEach(MqttService.this::subscribe);
- isListening = true;
- }
- @Override
- public void connectionLost(Throwable cause) {
- log.error("MQTT连接丢失: {}", cause.getMessage());
- isListening = false;
- scheduleReconnect();
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- if (isListening) {
- processMessage(topic, message);
- }
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- }
- });
- client.connect(options);
- log.info("MQTT客户端初始化完成, clientId={}", clientId);
- isListening = true;
- } catch (MqttException e) {
- log.error("MQTT初始化失败: {}", e.getMessage(), e);
- isListening = false;
- }
- }
- private static int parsePort(String portStr, int defaultPort) {
- if (portStr == null || portStr.isEmpty()) {
- return defaultPort;
- }
- try {
- return Integer.parseInt(portStr.trim());
- } catch (NumberFormatException e) {
- return defaultPort;
- }
- }
- /**
- * 连接丢失后延迟尝试主动重连(与自动重连互补)
- */
- private void scheduleReconnect() {
- Thread t = new Thread(() -> {
- try {
- Thread.sleep(5000L);
- if (client != null && !client.isConnected()) {
- log.info("尝试主动重连MQTT...");
- client.reconnect();
- }
- } catch (Exception e) {
- log.error("MQTT主动重连失败", e);
- }
- }, "mqtt-reconnect");
- t.setDaemon(true);
- t.start();
- }
- private List<String> parseTopics() {
- return Arrays.asList(topics.split(","));
- }
- private void processMessage(String topic, MqttMessage message) {
- String payload = new String(message.getPayload());
- log.debug("收到MQTT消息, topic={}, payload={}", topic, payload);
- try {
- // 从topic中提取事件类型,支持多级路径如 event/patrol/xxx 或 event/patrol
- String[] parts = topic.split("/");
- if (parts.length < 2) {
- log.warn("未知的topic格式: {}", topic);
- return;
- }
- // 查找事件类型关键字在topic中的位置
- String eventType = null;
- for (int i = 0; i < parts.length - 1; i++) {
- if ("event".equals(parts[i]) && i + 1 < parts.length) {
- eventType = parts[i + 1];
- break;
- }
- }
- if (eventType == null) {
- log.warn("无法从topic解析事件类型: {}", topic);
- return;
- }
- switch (eventType) {
- case "usbalarm":
- handleUsbEvent(payload, topic);
- break;
- case "snap":
- handleSnapEvent(payload, topic);
- break;
- case "entrance":
- handleEntranceEvent(payload, topic);
- break;
- case "parking":
- handleParkingEvent(payload, topic);
- break;
- case "roadblock":
- handleRoadblockEvent(payload, topic);
- break;
- case "alarm":
- handleAlarsasEvent(payload, topic);
- break;
- case "patrol":
- handlePatrolEvent(payload, topic);
- break;
- case "acquisition":
- handleAcquisitionEvent(payload, topic);
- break;
- case "perception":
- handlePerceptionEvent(payload, topic);
- break;
- case "collection":
- handleCollectionEvent(payload, topic);
- break;
- default:
- log.warn("未知的事件类型: {}, topic={}", eventType, topic);
- }
- } catch (Exception e) {
- log.error("处理MQTT消息失败, topic={}", topic, e);
- }
- }
- private void handleUsbEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class);
- SasUsbEvent event = new SasUsbEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setCardId(message.getIcId());
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- usbEventService.save(event);
- log.info("USB事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到USB设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理USB事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleSnapEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- SnapEventMessage message = mapper.readValue(payload, SnapEventMessage.class);
- SasSnapEvent event = new SasSnapEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
- if (snapEventService.getById(eventId) != null) {
- log.info("抓拍事件已存在, 跳过保存, eventId={}", eventId);
- return;
- }
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- snapEventService.save(event);
- log.info("抓拍事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到抓拍设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理抓拍事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleEntranceEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class);
- SasEntranceEvent event = new SasEntranceEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setCardId(message.getIcId());
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- entranceEventService.save(event);
- log.info("门禁事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到门禁设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理门禁事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleParkingEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- ParkingEventMessage message = mapper.readValue(payload, ParkingEventMessage.class);
- SasParkingEvent event = new SasParkingEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- parkingEventService.save(event);
- log.info("停车场事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到停车场设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理停车场事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleRoadblockEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class);
- SasRoadblockEvent event = new SasRoadblockEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- roadblockEventService.save(event);
- log.info("阻车路障事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到阻车路障设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理阻车路障事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleAlarsasEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- AlarsasEventMessage message = mapper.readValue(payload, AlarsasEventMessage.class);
- SasAlarsasEvent event = new SasAlarsasEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- alarsasEventService.save(event);
- log.info("入侵报警事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到入侵报警设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理入侵报警事件失败: {}", e.getMessage(), e);
- }
- }
- private void handlePatrolEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- PatrolEventMessage message = mapper.readValue(payload, PatrolEventMessage.class);
- SasPatrolEvent event = new SasPatrolEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- patrolEventService.save(event);
- log.info("巡检事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到巡检设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理巡检事件失败: {}", e.getMessage(), e);
- }
- }
- private void handleAcquisitionEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class);
- SasAcquisitionEvent event = new SasAcquisitionEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- acquisitionEventService.save(event);
- log.info("数据采集事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到数据采集设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理数据采集事件失败", e);
- }
- }
- private void handlePerceptionEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class);
- SasPerceptionEvent event = new SasPerceptionEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- perceptionEventService.save(event);
- log.info("状态感知事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到状态感知设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理状态感知事件失败", e);
- }
- }
- private void handleCollectionEvent(String payload, String topic) {
- try {
- JsonNode root = mapper.readTree(payload);
- String method = root.path("method").asText();
- if ("event".equals(method)) {
- CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class);
- SasCollectionEvent event = new SasCollectionEvent();
- String eventId = message.getId() != null && !message.getId().isEmpty()
- ? message.getId()
- : IdUtil.randomUUID();
- event.setEventId(eventId);
- BeanUtil.copyProperties(message, event, "eventId");
- event.setTriggerTime(parseTime(message.getTriggerTime()));
- event.setCreateTime(LocalDateTime.now());
- event.setUpdateTime(LocalDateTime.now());
- collectionEventService.save(event);
- log.info("状态采集事件保存成功, eventId={}", event.getEventId());
- } else if ("heart".equals(method)) {
- log.debug("收到状态采集设备心跳, deviceId={}", extractDeviceId(topic));
- }
- } catch (Exception e) {
- log.error("处理状态采集事件失败", e);
- }
- }
- private String extractDeviceId(String topic) {
- String[] parts = topic.split("/");
- return parts.length > 0 ? parts[parts.length - 1] : "unknown";
- }
- private LocalDateTime parseTime(String timeStr) {
- if (timeStr == null || timeStr.isEmpty()) {
- return LocalDateTime.now();
- }
- try {
- return LocalDateTime.parse(timeStr, formatter);
- } catch (Exception e) {
- log.warn("时间解析失败: {}, 使用当前时间", timeStr);
- return LocalDateTime.now();
- }
- }
- public void subscribe(String topic) {
- try {
- if (client != null && client.isConnected()) {
- client.subscribe(topic);
- log.info("订阅主题成功: {}", topic);
- }
- } catch (MqttException e) {
- log.error("订阅主题失败: {}", topic, e);
- }
- }
- /**
- * 暂停监听:不再处理新到达的 MQTT 消息,连接保持
- */
- public void pauseListening() {
- this.isListening = false;
- log.info("MQTT监听已暂停");
- }
- /**
- * 恢复监听:继续处理 MQTT 消息
- */
- public void resumeListening() {
- this.isListening = true;
- log.info("MQTT监听已恢复");
- }
- /**
- * 查询连接状态:是否已连接 Broker
- */
- public boolean getConnectionStatus() {
- return client != null && client.isConnected();
- }
- /**
- * 查询监听状态:是否正在处理消息(未暂停)
- */
- public boolean getListeningStatus() {
- return isListening;
- }
- /**
- * 主动重连:先断开并关闭旧连接,再重新初始化连接并订阅
- */
- public void reconnect() {
- try {
- if (client != null) {
- if (client.isConnected()) {
- client.disconnect();
- }
- client.close();
- client = null;
- }
- isListening = true;
- init();
- } catch (Exception e) {
- log.error("MQTT重连失败", e);
- isListening = false;
- }
- }
- @PreDestroy
- public void disconnect() {
- try {
- if (client != null) {
- if (client.isConnected()) {
- client.disconnect();
- log.info("MQTT连接已断开");
- }
- client.close();
- client = null;
- }
- } catch (MqttException e) {
- log.error("断开MQTT连接失败", e);
- }
- }
- }
|