MqttService.java 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310
  1. package com.usky.sas.mqtt;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.usky.sas.common.enums.SystemTypeCodeEnum;
import com.usky.sas.common.global.SasWebSocket;
import com.usky.sas.domain.*;
import com.usky.sas.mapper.*;
import com.usky.sas.mqtt.dto.*;
import com.usky.sas.service.*;
import com.usky.sas.service.dto.agbox.JsonRpcRequest;
import com.usky.sas.service.vo.*;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
  35. @Slf4j
  36. @Service
  37. public class MqttService {
  38. @Value("${mqtt.client-id:sas-client-${random.uuid}}")
  39. private String clientId;
  40. @Value("${mqtt.topics:}")
  41. private String topics;
  42. private MqttClient client;
  43. private volatile boolean isListening = true;
  44. private final ObjectMapper mapper = new ObjectMapper()
  45. .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  46. private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  47. @Autowired
  48. private SasUsbEventService usbEventService;
  49. @Autowired
  50. private SasSnapEventService snapEventService;
  51. @Autowired
  52. private SasEntranceEventService entranceEventService;
  53. @Autowired
  54. private SasParkingEventService parkingEventService;
  55. @Autowired
  56. private SasRoadblockEventService roadblockEventService;
  57. @Autowired
  58. private SasAlarsasEventService alarsasEventService;
  59. @Autowired
  60. private SasPatrolEventService patrolEventService;
  61. @Autowired
  62. private SasAcquisitionEventService acquisitionEventService;
  63. @Autowired
  64. private SasPerceptionEventService perceptionEventService;
  65. @Autowired
  66. private SasCollectionEventService collectionEventService;
  67. @Autowired
  68. private SasConfigService sasConfigService;
  69. @Autowired
  70. private SasDeviceService deviceService;
  71. @Autowired
  72. private SasEventTypeGroupService eventTypeGroupService;
  73. @Autowired
  74. private SasPatrolUserParamService sasPatrolUserParamService;
  75. @Autowired
  76. private SasGisMapper gisMapper;
  77. @Autowired
  78. private SasPicMapper sasPicMapper;
  79. // Event Code Mappers
  80. @Autowired
  81. private SasUsbEventCodeMapper usbEventCodeMapper;
  82. @Autowired
  83. private SasSnapTypeCodeMapper snapTypeCodeMapper;
  84. @Autowired
  85. private SasEntranceEventCodeMapper entranceEventCodeMapper;
  86. @Autowired
  87. private SasParkingEventCodeMapper parkingEventCodeMapper;
  88. @Autowired
  89. private SasRoadblockEventCodeMapper roadblockEventCodeMapper;
  90. @Autowired
  91. private SasAlarsasEventCodeMapper alarsasEventCodeMapper;
  92. @Autowired
  93. private SasPatrolEventCodeMapper patrolEventCodeMapper;
  94. @Autowired
  95. private SasAcquisitionEventCodeMapper acquisitionEventCodeMapper;
  96. @Autowired
  97. private SasPerceptionEventCodeMapper perceptionEventCodeMapper;
  98. @Autowired
  99. private SasCollectionEventCodeMapper collectionEventCodeMapper;
  100. @PostConstruct
  101. public void init() {
  102. SasConfig config = sasConfigService.getConfig();
  103. if (config == null || config.getHost() == null || config.getHost().isEmpty()) {
  104. log.warn("sas_config 中无 MQTT 配置或 host 为空,跳过 MQTT 连接");
  105. isListening = false;
  106. return;
  107. }
  108. try {
  109. String host = config.getHost();
  110. int port = parsePort(config.getPort(), 1883);
  111. String username = config.getUsername() != null ? config.getUsername() : "";
  112. String password = config.getPassword() != null ? config.getPassword() : "";
  113. String brokerUrl = "tcp://" + host + ":" + port;
  114. client = new MqttClient(brokerUrl, clientId);
  115. MqttConnectOptions options = new MqttConnectOptions();
  116. options.setCleanSession(true);
  117. if (!username.isEmpty()) {
  118. options.setUserName(username);
  119. options.setPassword(password.toCharArray());
  120. }
  121. options.setConnectionTimeout(60);
  122. options.setKeepAliveInterval(60);
  123. options.setAutomaticReconnect(true);
  124. client.setCallback(new MqttCallbackExtended() {
  125. @Override
  126. public void connectComplete(boolean reconnect, String serverURI) {
  127. log.info("MQTT连接成功, reconnect={}", reconnect);
  128. parseTopics().forEach(MqttService.this::subscribe);
  129. isListening = true;
  130. }
  131. @Override
  132. public void connectionLost(Throwable cause) {
  133. log.error("MQTT连接丢失: {}", cause.getMessage());
  134. isListening = false;
  135. scheduleReconnect();
  136. }
  137. @Override
  138. public void messageArrived(String topic, MqttMessage message) throws Exception {
  139. if (isListening) {
  140. processMessage(topic, message);
  141. }
  142. }
  143. @Override
  144. public void deliveryComplete(IMqttDeliveryToken token) {
  145. }
  146. });
  147. client.connect(options);
  148. log.info("MQTT客户端初始化完成, clientId={}", clientId);
  149. isListening = true;
  150. } catch (MqttException e) {
  151. log.error("MQTT初始化失败: {}", e.getMessage(), e);
  152. isListening = false;
  153. }
  154. }
  155. private static int parsePort(String portStr, int defaultPort) {
  156. if (portStr == null || portStr.isEmpty()) {
  157. return defaultPort;
  158. }
  159. try {
  160. return Integer.parseInt(portStr.trim());
  161. } catch (NumberFormatException e) {
  162. return defaultPort;
  163. }
  164. }
  165. private void scheduleReconnect() {
  166. Thread t = new Thread(() -> {
  167. try {
  168. Thread.sleep(5000L);
  169. if (client != null && !client.isConnected()) {
  170. log.info("尝试主动重连MQTT...");
  171. client.reconnect();
  172. }
  173. } catch (Exception e) {
  174. log.error("MQTT主动重连失败", e);
  175. }
  176. }, "mqtt-reconnect");
  177. t.setDaemon(true);
  178. t.start();
  179. }
  180. private List<String> parseTopics() {
  181. return Arrays.asList(topics.split(","));
  182. }
  183. private void processMessage(String topic, MqttMessage message) {
  184. String payload = new String(message.getPayload());
  185. log.debug("收到MQTT消息, topic={}, payload={}", topic, payload);
  186. try {
  187. String[] parts = topic.split("/");
  188. if (parts.length < 2) {
  189. log.warn("未知的topic格式: {}", topic);
  190. return;
  191. }
  192. String eventType = null;
  193. for (int i = 0; i < parts.length - 1; i++) {
  194. if ("event".equals(parts[i]) && i + 1 < parts.length) {
  195. eventType = parts[i + 1];
  196. break;
  197. }
  198. }
  199. if (eventType == null) {
  200. log.warn("无法从topic解析事件类型: {}", topic);
  201. return;
  202. }
  203. switch (eventType) {
  204. case "usbalarm":
  205. handleUsbEvent(payload, topic);
  206. break;
  207. case "snap":
  208. handleSnapEvent(payload, topic);
  209. break;
  210. case "entrance":
  211. handleEntranceEvent(payload, topic);
  212. break;
  213. case "parking":
  214. handleParkingEvent(payload, topic);
  215. break;
  216. case "roadblock":
  217. handleRoadblockEvent(payload, topic);
  218. break;
  219. case "alarm":
  220. handleAlarsasEvent(payload, topic);
  221. break;
  222. case "patrol":
  223. handlePatrolEvent(payload, topic);
  224. break;
  225. case "acquisition":
  226. handleAcquisitionEvent(payload, topic);
  227. break;
  228. case "perception":
  229. handlePerceptionEvent(payload, topic);
  230. break;
  231. case "collection":
  232. handleCollectionEvent(payload, topic);
  233. break;
  234. default:
  235. log.warn("未知的事件类型: {}, topic={}", eventType, topic);
  236. }
  237. } catch (Exception e) {
  238. log.error("处理MQTT消息失败, topic={}", topic, e);
  239. }
  240. }
  241. /**
  242. * 【已修改】只根据 deviceId + deviceTypeCode 查询,不判断 channel
  243. */
  244. private SasDevice findDevice(String deviceId, Integer deviceTypeCode) {
  245. return deviceService.getOne(new LambdaQueryWrapper<SasDevice>()
  246. .eq(SasDevice::getDeviceId, deviceId)
  247. .eq(SasDevice::getDeviceType, deviceTypeCode)
  248. .last("limit 1"));
  249. }
  250. /**
  251. * 兼容旧调用(自动忽略 channel)
  252. */
  253. private SasDevice findDevice(String deviceId, Integer channel, Integer deviceTypeCode) {
  254. return findDevice(deviceId, deviceTypeCode);
  255. }
  256. /**
  257. * 查询事件类型分组信息
  258. */
  259. private SasEventTypeGroup findEventTypeGroup(Integer deviceTypeCode) {
  260. return eventTypeGroupService.getOne(new LambdaQueryWrapper<SasEventTypeGroup>()
  261. .eq(SasEventTypeGroup::getDeviceType, deviceTypeCode)
  262. .last("limit 1"));
  263. }
  264. @Transactional(rollbackFor = Exception.class)
  265. public void handleUsbEvent(String payload, String topic) {
  266. try {
  267. JsonNode root = mapper.readTree(payload);
  268. String method = root.path("method").asText();
  269. if ("event".equals(method)) {
  270. UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class);
  271. if(message.getEventCode() == null && root.get("code") != null) {
  272. message.setEventCode(root.get("code").asInt());
  273. }
  274. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.usb.getCode());
  275. if (device == null) {
  276. log.warn("设备不存在,丢弃USB事件: deviceId={}", message.getDeviceId());
  277. return;
  278. }
  279. SasUsbEvent event = new SasUsbEvent();
  280. String eventId = message.getId() != null && !message.getId().isEmpty()
  281. ? message.getId()
  282. : IdUtil.randomUUID();
  283. if (usbEventService.getById(eventId) != null) {
  284. log.info("USB事件已存在, 跳过保存, eventId={}", eventId);
  285. return;
  286. }
  287. event.setEventId(eventId);
  288. BeanUtil.copyProperties(message, event, "eventId");
  289. event.setCardId(message.getIcId());
  290. event.setTriggerTime(parseTime(message.getTriggerTime()));
  291. event.setCreateTime(LocalDateTime.now());
  292. event.setUpdateTime(LocalDateTime.now());
  293. if (message.getEventPic() != null) {
  294. UsbEventMessage.PicInfo picInfo = message.getEventPic();
  295. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  296. SasPic pic = new SasPic();
  297. pic.setId(IdUtil.randomUUID());
  298. pic.setUrl(picInfo.getUrl());
  299. pic.setPath(picInfo.getPath());
  300. pic.setCreateTime(LocalDateTime.now());
  301. pic.setUpdateTime(LocalDateTime.now());
  302. sasPicMapper.insert(pic);
  303. event.setPicId(pic.getId());
  304. }
  305. }
  306. usbEventService.save(event);
  307. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  308. info.setTriggerTime(message.getTriggerTime());
  309. info.setCreateTime(LocalDateTime.now().format(formatter));
  310. if (event.getPicId() != null) {
  311. SasPic pic = sasPicMapper.selectById(event.getPicId());
  312. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  313. info.setEventUrl(pic.getUrl() + pic.getPath());
  314. }
  315. }
  316. SasUsbEventCode code = usbEventCodeMapper.selectById(event.getEventCode());
  317. if (code != null) {
  318. info.setEventTypeName(code.getName());
  319. }
  320. enrichDeviceInfo(info, device);
  321. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.usb.getCode());
  322. if (eventTypeGroup != null) {
  323. info.setEventLevelId(eventTypeGroup.getEventLevel());
  324. }
  325. info.setDeviceType(SystemTypeCodeEnum.usb.getCode());
  326. SasWebSocket.sendAll(info);
  327. log.info("USB事件保存并推送成功, eventId={}", event.getEventId());
  328. } else if ("heart".equals(method)) {
  329. handleHeartbeat(payload, SystemTypeCodeEnum.usb.getCode());
  330. }
  331. } catch (Exception e) {
  332. log.error("处理USB事件失败", e);
  333. }
  334. }
  335. @Transactional(rollbackFor = Exception.class)
  336. public void handleSnapEvent(String payload, String topic) {
  337. try {
  338. JsonNode root = mapper.readTree(payload);
  339. String method = root.path("method").asText();
  340. if ("event".equals(method)) {
  341. SnapEventResult eventResult = (SnapEventResult)this.mapper.readValue(payload, SnapEventResult.class);
  342. log.info("实时智能分析");
  343. Map<String, Object> map = new HashMap();
  344. map.put("eventId", eventResult.getId());
  345. JsonRpcRequest getEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  346. Map<String, Object> getEventInfo = new HashMap();
  347. getEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  348. getEventInfo.put("json", getEventInfoJson.toString());
  349. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getEventInfo));
  350. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/snap", getEventInfo);
  351. log.info("请求AG响应:{}", resultEvent);
  352. SnapEventInfoVo eventInfoVo = (SnapEventInfoVo)this.mapper.readValue(resultEvent, SnapEventInfoVo.class);
  353. SnapEventInfoResult result = eventInfoVo.getResult();
  354. SnapEventInfo eventInfo = result.getEventInfo();
  355. SasSnapEvent bean = (SasSnapEvent)BeanUtil.toBean(eventInfo, SasSnapEvent.class);
  356. String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty()
  357. ? eventResult.getId()
  358. : IdUtil.randomUUID();
  359. if (snapEventService.getById(eventId) != null) {
  360. log.info("抓拍事件已存在, 跳过保存, eventId={}", eventId);
  361. return;
  362. }
  363. bean.setEventId(eventId);
  364. BrieflyEventInfo info = (BrieflyEventInfo)BeanUtil.toBeanIgnoreError(bean, BrieflyEventInfo.class);
  365. if (eventInfo.getEventPic() != null) {
  366. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  367. pic.setId(IdUtil.randomUUID());
  368. this.sasPicMapper.insert(pic);
  369. bean.setEventPicId(pic.getId());
  370. info.setEventUrl(pic.getUrl() + pic.getPath());
  371. }
  372. if (eventInfo.getScenePic() != null) {
  373. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getScenePic(), SasPic.class);
  374. pic.setId(IdUtil.randomUUID());
  375. this.sasPicMapper.insert(pic);
  376. bean.setScenePicId(pic.getId());
  377. info.setSceneUrl(pic.getUrl() + pic.getPath());
  378. }
  379. if(eventInfo.getEventCode() == null && root.get("code") != null) {
  380. eventInfo.setEventCode(root.get("code").asInt());
  381. }
  382. SasDevice device = findDevice(eventInfo.getDeviceId(), SystemTypeCodeEnum.snap.getCode());
  383. if (device == null) {
  384. log.warn("设备不存在,丢弃抓拍事件: deviceId={}", eventInfo.getDeviceId());
  385. return;
  386. }
  387. BeanUtil.copyProperties(eventInfo, bean, "eventId");
  388. bean.setTriggerTime(parseTime(eventInfo.getTriggerTime()));
  389. bean.setCreateTime(LocalDateTime.now());
  390. bean.setUpdateTime(LocalDateTime.now());
  391. snapEventService.save(bean);
  392. info.setTriggerTime(eventInfo.getTriggerTime());
  393. info.setCreateTime(LocalDateTime.now().format(formatter));
  394. SasSnapTypeCode code = snapTypeCodeMapper.selectById(bean.getEventCode());
  395. if (code != null) {
  396. info.setEventTypeName(code.getName());
  397. }
  398. enrichDeviceInfo(info, device);
  399. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.snap.getCode());
  400. if (eventTypeGroup != null) {
  401. info.setEventLevelId(eventTypeGroup.getEventLevel());
  402. }
  403. info.setDeviceType(SystemTypeCodeEnum.snap.getCode());
  404. SasWebSocket.sendAll(info);
  405. log.info("抓拍事件保存并推送成功, eventId={}", bean.getEventId());
  406. } else if ("heart".equals(method)) {
  407. handleHeartbeat(payload, SystemTypeCodeEnum.snap.getCode());
  408. }
  409. } catch (Exception e) {
  410. log.error("处理抓拍事件失败", e);
  411. }
  412. }
  413. @Transactional(rollbackFor = Exception.class)
  414. public void handleEntranceEvent(String payload, String topic) {
  415. try {
  416. JsonNode root = mapper.readTree(payload);
  417. String method = root.path("method").asText();
  418. if ("event".equals(method)) {
  419. EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class);
  420. if(message.getEventCode() == null && root.get("code") != null) {
  421. message.setEventCode(root.get("code").asInt());
  422. }
  423. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.entrance.getCode());
  424. if (device == null) {
  425. log.warn("设备不存在,丢弃门禁事件: deviceId={}", message.getDeviceId());
  426. return;
  427. }
  428. SasEntranceEvent event = new SasEntranceEvent();
  429. String eventId = message.getId() != null && !message.getId().isEmpty()
  430. ? message.getId()
  431. : IdUtil.randomUUID();
  432. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  433. if (entranceEventService.getById(eventId) != null) {
  434. log.info("门禁事件已存在, 跳过保存, eventId={}", eventId);
  435. return;
  436. }
  437. event.setEventId(eventId);
  438. BeanUtil.copyProperties(message, event, "eventId");
  439. event.setCardId(message.getIcId());
  440. event.setTriggerTime(parseTime(message.getTriggerTime()));
  441. event.setCreateTime(LocalDateTime.now());
  442. event.setUpdateTime(LocalDateTime.now());
  443. // 处理图片信息
  444. if (message.getEntrancePic() != null) {
  445. EntranceEventMessage.PicInfo picInfo = message.getEntrancePic();
  446. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  447. SasPic pic = new SasPic();
  448. pic.setId(IdUtil.randomUUID());
  449. pic.setUrl(picInfo.getUrl());
  450. pic.setPath(picInfo.getPath());
  451. pic.setCreateTime(LocalDateTime.now());
  452. pic.setUpdateTime(LocalDateTime.now());
  453. sasPicMapper.insert(pic);
  454. // 设置事件图片ID
  455. event.setPicId(pic.getId());
  456. }
  457. }
  458. entranceEventService.save(event);
  459. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  460. info.setTriggerTime(message.getTriggerTime());
  461. info.setCreateTime(LocalDateTime.now().format(formatter));
  462. // 设置图片URL(如果有)
  463. if (event.getPicId() != null) {
  464. SasPic pic = sasPicMapper.selectById(event.getPicId());
  465. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  466. info.setEventUrl(pic.getUrl() + pic.getPath());
  467. }
  468. }
  469. SasEntranceEventCode code = entranceEventCodeMapper.selectById(event.getEventCode());
  470. if (code != null) {
  471. info.setEventTypeName(code.getName());
  472. }
  473. enrichDeviceInfo(info, device);
  474. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.entrance.getCode());
  475. if (eventTypeGroup != null) {
  476. info.setEventLevelId(eventTypeGroup.getEventLevel());
  477. }
  478. info.setDeviceType(SystemTypeCodeEnum.entrance.getCode());
  479. SasWebSocket.sendAll(info);
  480. log.info("门禁事件保存并推送成功, eventId={}", event.getEventId());
  481. } else if ("heart".equals(method)) {
  482. handleHeartbeat(payload, SystemTypeCodeEnum.entrance.getCode());
  483. }
  484. } catch (Exception e) {
  485. log.error("处理门禁事件失败", e);
  486. }
  487. }
  488. @Transactional(rollbackFor = Exception.class)
  489. public void handleParkingEvent(String payload, String topic) {
  490. try {
  491. JsonNode root = mapper.readTree(payload);
  492. String method = root.path("method").asText();
  493. if ("event".equals(method)) {
  494. ParkingEventResult eventResult = this.mapper.readValue(payload, ParkingEventResult.class);
  495. SasConfig config = sasConfigService.getConfig();
  496. if (config == null || StrUtil.isBlank(config.getHost()) || StrUtil.isBlank(config.getKeyds())) {
  497. log.error("停车场事件处理失败:系统配置未初始化或host/keyds为空");
  498. return;
  499. }
  500. Map<String, Object> map = new HashMap<>();
  501. map.put("eventId", eventResult.getId());
  502. JsonRpcRequest getParkingEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  503. Map<String, Object> getParkingEventInfo = new HashMap<>();
  504. getParkingEventInfo.put("key", config.getKeyds());
  505. getParkingEventInfo.put("json", getParkingEventInfoJson.toString());
  506. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getParkingEventInfo));
  507. String resultEvent = HttpUtil.post(config.getHost() + "/agbox/device/parking", getParkingEventInfo);
  508. log.info("请求AG响应:{}", resultEvent);
  509. if (StrUtil.isBlank(resultEvent)) {
  510. log.error("停车场事件:AG接口返回空字符串,终止处理");
  511. return;
  512. }
  513. ParkingEventInfoVo parkingEventInfoVo = this.mapper.readValue(resultEvent, ParkingEventInfoVo.class);
  514. if (parkingEventInfoVo == null) {
  515. log.error("停车场事件:解析返回结果为null,响应:{}", resultEvent);
  516. return;
  517. }
  518. ParkingEventInfoResult result = parkingEventInfoVo.getResult();
  519. if (result == null) {
  520. log.error("停车场事件:返回结果result为null,响应:{}", resultEvent);
  521. return;
  522. }
  523. ParkingEventInfo eventInfo = result.getEventInfo();
  524. if (eventInfo == null) {
  525. log.error("停车场事件:eventInfo为null,响应:{}", resultEvent);
  526. return;
  527. }
  528. SasParkingEvent bean = BeanUtil.toBean(eventInfo, SasParkingEvent.class);
  529. bean.setCreateTime(LocalDateTime.parse(eventInfo.getReceivingTime(), this.formatter));
  530. bean.setTriggerTime(LocalDateTime.parse(eventInfo.getTriggerTime(), this.formatter));
  531. bean.setEventId(eventResult.getId());
  532. bean.setAccessType(eventInfo.getIo());
  533. bean.setCarType(eventInfo.getCarCode());
  534. bean.setPlateType(eventInfo.getPlateCode());
  535. if (eventInfo.getEventPic() != null) {
  536. SasPic pic = BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  537. pic.setId(IdUtil.randomUUID());
  538. this.sasPicMapper.insert(pic);
  539. bean.setEventPicId(pic.getId());
  540. }
  541. if (eventInfo.getPlacePic() != null) {
  542. SasPic pic = BeanUtil.toBean(eventInfo.getPlacePic(), SasPic.class);
  543. pic.setId(IdUtil.randomUUID());
  544. this.sasPicMapper.insert(pic);
  545. bean.setPlatePicId(pic.getId());
  546. }
  547. if (eventInfo.getEventCode() == null && root.get("code") != null) {
  548. eventInfo.setEventCode(root.get("code").asInt());
  549. }
  550. SasDevice device = findDevice(eventInfo.getDeviceId(), SystemTypeCodeEnum.parking.getCode());
  551. if (device == null) {
  552. log.warn("设备不存在,丢弃停车场事件: deviceId={}", eventInfo.getDeviceId());
  553. return;
  554. }
  555. SasParkingEvent event = new SasParkingEvent();
  556. String eventId = StrUtil.isNotBlank(eventResult.getId()) ? eventResult.getId() : IdUtil.randomUUID();
  557. if (parkingEventService.getById(eventId) != null) {
  558. log.info("停车场事件已存在, 跳过保存, eventId={}", eventId);
  559. return;
  560. }
  561. event.setEventId(eventId);
  562. BeanUtil.copyProperties(eventInfo, event, "eventId");
  563. event.setTriggerTime(parseTime(eventInfo.getTriggerTime()));
  564. event.setCreateTime(LocalDateTime.now());
  565. event.setUpdateTime(LocalDateTime.now());
  566. parkingEventService.save(event);
  567. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  568. info.setTriggerTime(eventInfo.getTriggerTime());
  569. info.setCreateTime(LocalDateTime.now().format(formatter));
  570. if (eventInfo.getEventPic() != null) {
  571. info.setEventUrl(eventInfo.getEventPic().getUrl() + eventInfo.getEventPic().getPath());
  572. }
  573. if (eventInfo.getPlacePic() != null) {
  574. info.setEventUrl(eventInfo.getPlacePic().getUrl() + eventInfo.getPlacePic().getPath());
  575. }
  576. SasParkingEventCode code = parkingEventCodeMapper.selectById(event.getEventCode());
  577. if (code != null) {
  578. info.setEventTypeName(code.getName());
  579. }
  580. enrichDeviceInfo(info, device);
  581. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.parking.getCode());
  582. if (eventTypeGroup != null) {
  583. info.setEventLevelId(eventTypeGroup.getEventLevel());
  584. }
  585. info.setDeviceType(SystemTypeCodeEnum.parking.getCode());
  586. SasWebSocket.sendAll(info);
  587. log.info("停车场事件保存并推送成功, eventId={}", event.getEventId());
  588. } else if ("heart".equals(method)) {
  589. handleHeartbeat(payload, SystemTypeCodeEnum.parking.getCode());
  590. }
  591. } catch (Exception e) {
  592. log.error("处理停车场事件失败", e);
  593. }
  594. }
  595. @Transactional(rollbackFor = Exception.class)
  596. public void handleRoadblockEvent(String payload, String topic) {
  597. try {
  598. JsonNode root = mapper.readTree(payload);
  599. String method = root.path("method").asText();
  600. if ("event".equals(method)) {
  601. RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class);
  602. if(message.getEventCode() == null && root.get("code") != null) {
  603. message.setEventCode(root.get("code").asInt());
  604. }
  605. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.roadblock.getCode());
  606. if (device == null) {
  607. log.warn("设备不存在,丢弃阻车路障事件: deviceId={}", message.getDeviceId());
  608. return;
  609. }
  610. SasRoadblockEvent event = new SasRoadblockEvent();
  611. String eventId = message.getId() != null && !message.getId().isEmpty()
  612. ? message.getId()
  613. : IdUtil.randomUUID();
  614. if (roadblockEventService.getById(eventId) != null) {
  615. log.info("阻车路障事件已存在, 跳过保存, eventId={}", eventId);
  616. return;
  617. }
  618. event.setEventId(eventId);
  619. BeanUtil.copyProperties(message, event, "eventId");
  620. event.setTriggerTime(parseTime(message.getTriggerTime()));
  621. event.setCreateTime(LocalDateTime.now());
  622. event.setUpdateTime(LocalDateTime.now());
  623. if (message.getRoadblockPic() != null) {
  624. RoadblockEventMessage.PicInfo picInfo = message.getRoadblockPic();
  625. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  626. SasPic pic = new SasPic();
  627. pic.setId(IdUtil.randomUUID());
  628. pic.setUrl(picInfo.getUrl());
  629. pic.setPath(picInfo.getPath());
  630. pic.setCreateTime(LocalDateTime.now());
  631. pic.setUpdateTime(LocalDateTime.now());
  632. sasPicMapper.insert(pic);
  633. event.setPicId(pic.getId());
  634. }
  635. }
  636. roadblockEventService.save(event);
  637. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  638. info.setTriggerTime(message.getTriggerTime());
  639. info.setCreateTime(LocalDateTime.now().format(formatter));
  640. if (event.getPicId() != null) {
  641. SasPic pic = sasPicMapper.selectById(event.getPicId());
  642. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  643. info.setEventUrl(pic.getUrl() + pic.getPath());
  644. }
  645. }
  646. SasRoadblockEventCode code = roadblockEventCodeMapper.selectById(event.getEventCode());
  647. if (code != null) {
  648. info.setEventTypeName(code.getName());
  649. }
  650. enrichDeviceInfo(info, device);
  651. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.roadblock.getCode());
  652. if (eventTypeGroup != null) {
  653. info.setEventLevelId(eventTypeGroup.getEventLevel());
  654. }
  655. info.setDeviceType(SystemTypeCodeEnum.roadblock.getCode());
  656. SasWebSocket.sendAll(info);
  657. log.info("阻车路障事件保存并推送成功, eventId={}", event.getEventId());
  658. } else if ("heart".equals(method)) {
  659. handleHeartbeat(payload, SystemTypeCodeEnum.roadblock.getCode());
  660. }
  661. } catch (Exception e) {
  662. log.error("处理阻车路障事件失败", e);
  663. }
  664. }
  665. @Transactional(rollbackFor = Exception.class)
  666. public void handleAlarsasEvent(String payload, String topic) {
  667. try {
  668. JsonNode root = mapper.readTree(payload);
  669. String method = root.path("method").asText();
  670. // 【核心修复】匹配真实设备的 method: "event"
  671. if ("event".equals(method)) {
  672. // 直接从根节点读取所有字段,完全适配你真实的消息格式
  673. String deviceId = root.path("deviceId").asText();
  674. Integer channel = root.path("channel").asInt();
  675. Integer eventCode = root.path("code").asInt(); // 真实消息里是 code 字段!
  676. String id = root.path("id").asText();
  677. String triggerTime = root.path("triggerTime").asText();
  678. String note = root.path("note").asText();
  679. Integer eventSystem = root.path("eventSystem").asInt();
  680. // 打印日志,确认字段都读到了
  681. log.info("【入侵告警】收到消息:deviceId={}, channel={}, eventCode={}, id={}",
  682. deviceId, channel, eventCode, id);
  683. // 设备校验
  684. SasDevice device = findDevice(deviceId, SystemTypeCodeEnum.alarm.getCode());
  685. if (device == null) {
  686. log.warn("【入侵告警】设备不存在,丢弃消息:deviceId={}", deviceId);
  687. return;
  688. }
  689. // 构建事件实体
  690. SasAlarsasEvent event = new SasAlarsasEvent();
  691. String eventId = StrUtil.isNotBlank(id) ? id : IdUtil.randomUUID();
  692. // 防重复
  693. if (alarsasEventService.getById(eventId) != null) {
  694. log.info("【入侵告警】事件已存在,跳过保存:eventId={}", eventId);
  695. return;
  696. }
  697. // 手动赋值,100% 不会丢字段
  698. event.setEventId(eventId);
  699. event.setDeviceId(deviceId);
  700. event.setChannel(channel);
  701. event.setEventCode(eventCode);
  702. event.setNote(note);
  703. event.setEventSystem(eventSystem);
  704. event.setTriggerTime(parseTime(triggerTime));
  705. event.setCreateTime(LocalDateTime.now());
  706. event.setUpdateTime(LocalDateTime.now());
  707. // 保存到数据库
  708. alarsasEventService.save(event);
  709. log.info("【入侵告警】保存成功!eventId={}, deviceId={}, eventCode={}",
  710. eventId, deviceId, eventCode);
  711. // 后续的 WebSocket 推送逻辑(保持不变)
  712. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  713. info.setTriggerTime(triggerTime);
  714. info.setCreateTime(LocalDateTime.now().format(formatter));
  715. SasAlarsasEventCode code = alarsasEventCodeMapper.selectById(eventCode);
  716. if (code != null) {
  717. info.setEventTypeName(code.getName());
  718. }
  719. enrichDeviceInfo(info, device);
  720. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.alarm.getCode());
  721. if (eventTypeGroup != null) {
  722. info.setEventLevelId(eventTypeGroup.getEventLevel());
  723. }
  724. info.setDeviceType(SystemTypeCodeEnum.alarm.getCode());
  725. SasWebSocket.sendAll(info);
  726. } else if ("heart".equals(method)) {
  727. handleHeartbeat(payload, SystemTypeCodeEnum.alarm.getCode());
  728. }
  729. } catch (Exception e) {
  730. log.error("【入侵告警】处理失败,payload={}", payload, e);
  731. }
  732. }
  733. @Transactional(rollbackFor = Exception.class)
  734. public void handlePatrolEvent(String payload, String topic) {
  735. try {
  736. JsonNode root = mapper.readTree(payload);
  737. String method = root.path("method").asText();
  738. if ("event".equals(method)) {
  739. PatrolEventResult eventResult = (PatrolEventResult)this.mapper.readValue(payload, PatrolEventResult.class);
  740. Map<String, Object> map = new HashMap();
  741. map.put("eventId", eventResult.getId());
  742. JsonRpcRequest getPatrolEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  743. Map<String, Object> getPatrolEventInfo = new HashMap();
  744. getPatrolEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  745. getPatrolEventInfo.put("json", getPatrolEventInfoJson.toString());
  746. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getPatrolEventInfo));
  747. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/patrol", getPatrolEventInfo);
  748. log.info("请求AG响应:{}", resultEvent);
  749. PatrolEventInfoVo patrolEventInfoVo = (PatrolEventInfoVo)this.mapper.readValue(resultEvent, PatrolEventInfoVo.class);
  750. PatrolEventInfoResult result = patrolEventInfoVo.getResult();
  751. PatrolEventInfo eventInfo = result.getEventInfo();
  752. SasDevice device = findDevice(eventInfo.getDeviceId(), SystemTypeCodeEnum.patrol.getCode());
  753. if (device == null) {
  754. log.warn("设备不存在,丢弃巡检事件: deviceId={}", eventInfo.getDeviceId());
  755. return;
  756. }
  757. SasPatrolEvent event = new SasPatrolEvent();
  758. String eventId = patrolEventInfoVo.getId() != null && !patrolEventInfoVo.getId().isEmpty()
  759. ? patrolEventInfoVo.getId()
  760. : IdUtil.randomUUID();
  761. if (patrolEventService.getById(eventId) != null) {
  762. log.info("巡检事件已存在, 跳过保存, eventId={}", eventId);
  763. return;
  764. }
  765. event.setEventId(eventId);
  766. BeanUtil.copyProperties(eventInfo, event, "eventId");
  767. String triggerTimeStr = eventInfo.getTriggerTime().replaceAll("\\s", " ");
  768. event.setTriggerTime(LocalDateTime.parse(triggerTimeStr, formatter));
  769. event.setCreateTime(LocalDateTime.now());
  770. event.setUpdateTime(LocalDateTime.now());
  771. if (eventInfo.getEventPic() != null) {
  772. SasPic pic = (SasPic) BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  773. pic.setId(IdUtil.randomUUID());
  774. this.sasPicMapper.insert(pic);
  775. event.setPicId(pic.getId());
  776. }
  777. if (eventInfo.getUserParams() != null) {
  778. SasPatrolUserParam userParam = (SasPatrolUserParam)BeanUtil.toBean(eventInfo.getUserParams(), SasPatrolUserParam.class);
  779. userParam.setId(IdUtil.randomUUID());
  780. LocalDateTime now = LocalDateTime.now();
  781. userParam.setCreateTime(now);
  782. userParam.setUpdateTime(now);
  783. this.sasPatrolUserParamService.save(userParam);
  784. event.setUserParasasId(userParam.getId());
  785. }
  786. patrolEventService.save(event);
  787. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  788. info.setTriggerTime(eventInfo.getTriggerTime());
  789. info.setCreateTime(LocalDateTime.now().format(formatter));
  790. if (event.getPicId() != null) {
  791. SasPic pic = sasPicMapper.selectById(event.getPicId());
  792. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  793. info.setEventUrl(pic.getUrl() + pic.getPath());
  794. }
  795. }
  796. SasPatrolEventCode code = patrolEventCodeMapper.selectById(event.getEventCode());
  797. if (code != null) {
  798. info.setEventTypeName(code.getName());
  799. }
  800. enrichDeviceInfo(info, device);
  801. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.patrol.getCode());
  802. if (eventTypeGroup != null) {
  803. info.setEventLevelId(eventTypeGroup.getEventLevel());
  804. }
  805. info.setDeviceType(SystemTypeCodeEnum.patrol.getCode());
  806. SasWebSocket.sendAll(info);
  807. log.info("巡检事件保存并推送成功, eventId={}", event.getEventId());
  808. } else if ("heart".equals(method)) {
  809. handleHeartbeat(payload, SystemTypeCodeEnum.patrol.getCode());
  810. }
  811. } catch (Exception e) {
  812. log.error("处理巡检事件失败", e);
  813. }
  814. }
  815. @Transactional(rollbackFor = Exception.class)
  816. public void handleAcquisitionEvent(String payload, String topic) {
  817. try {
  818. JsonNode root = mapper.readTree(payload);
  819. String method = root.path("method").asText();
  820. if ("event".equals(method)) {
  821. AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class);
  822. if(message.getEventCode() == null && root.get("code") != null) {
  823. message.setEventCode(root.get("code").asInt());
  824. }
  825. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.acquisition.getCode());
  826. if (device == null) {
  827. log.warn("设备不存在,丢弃数据采集事件: deviceId={}", message.getDeviceId());
  828. return;
  829. }
  830. SasAcquisitionEvent event = new SasAcquisitionEvent();
  831. String eventId = message.getId() != null && !message.getId().isEmpty()
  832. ? message.getId()
  833. : IdUtil.randomUUID();
  834. if (acquisitionEventService.getById(eventId) != null) {
  835. log.info("数据采集事件已存在, 跳过保存, eventId={}", eventId);
  836. return;
  837. }
  838. event.setEventId(eventId);
  839. BeanUtil.copyProperties(message, event, "eventId");
  840. event.setTriggerTime(parseTime(message.getTriggerTime()));
  841. event.setCreateTime(LocalDateTime.now());
  842. event.setUpdateTime(LocalDateTime.now());
  843. if (message.getAcquisitionPic() != null) {
  844. AcquisitionEventMessage.PicInfo picInfo = message.getAcquisitionPic();
  845. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  846. SasPic pic = new SasPic();
  847. pic.setId(IdUtil.randomUUID());
  848. pic.setUrl(picInfo.getUrl());
  849. pic.setPath(picInfo.getPath());
  850. pic.setCreateTime(LocalDateTime.now());
  851. pic.setUpdateTime(LocalDateTime.now());
  852. sasPicMapper.insert(pic);
  853. event.setPicId(pic.getId());
  854. }
  855. }
  856. acquisitionEventService.save(event);
  857. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  858. info.setTriggerTime(message.getTriggerTime());
  859. info.setCreateTime(LocalDateTime.now().format(formatter));
  860. if (event.getPicId() != null) {
  861. SasPic pic = sasPicMapper.selectById(event.getPicId());
  862. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  863. info.setEventUrl(pic.getUrl() + pic.getPath());
  864. }
  865. }
  866. SasAcquisitionEventCode code = acquisitionEventCodeMapper.selectById(event.getEventCode());
  867. if (code != null) {
  868. info.setEventTypeName(code.getName());
  869. }
  870. enrichDeviceInfo(info, device);
  871. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.acquisition.getCode());
  872. if (eventTypeGroup != null) {
  873. info.setEventLevelId(eventTypeGroup.getEventLevel());
  874. }
  875. info.setDeviceType(SystemTypeCodeEnum.acquisition.getCode());
  876. SasWebSocket.sendAll(info);
  877. log.info("数据采集事件保存并推送成功, eventId={}", event.getEventId());
  878. } else if ("heart".equals(method)) {
  879. handleHeartbeat(payload, SystemTypeCodeEnum.acquisition.getCode());
  880. }
  881. } catch (Exception e) {
  882. log.error("处理数据采集事件失败", e);
  883. }
  884. }
  885. @Transactional(rollbackFor = Exception.class)
  886. public void handlePerceptionEvent(String payload, String topic) {
  887. try {
  888. JsonNode root = mapper.readTree(payload);
  889. String method = root.path("method").asText();
  890. if ("event".equals(method)) {
  891. PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class);
  892. if(message.getEventCode() == null && root.get("code") != null) {
  893. message.setEventCode(root.get("code").asInt());
  894. }
  895. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.perception.getCode());
  896. if (device == null) {
  897. log.warn("设备不存在,丢弃状态感知事件: deviceId={}", message.getDeviceId());
  898. return;
  899. }
  900. SasPerceptionEvent event = new SasPerceptionEvent();
  901. String eventId = message.getId() != null && !message.getId().isEmpty()
  902. ? message.getId()
  903. : IdUtil.randomUUID();
  904. if (perceptionEventService.getById(eventId) != null) {
  905. log.info("状态感知事件已存在, 跳过保存, eventId={}", eventId);
  906. return;
  907. }
  908. event.setEventId(eventId);
  909. BeanUtil.copyProperties(message, event, "eventId");
  910. event.setTriggerTime(parseTime(message.getTriggerTime()));
  911. event.setCreateTime(LocalDateTime.now());
  912. event.setUpdateTime(LocalDateTime.now());
  913. if (message.getPerceptionPic() != null) {
  914. PerceptionEventMessage.PicInfo picInfo = message.getPerceptionPic();
  915. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  916. SasPic pic = new SasPic();
  917. pic.setId(IdUtil.randomUUID());
  918. pic.setUrl(picInfo.getUrl());
  919. pic.setPath(picInfo.getPath());
  920. pic.setCreateTime(LocalDateTime.now());
  921. pic.setUpdateTime(LocalDateTime.now());
  922. sasPicMapper.insert(pic);
  923. event.setPicId(pic.getId());
  924. }
  925. }
  926. perceptionEventService.save(event);
  927. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  928. info.setTriggerTime(message.getTriggerTime());
  929. info.setCreateTime(LocalDateTime.now().format(formatter));
  930. if (event.getPicId() != null) {
  931. SasPic pic = sasPicMapper.selectById(event.getPicId());
  932. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  933. info.setEventUrl(pic.getUrl() + pic.getPath());
  934. }
  935. }
  936. SasPerceptionEventCode code = perceptionEventCodeMapper.selectById(event.getEventCode());
  937. if (code != null) {
  938. info.setEventTypeName(code.getName());
  939. }
  940. enrichDeviceInfo(info, device);
  941. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.perception.getCode());
  942. if (eventTypeGroup != null) {
  943. info.setEventLevelId(eventTypeGroup.getEventLevel());
  944. }
  945. info.setDeviceType(SystemTypeCodeEnum.perception.getCode());
  946. SasWebSocket.sendAll(info);
  947. log.info("状态感知事件保存并推送成功, eventId={}", event.getEventId());
  948. } else if ("heart".equals(method)) {
  949. handleHeartbeat(payload, SystemTypeCodeEnum.perception.getCode());
  950. }
  951. } catch (Exception e) {
  952. log.error("处理状态感知事件失败", e);
  953. }
  954. }
  955. @Transactional(rollbackFor = Exception.class)
  956. public void handleCollectionEvent(String payload, String topic) {
  957. try {
  958. JsonNode root = mapper.readTree(payload);
  959. String method = root.path("method").asText();
  960. if ("event".equals(method)) {
  961. CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class);
  962. if(message.getEventCode() == null && root.get("code") != null) {
  963. message.setEventCode(root.get("code").asInt());
  964. }
  965. SasDevice device = findDevice(message.getDeviceId(), SystemTypeCodeEnum.collection.getCode());
  966. if (device == null) {
  967. log.warn("设备不存在,丢弃状态采集事件: deviceId={}", message.getDeviceId());
  968. return;
  969. }
  970. SasCollectionEvent event = new SasCollectionEvent();
  971. String eventId = message.getId() != null && !message.getId().isEmpty()
  972. ? message.getId()
  973. : IdUtil.randomUUID();
  974. if (collectionEventService.getById(eventId) != null) {
  975. log.info("状态采集事件已存在, 跳过保存, eventId={}", eventId);
  976. return;
  977. }
  978. event.setEventId(eventId);
  979. BeanUtil.copyProperties(message, event, "eventId");
  980. event.setTriggerTime(parseTime(message.getTriggerTime()));
  981. event.setCreateTime(LocalDateTime.now());
  982. event.setUpdateTime(LocalDateTime.now());
  983. if (message.getScenePicInfo() != null) {
  984. SasPic bean = (SasPic)BeanUtil.toBean(message.getScenePicInfo(), SasPic.class);
  985. bean.setId(IdUtil.randomUUID());
  986. this.sasPicMapper.insert(bean);
  987. event.setScenePicId(bean.getId());
  988. }
  989. if (message.getEventPicInfo() != null) {
  990. SasPic bean = (SasPic)BeanUtil.toBean(message.getEventPicInfo(), SasPic.class);
  991. bean.setId(IdUtil.randomUUID());
  992. this.sasPicMapper.insert(bean);
  993. event.setPicId(bean.getId());
  994. }
  995. collectionEventService.save(event);
  996. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  997. info.setTriggerTime(message.getTriggerTime());
  998. info.setCreateTime(LocalDateTime.now().format(formatter));
  999. if (message.getEventPicInfo() != null) {
  1000. info.setEventUrl(message.getEventPicInfo().getUrl() + message.getEventPicInfo().getPath());
  1001. }
  1002. if (message.getScenePicInfo() != null) {
  1003. info.setEventUrl(message.getScenePicInfo().getUrl() + message.getScenePicInfo().getPath());
  1004. }
  1005. SasCollectionEventCode code = collectionEventCodeMapper.selectById(event.getEventCode());
  1006. if (code != null) {
  1007. info.setEventTypeName(code.getName());
  1008. }
  1009. enrichDeviceInfo(info, device);
  1010. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.collection.getCode());
  1011. if (eventTypeGroup != null) {
  1012. info.setEventLevelId(eventTypeGroup.getEventLevel());
  1013. }
  1014. info.setDeviceType(SystemTypeCodeEnum.collection.getCode());
  1015. SasWebSocket.sendAll(info);
  1016. log.info("状态采集事件保存并推送成功, eventId={}", event.getEventId());
  1017. } else if ("heart".equals(method)) {
  1018. handleHeartbeat(payload, SystemTypeCodeEnum.collection.getCode());
  1019. }
  1020. } catch (Exception e) {
  1021. log.error("处理状态采集事件失败", e);
  1022. }
  1023. }
  1024. private void handleHeartbeat(String payload, int deviceTypeCode) {
  1025. try {
  1026. GlobalDeviceHeartInfo heartInfo = mapper.readValue(payload, GlobalDeviceHeartInfo.class);
  1027. List<SasDevice> devices = deviceService.list(new LambdaQueryWrapper<SasDevice>()
  1028. .eq(SasDevice::getDeviceId, heartInfo.getDeviceId())
  1029. .eq(SasDevice::getDeviceType, deviceTypeCode));
  1030. if (CollUtil.isNotEmpty(devices)) {
  1031. GisHeartInfo heartInfoGis = heartInfo.getGis();
  1032. List<SasDevice> updateList = devices.stream().map(device -> {
  1033. device.setTriggerTime(LocalDateTime.now());
  1034. device.setUpdateTime(LocalDateTime.now());
  1035. if (heartInfoGis != null && StrUtil.isNotBlank(device.getGisId())) {
  1036. SasGis gis = gisMapper.selectById(device.getGisId());
  1037. if (gis != null) {
  1038. if (heartInfoGis.getLongitude() != null) gis.setLon(heartInfoGis.getLongitude());
  1039. if (heartInfoGis.getLatitude() != null) gis.setLat(heartInfoGis.getLatitude());
  1040. if (heartInfoGis.getAltitude() != null) gis.setAlt(heartInfoGis.getAltitude());
  1041. gisMapper.updateById(gis);
  1042. }
  1043. }
  1044. return device;
  1045. }).collect(Collectors.toList());
  1046. deviceService.updateBatchById(updateList);
  1047. log.debug("更新设备心跳成功, deviceId={}, count={}", heartInfo.getDeviceId(), updateList.size());
  1048. }
  1049. } catch (Exception e) {
  1050. log.error("处理设备心跳失败", e);
  1051. }
  1052. }
  1053. private void enrichDeviceInfo(BrieflyEventInfo info, SasDevice device) {
  1054. if (device != null) {
  1055. info.setAddress(device.getAddress());
  1056. if (StrUtil.isBlank(info.getNote())) {
  1057. info.setNote(device.getNote());
  1058. }
  1059. }
  1060. }
  1061. private void enrichDeviceInfo(BrieflyEventInfo info, String deviceId, Integer channel, Integer deviceTypeCode) {
  1062. SasDevice device = findDevice(deviceId, deviceTypeCode);
  1063. enrichDeviceInfo(info, device);
  1064. }
  1065. private String extractDeviceId(String topic) {
  1066. String[] parts = topic.split("/");
  1067. return parts.length > 0 ? parts[parts.length - 1] : "unknown";
  1068. }
  1069. private LocalDateTime parseTime(String timeStr) {
  1070. if (timeStr == null || timeStr.isEmpty()) {
  1071. return LocalDateTime.now();
  1072. }
  1073. try {
  1074. return LocalDateTime.parse(timeStr, formatter);
  1075. } catch (Exception e) {
  1076. log.warn("时间解析失败: {}, 使用当前时间", timeStr);
  1077. return LocalDateTime.now();
  1078. }
  1079. }
  1080. public void subscribe(String topic) {
  1081. try {
  1082. if (client != null && client.isConnected()) {
  1083. client.subscribe(topic);
  1084. log.info("订阅主题成功: {}", topic);
  1085. }
  1086. } catch (MqttException e) {
  1087. log.error("订阅主题失败: {}", topic, e);
  1088. }
  1089. }
  1090. public void pauseListening() {
  1091. this.isListening = false;
  1092. log.info("MQTT监听已暂停");
  1093. }
  1094. public void resumeListening() {
  1095. this.isListening = true;
  1096. log.info("MQTT监听已恢复");
  1097. }
  1098. public boolean getConnectionStatus() {
  1099. return client != null && client.isConnected();
  1100. }
  1101. public boolean getListeningStatus() {
  1102. return isListening;
  1103. }
  1104. public void reconnect() {
  1105. try {
  1106. if (client != null) {
  1107. if (client.isConnected()) {
  1108. client.disconnect();
  1109. }
  1110. client.close();
  1111. client = null;
  1112. }
  1113. isListening = true;
  1114. init();
  1115. } catch (Exception e) {
  1116. log.error("MQTT重连失败", e);
  1117. isListening = false;
  1118. }
  1119. }
  1120. @PreDestroy
  1121. public void disconnect() {
  1122. try {
  1123. if (client != null) {
  1124. if (client.isConnected()) {
  1125. client.disconnect();
  1126. log.info("MQTT连接已断开");
  1127. }
  1128. client.close();
  1129. client = null;
  1130. }
  1131. } catch (MqttException e) {
  1132. log.error("断开MQTT连接失败", e);
  1133. }
  1134. }
  1135. }