|
|
@@ -0,0 +1,735 @@
|
|
|
+package com.usky.sas.mqtt;
|
|
|
+
|
|
|
+import cn.hutool.core.bean.BeanUtil;
|
|
|
+import cn.hutool.core.util.IdUtil;
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.usky.sas.domain.*;
|
|
|
+import com.usky.sas.mqtt.dto.*;
|
|
|
+import com.usky.sas.service.*;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class MqttService {
|
|
|
+
|
|
|
+ @Value("${mqtt.host:localhost}")
|
|
|
+ private String host;
|
|
|
+
|
|
|
+ @Value("${mqtt.port:1883}")
|
|
|
+ private int port;
|
|
|
+
|
|
|
+ @Value("${mqtt.client-id:sas-client-${random.uuid}}")
|
|
|
+ private String clientId;
|
|
|
+
|
|
|
+ @Value("${mqtt.username:}")
|
|
|
+ private String username;
|
|
|
+
|
|
|
+ @Value("${mqtt.password:}")
|
|
|
+ private String password;
|
|
|
+
|
|
|
+ // 修改为 String 类型,逗号分隔
|
|
|
+ @Value("${mqtt.topics:}")
|
|
|
+ private String topics;
|
|
|
+
|
|
|
+ private MqttClient client;
|
|
|
+ private final ObjectMapper mapper = new ObjectMapper();
|
|
|
+ private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasUsbEventService usbEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasSnapEventService snapEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasEntranceEventService entranceEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasParkingEventService parkingEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasRoadblockEventService roadblockEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasAlarsasEventService alarsasEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasPatrolEventService patrolEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasAcquisitionEventService acquisitionEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasPerceptionEventService perceptionEventService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private SasCollectionEventService collectionEventService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化MQTT连接
|
|
|
+ */
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ try {
|
|
|
+ String brokerUrl = "tcp://" + host + ":" + port;
|
|
|
+ client = new MqttClient(brokerUrl, clientId);
|
|
|
+
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setCleanSession(true);
|
|
|
+ if (!username.isEmpty()) {
|
|
|
+ options.setUserName(username);
|
|
|
+ options.setPassword(password.toCharArray());
|
|
|
+ }
|
|
|
+ options.setConnectionTimeout(60);
|
|
|
+ options.setKeepAliveInterval(60);
|
|
|
+ options.setAutomaticReconnect(true);
|
|
|
+
|
|
|
+ // 设置回调
|
|
|
+ client.setCallback(new MqttCallbackExtended() {
|
|
|
+ @Override
|
|
|
+ public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
+ log.info("MQTT连接成功, reconnect={}", reconnect);
|
|
|
+ // 解析并订阅所有主题
|
|
|
+ parseTopics().forEach(MqttService.this::subscribe);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ log.error("MQTT连接丢失: {}", cause.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
+ processMessage(topic, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ client.connect(options);
|
|
|
+ log.info("MQTT客户端初始化完成, clientId={}", clientId);
|
|
|
+
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("MQTT初始化失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析逗号分隔的主题
|
|
|
+ */
|
|
|
+ private List<String> parseTopics() {
|
|
|
+ return Arrays.asList(topics.split(","));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消息处理分发
|
|
|
+ */
|
|
|
+ private void processMessage(String topic, MqttMessage message) {
|
|
|
+ String payload = new String(message.getPayload());
|
|
|
+ log.info("收到MQTT消息, topic={}, payload={}", topic, payload);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // USB事件
|
|
|
+ if (topic.contains("event/usbalarm")) {
|
|
|
+ handleUsbEvent(payload, topic);
|
|
|
+ }
|
|
|
+ // 抓拍事件
|
|
|
+ else if (topic.contains("event/snap")) {
|
|
|
+ handleSnapEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 出入门禁事件
|
|
|
+ else if (topic.contains("event/entrance")) {
|
|
|
+ handleEntranceEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 停车场车牌抓拍事件
|
|
|
+ else if (topic.contains("event/parking")) {
|
|
|
+ handleParkingEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 阻车路障探测事件
|
|
|
+ else if (topic.contains("event/roadblock")) {
|
|
|
+ handleRoadblockEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 入侵紧急报警事件
|
|
|
+ else if (topic.contains("event/alarm")) {
|
|
|
+ handleAlarsasEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 实时电子巡检事件
|
|
|
+ else if (topic.contains("event/patrol")) {
|
|
|
+ handlePatrolEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据采集事件
|
|
|
+ else if (topic.contains("event/acquisition")) {
|
|
|
+ handleAcquisitionEvent(payload, topic);
|
|
|
+ }
|
|
|
+ // 状态感知事件
|
|
|
+ else if (topic.contains("event/perception")) {
|
|
|
+ handlePerceptionEvent(payload, topic);
|
|
|
+ }
|
|
|
+ // 状态采集事件
|
|
|
+ else if (topic.contains("event/collection")) {
|
|
|
+ handleCollectionEvent(payload, topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理MQTT消息失败, topic={}", topic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理USB事件(修复版:确保eventId正确设置)
|
|
|
+ */
|
|
|
+ private void handleUsbEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class);
|
|
|
+
|
|
|
+ // 关键修复:先创建实体,先生成ID,再复制其他属性
|
|
|
+ SasUsbEvent event = new SasUsbEvent();
|
|
|
+
|
|
|
+ // 步骤1:先生成并设置 eventId(必须在copyProperties之前)
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志:确认eventId已设置
|
|
|
+ log.debug("生成的eventId: {}", eventId);
|
|
|
+ log.debug("设置后event.getEventId(): {}", event.getEventId());
|
|
|
+
|
|
|
+ // 步骤2:复制其他属性,但忽略eventId字段(防止被null覆盖)
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 步骤3:设置其他特殊字段映射
|
|
|
+ event.setCardId(message.getIcId());
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime()));
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存USB事件: eventId={}, deviceId={}, eventCode={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ usbEventService.save(event);
|
|
|
+ log.info("USB事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理USB事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理抓拍事件
|
|
|
+ */
|
|
|
+ private void handleSnapEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ SnapEventMessage message = mapper.readValue(payload, SnapEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasSnapEvent event = new SasSnapEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的抓拍事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("抓拍事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存抓拍事件: eventId={}, deviceId={}, eventCode={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ snapEventService.save(event);
|
|
|
+ log.info("抓拍事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到抓拍设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理抓拍事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理出入门禁事件
|
|
|
+ */
|
|
|
+ private void handleEntranceEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasEntranceEvent event = new SasEntranceEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的门禁事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setCardId(message.getIcId()); // icId映射为cardId(与USB事件一致)
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("门禁事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存门禁事件: eventId={}, deviceId={}, eventCode={}, name={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode(), event.getName());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ entranceEventService.save(event);
|
|
|
+ log.info("门禁事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到门禁设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理门禁事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理停车场车牌抓拍事件
|
|
|
+ */
|
|
|
+ private void handleParkingEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ ParkingEventMessage message = mapper.readValue(payload, ParkingEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasParkingEvent event = new SasParkingEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的停车场事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("停车场事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存停车场事件: eventId={}, deviceId={}, plateNo={}, accessType={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getPlateNo(), event.getAccessType());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ parkingEventService.save(event);
|
|
|
+ log.info("停车场事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到停车场设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理停车场事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理阻车路障探测事件
|
|
|
+ */
|
|
|
+ private void handleRoadblockEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasRoadblockEvent event = new SasRoadblockEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的阻车路障事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("阻车路障事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存阻车路障事件: eventId={}, deviceId={}, eventCode={}, val={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode(), event.getVal());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ roadblockEventService.save(event);
|
|
|
+ log.info("阻车路障事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到阻车路障设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理阻车路障事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理入侵紧急报警事件
|
|
|
+ */
|
|
|
+ private void handleAlarsasEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ AlarsasEventMessage message = mapper.readValue(payload, AlarsasEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasAlarsasEvent event = new SasAlarsasEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的入侵报警事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("入侵报警事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存入侵报警事件: eventId={}, deviceId={}, eventCode={}, relPerson={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode(), event.getRelPerson());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ alarsasEventService.save(event);
|
|
|
+ log.info("入侵报警事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到入侵报警设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理入侵报警事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理实时电子巡检事件
|
|
|
+ */
|
|
|
+ private void handlePatrolEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ PatrolEventMessage message = mapper.readValue(payload, PatrolEventMessage.class);
|
|
|
+
|
|
|
+ // 1. 创建实体并生成唯一eventId(防止被覆盖)
|
|
|
+ SasPatrolEvent event = new SasPatrolEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 调试日志
|
|
|
+ log.debug("生成的巡检事件eventId: {}", eventId);
|
|
|
+
|
|
|
+ // 2. 复制属性,忽略eventId字段
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 3. 特殊字段映射和转换
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime())); // 时间字符串转LocalDateTime
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 最终检查:确保eventId不为null
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ log.error("巡检事件eventId为null,重新生成");
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调试日志:保存前的最终状态
|
|
|
+ log.info("保存巡检事件: eventId={}, deviceId={}, eventCode={}, name={}",
|
|
|
+ event.getEventId(), event.getDeviceId(), event.getEventCode(), event.getName());
|
|
|
+
|
|
|
+ // 保存到数据库
|
|
|
+ patrolEventService.save(event);
|
|
|
+ log.info("巡检事件保存成功, eventId={}", event.getEventId());
|
|
|
+ }
|
|
|
+ else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到巡检设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理巡检事件失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理数据采集设备事件
|
|
|
+ */
|
|
|
+ private void handleAcquisitionEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class);
|
|
|
+
|
|
|
+ // 生成eventId并创建实体
|
|
|
+ SasAcquisitionEvent event = new SasAcquisitionEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 复制属性(忽略eventId)
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 时间转换和通用字段设置
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime()));
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 兜底检查
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存数据
|
|
|
+ acquisitionEventService.save(event);
|
|
|
+ log.info("数据采集事件保存成功, eventId={}, deviceId={}", event.getEventId(), event.getDeviceId());
|
|
|
+ } else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到数据采集设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理数据采集事件失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理状态感知探测事件
|
|
|
+ */
|
|
|
+ private void handlePerceptionEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class);
|
|
|
+
|
|
|
+ // 生成eventId并创建实体
|
|
|
+ SasPerceptionEvent event = new SasPerceptionEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 复制属性(忽略eventId)
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 时间转换和通用字段设置
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime()));
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 兜底检查
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存数据
|
|
|
+ perceptionEventService.save(event);
|
|
|
+ log.info("状态感知事件保存成功, eventId={}, deviceId={}", event.getEventId(), event.getDeviceId());
|
|
|
+ } else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到状态感知设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理状态感知事件失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理状态采集探测事件
|
|
|
+ */
|
|
|
+ private void handleCollectionEvent(String payload, String topic) {
|
|
|
+ try {
|
|
|
+ JsonNode root = mapper.readTree(payload);
|
|
|
+ String method = root.path("method").asText();
|
|
|
+
|
|
|
+ if ("event".equals(method)) {
|
|
|
+ CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class);
|
|
|
+
|
|
|
+ // 生成eventId并创建实体
|
|
|
+ SasCollectionEvent event = new SasCollectionEvent();
|
|
|
+ String eventId = IdUtil.randomUUID();
|
|
|
+ event.setEventId(eventId);
|
|
|
+
|
|
|
+ // 复制属性(忽略eventId)
|
|
|
+ BeanUtil.copyProperties(message, event, "eventId");
|
|
|
+
|
|
|
+ // 时间转换和通用字段设置
|
|
|
+ event.setTriggerTime(parseTime(message.getTriggerTime()));
|
|
|
+ event.setCreateTime(LocalDateTime.now());
|
|
|
+ event.setUpdateTime(LocalDateTime.now());
|
|
|
+
|
|
|
+ // 兜底检查
|
|
|
+ if (event.getEventId() == null) {
|
|
|
+ event.setEventId(IdUtil.randomUUID());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 保存数据
|
|
|
+ collectionEventService.save(event);
|
|
|
+ log.info("状态采集事件保存成功, eventId={}, deviceId={}", event.getEventId(), event.getDeviceId());
|
|
|
+ } else if ("heart".equals(method)) {
|
|
|
+ String deviceId = extractDeviceId(topic);
|
|
|
+ log.info("收到状态采集设备心跳, deviceId={}", deviceId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理状态采集事件失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从主题提取设备ID
|
|
|
+ */
|
|
|
+ private String extractDeviceId(String topic) {
|
|
|
+ String[] parts = topic.split("/");
|
|
|
+ return parts.length > 0 ? parts[parts.length - 1] : "unknown";
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析时间字符串
|
|
|
+ */
|
|
|
+ private LocalDateTime parseTime(String timeStr) {
|
|
|
+ if (timeStr == null || timeStr.isEmpty()) {
|
|
|
+ return LocalDateTime.now();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ return LocalDateTime.parse(timeStr, formatter);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("时间解析失败: {}, 使用当前时间", timeStr);
|
|
|
+ return LocalDateTime.now();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅主题
|
|
|
+ */
|
|
|
+ public void subscribe(String topic) {
|
|
|
+ try {
|
|
|
+ if (client != null && client.isConnected()) {
|
|
|
+ client.subscribe(topic);
|
|
|
+ log.info("订阅主题成功: {}", topic);
|
|
|
+ }
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("订阅主题失败: {}", topic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 断开连接
|
|
|
+ */
|
|
|
+ @PreDestroy
|
|
|
+ public void disconnect() {
|
|
|
+ try {
|
|
|
+ if (client != null && client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ log.info("MQTT连接已断开");
|
|
|
+ }
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("断开MQTT连接失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|