MqttService.java 61 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341
  1. package com.usky.sas.mqtt;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.util.IdUtil;
  5. import cn.hutool.core.util.StrUtil;
  6. import cn.hutool.http.HttpUtil;
  7. import cn.hutool.json.JSONUtil;
  8. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  9. import com.fasterxml.jackson.databind.DeserializationFeature;
  10. import com.fasterxml.jackson.databind.JsonNode;
  11. import com.fasterxml.jackson.databind.ObjectMapper;
  12. import com.usky.sas.common.enums.SystemTypeCodeEnum;
  13. import com.usky.sas.common.global.SasWebSocket;
  14. import com.usky.sas.domain.*;
  15. import com.usky.sas.mapper.*;
  16. import com.usky.sas.mqtt.dto.*;
  17. import com.usky.sas.service.*;
  18. import com.usky.sas.service.dto.agbox.JsonRpcRequest;
  19. import com.usky.sas.service.vo.*;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.eclipse.paho.client.mqttv3.*;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.beans.factory.annotation.Value;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.transaction.annotation.Transactional;
  26. import javax.annotation.PostConstruct;
  27. import javax.annotation.PreDestroy;
  28. import java.time.LocalDateTime;
  29. import java.time.format.DateTimeFormatter;
  30. import java.util.Arrays;
  31. import java.util.HashMap;
  32. import java.util.List;
  33. import java.util.Map;
  34. import java.util.stream.Collectors;
  35. @Slf4j
  36. @Service
  37. public class MqttService {
  38. @Value("${mqtt.client-id:sas-client-${random.uuid}}")
  39. private String clientId;
  40. @Value("${mqtt.topics:}")
  41. private String topics;
  42. private MqttClient client;
  43. private volatile boolean isListening = true;
  44. private final ObjectMapper mapper = new ObjectMapper()
  45. .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  46. private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  47. @Autowired
  48. private SasUsbEventService usbEventService;
  49. @Autowired
  50. private SasSnapEventService snapEventService;
  51. @Autowired
  52. private SasEntranceEventService entranceEventService;
  53. @Autowired
  54. private SasParkingEventService parkingEventService;
  55. @Autowired
  56. private SasRoadblockEventService roadblockEventService;
  57. @Autowired
  58. private SasAlarsasEventService alarsasEventService;
  59. @Autowired
  60. private SasPatrolEventService patrolEventService;
  61. @Autowired
  62. private SasAcquisitionEventService acquisitionEventService;
  63. @Autowired
  64. private SasPerceptionEventService perceptionEventService;
  65. @Autowired
  66. private SasCollectionEventService collectionEventService;
  67. @Autowired
  68. private SasConfigService sasConfigService;
  69. @Autowired
  70. private SasDeviceService deviceService;
  71. @Autowired
  72. private SasEventTypeGroupService eventTypeGroupService;
  73. @Autowired
  74. private SasPatrolUserParamService sasPatrolUserParamService;
  75. @Autowired
  76. private SasGisMapper gisMapper;
  77. @Autowired
  78. private SasPicMapper sasPicMapper;
  79. // Event Code Mappers
  80. @Autowired
  81. private SasUsbEventCodeMapper usbEventCodeMapper;
  82. @Autowired
  83. private SasSnapTypeCodeMapper snapTypeCodeMapper;
  84. @Autowired
  85. private SasEntranceEventCodeMapper entranceEventCodeMapper;
  86. @Autowired
  87. private SasParkingEventCodeMapper parkingEventCodeMapper;
  88. @Autowired
  89. private SasRoadblockEventCodeMapper roadblockEventCodeMapper;
  90. @Autowired
  91. private SasAlarsasEventCodeMapper alarsasEventCodeMapper;
  92. @Autowired
  93. private SasPatrolEventCodeMapper patrolEventCodeMapper;
  94. @Autowired
  95. private SasAcquisitionEventCodeMapper acquisitionEventCodeMapper;
  96. @Autowired
  97. private SasPerceptionEventCodeMapper perceptionEventCodeMapper;
  98. @Autowired
  99. private SasCollectionEventCodeMapper collectionEventCodeMapper;
  100. @PostConstruct
  101. public void init() {
  102. SasConfig config = sasConfigService.getConfig();
  103. if (config == null || config.getHost() == null || config.getHost().isEmpty()) {
  104. log.warn("sas_config 中无 MQTT 配置或 host 为空,跳过 MQTT 连接");
  105. isListening = false;
  106. return;
  107. }
  108. try {
  109. String host = config.getHost();
  110. int port = parsePort(config.getPort(), 1883);
  111. String username = config.getUsername() != null ? config.getUsername() : "";
  112. String password = config.getPassword() != null ? config.getPassword() : "";
  113. String brokerUrl = "tcp://" + host + ":" + port;
  114. client = new MqttClient(brokerUrl, clientId);
  115. MqttConnectOptions options = new MqttConnectOptions();
  116. options.setCleanSession(true);
  117. if (!username.isEmpty()) {
  118. options.setUserName(username);
  119. options.setPassword(password.toCharArray());
  120. }
  121. options.setConnectionTimeout(60);
  122. options.setKeepAliveInterval(60);
  123. options.setAutomaticReconnect(true);
  124. client.setCallback(new MqttCallbackExtended() {
  125. @Override
  126. public void connectComplete(boolean reconnect, String serverURI) {
  127. log.info("MQTT连接成功, reconnect={}", reconnect);
  128. parseTopics().forEach(MqttService.this::subscribe);
  129. isListening = true;
  130. }
  131. @Override
  132. public void connectionLost(Throwable cause) {
  133. log.error("MQTT连接丢失: {}", cause.getMessage());
  134. isListening = false;
  135. scheduleReconnect();
  136. }
  137. @Override
  138. public void messageArrived(String topic, MqttMessage message) throws Exception {
  139. if (isListening) {
  140. processMessage(topic, message);
  141. }
  142. }
  143. @Override
  144. public void deliveryComplete(IMqttDeliveryToken token) {
  145. }
  146. });
  147. client.connect(options);
  148. log.info("MQTT客户端初始化完成, clientId={}", clientId);
  149. isListening = true;
  150. } catch (MqttException e) {
  151. log.error("MQTT初始化失败: {}", e.getMessage(), e);
  152. isListening = false;
  153. }
  154. }
  155. private static int parsePort(String portStr, int defaultPort) {
  156. if (portStr == null || portStr.isEmpty()) {
  157. return defaultPort;
  158. }
  159. try {
  160. return Integer.parseInt(portStr.trim());
  161. } catch (NumberFormatException e) {
  162. return defaultPort;
  163. }
  164. }
  165. private void scheduleReconnect() {
  166. Thread t = new Thread(() -> {
  167. try {
  168. Thread.sleep(5000L);
  169. if (client != null && !client.isConnected()) {
  170. log.info("尝试主动重连MQTT...");
  171. client.reconnect();
  172. }
  173. } catch (Exception e) {
  174. log.error("MQTT主动重连失败", e);
  175. }
  176. }, "mqtt-reconnect");
  177. t.setDaemon(true);
  178. t.start();
  179. }
  180. private List<String> parseTopics() {
  181. return Arrays.asList(topics.split(","));
  182. }
  183. private void processMessage(String topic, MqttMessage message) {
  184. String payload = new String(message.getPayload());
  185. log.debug("收到MQTT消息, topic={}, payload={}", topic, payload);
  186. try {
  187. String[] parts = topic.split("/");
  188. if (parts.length < 2) {
  189. log.warn("未知的topic格式: {}", topic);
  190. return;
  191. }
  192. String eventType = null;
  193. for (int i = 0; i < parts.length - 1; i++) {
  194. if ("event".equals(parts[i]) && i + 1 < parts.length) {
  195. eventType = parts[i + 1];
  196. break;
  197. }
  198. }
  199. if (eventType == null) {
  200. log.warn("无法从topic解析事件类型: {}", topic);
  201. return;
  202. }
  203. switch (eventType) {
  204. case "usbalarm":
  205. handleUsbEvent(payload, topic);
  206. break;
  207. case "snap":
  208. handleSnapEvent(payload, topic);
  209. break;
  210. case "entrance":
  211. handleEntranceEvent(payload, topic);
  212. break;
  213. case "parking":
  214. handleParkingEvent(payload, topic);
  215. break;
  216. case "roadblock":
  217. handleRoadblockEvent(payload, topic);
  218. break;
  219. case "alarm":
  220. handleAlarsasEvent(payload, topic);
  221. break;
  222. case "patrol":
  223. handlePatrolEvent(payload, topic);
  224. break;
  225. case "acquisition":
  226. handleAcquisitionEvent(payload, topic);
  227. break;
  228. case "perception":
  229. handlePerceptionEvent(payload, topic);
  230. break;
  231. case "collection":
  232. handleCollectionEvent(payload, topic);
  233. break;
  234. default:
  235. log.warn("未知的事件类型: {}, topic={}", eventType, topic);
  236. }
  237. } catch (Exception e) {
  238. log.error("处理MQTT消息失败, topic={}", topic, e);
  239. }
  240. }
  241. /**
  242. * 查询设备信息,如果设备不存在返回 null
  243. */
  244. private SasDevice findDevice(String deviceId, Integer channel, Integer deviceTypeCode) {
  245. return deviceService.getOne(new LambdaQueryWrapper<SasDevice>()
  246. .eq(SasDevice::getDeviceId, deviceId)
  247. .eq(SasDevice::getChannel, channel)
  248. .eq(SasDevice::getDeviceType, deviceTypeCode)
  249. .last("limit 1"));
  250. }
  251. /**
  252. * 查询事件类型分组信息
  253. */
  254. private SasEventTypeGroup findEventTypeGroup(Integer deviceTypeCode) {
  255. return eventTypeGroupService.getOne(new LambdaQueryWrapper<SasEventTypeGroup>()
  256. .eq(SasEventTypeGroup::getDeviceType, deviceTypeCode)
  257. .last("limit 1"));
  258. }
  259. @Transactional(rollbackFor = Exception.class)
  260. public void handleUsbEvent(String payload, String topic) {
  261. try {
  262. JsonNode root = mapper.readTree(payload);
  263. String method = root.path("method").asText();
  264. if ("event".equals(method)) {
  265. UsbEventMessage message = mapper.readValue(payload, UsbEventMessage.class);
  266. if(message.getEventCode() == null && root.get("code") != null) {
  267. message.setEventCode(root.get("code").asInt());
  268. }
  269. // 1. 检查设备是否存在
  270. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.usb.getCode());
  271. if (device == null) {
  272. log.warn("设备不存在,丢弃USB事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  273. return;
  274. }
  275. SasUsbEvent event = new SasUsbEvent();
  276. String eventId = message.getId() != null && !message.getId().isEmpty()
  277. ? message.getId()
  278. : IdUtil.randomUUID();
  279. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  280. if (usbEventService.getById(eventId) != null) {
  281. log.info("USB事件已存在, 跳过保存, eventId={}", eventId);
  282. return;
  283. }
  284. event.setEventId(eventId);
  285. BeanUtil.copyProperties(message, event, "eventId");
  286. event.setCardId(message.getIcId());
  287. event.setTriggerTime(parseTime(message.getTriggerTime()));
  288. event.setCreateTime(LocalDateTime.now());
  289. event.setUpdateTime(LocalDateTime.now());
  290. // 处理图片信息
  291. if (message.getEventPic() != null) {
  292. UsbEventMessage.PicInfo picInfo = message.getEventPic();
  293. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  294. SasPic pic = new SasPic();
  295. pic.setId(IdUtil.randomUUID());
  296. pic.setUrl(picInfo.getUrl());
  297. pic.setPath(picInfo.getPath());
  298. pic.setCreateTime(LocalDateTime.now());
  299. pic.setUpdateTime(LocalDateTime.now());
  300. sasPicMapper.insert(pic);
  301. // 设置事件图片ID
  302. event.setPicId(pic.getId());
  303. }
  304. }
  305. usbEventService.save(event);
  306. // WebSocket 推送
  307. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  308. info.setTriggerTime(message.getTriggerTime());
  309. info.setCreateTime(LocalDateTime.now().format(formatter));
  310. // 设置图片URL(如果有)
  311. if (event.getPicId() != null) {
  312. SasPic pic = sasPicMapper.selectById(event.getPicId());
  313. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  314. info.setEventUrl(pic.getUrl() + pic.getPath());
  315. }
  316. }
  317. SasUsbEventCode code = usbEventCodeMapper.selectById(event.getEventCode());
  318. if (code != null) {
  319. info.setEventTypeName(code.getName());
  320. }
  321. enrichDeviceInfo(info, device);
  322. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.usb.getCode());
  323. if (eventTypeGroup != null) {
  324. info.setEventLevelId(eventTypeGroup.getEventLevel());
  325. }
  326. info.setDeviceType(SystemTypeCodeEnum.usb.getCode());
  327. SasWebSocket.sendAll(info);
  328. log.info("USB事件保存并推送成功, eventId={}", event.getEventId());
  329. } else if ("heart".equals(method)) {
  330. handleHeartbeat(payload, SystemTypeCodeEnum.usb.getCode());
  331. }
  332. } catch (Exception e) {
  333. log.error("处理USB事件失败", e);
  334. }
  335. }
  336. @Transactional(rollbackFor = Exception.class)
  337. public void handleSnapEvent(String payload, String topic) {
  338. try {
  339. JsonNode root = mapper.readTree(payload);
  340. String method = root.path("method").asText();
  341. if ("event".equals(method)) {
  342. SnapEventResult eventResult = (SnapEventResult)this.mapper.readValue(payload, SnapEventResult.class);
  343. log.info("实时智能分析");
  344. Map<String, Object> map = new HashMap();
  345. map.put("eventId", eventResult.getId());
  346. JsonRpcRequest getEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  347. Map<String, Object> getEventInfo = new HashMap();
  348. getEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  349. getEventInfo.put("json", getEventInfoJson.toString());
  350. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getEventInfo));
  351. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/snap", getEventInfo);
  352. log.info("请求AG响应:{}", resultEvent);
  353. SnapEventInfoVo eventInfoVo = (SnapEventInfoVo)this.mapper.readValue(resultEvent, SnapEventInfoVo.class);
  354. SnapEventInfoResult result = eventInfoVo.getResult();
  355. SnapEventInfo eventInfo = result.getEventInfo();
  356. SasSnapEvent bean = (SasSnapEvent)BeanUtil.toBean(eventInfo, SasSnapEvent.class);
  357. BrieflyEventInfo info = (BrieflyEventInfo)BeanUtil.toBeanIgnoreError(bean, BrieflyEventInfo.class);
  358. if (eventInfo.getEventPic() != null) {
  359. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  360. pic.setId(IdUtil.randomUUID());
  361. this.sasPicMapper.insert(pic);
  362. bean.setEventPicId(pic.getId());
  363. info.setEventUrl(pic.getUrl() + pic.getPath());
  364. }
  365. if (eventInfo.getScenePic() != null) {
  366. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getScenePic(), SasPic.class);
  367. pic.setId(IdUtil.randomUUID());
  368. this.sasPicMapper.insert(pic);
  369. bean.setScenePicId(pic.getId());
  370. info.setSceneUrl(pic.getUrl() + pic.getPath());
  371. }
  372. if(eventInfo.getEventCode() == null && root.get("code") != null) {
  373. eventInfo.setEventCode(root.get("code").asInt());
  374. }
  375. // 1. 检查设备是否存在
  376. SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.snap.getCode());
  377. if (device == null) {
  378. log.warn("设备不存在,丢弃抓拍事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel());
  379. return;
  380. }
  381. String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty()
  382. ? eventResult.getId()
  383. : IdUtil.randomUUID();
  384. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  385. if (snapEventService.getById(eventId) != null) {
  386. log.info("抓拍事件已存在, 跳过保存, eventId={}", eventId);
  387. return;
  388. }
  389. bean.setEventId(eventId);
  390. BeanUtil.copyProperties(eventInfo, bean, "eventId");
  391. bean.setTriggerTime(parseTime(eventInfo.getTriggerTime()));
  392. bean.setCreateTime(LocalDateTime.now());
  393. bean.setUpdateTime(LocalDateTime.now());
  394. snapEventService.save(bean);
  395. info.setTriggerTime(eventInfo.getTriggerTime());
  396. info.setCreateTime(LocalDateTime.now().format(formatter));
  397. SasSnapTypeCode code = snapTypeCodeMapper.selectById(bean.getEventCode());
  398. if (code != null) {
  399. info.setEventTypeName(code.getName());
  400. }
  401. enrichDeviceInfo(info, device);
  402. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.snap.getCode());
  403. if (eventTypeGroup != null) {
  404. info.setEventLevelId(eventTypeGroup.getEventLevel());
  405. }
  406. info.setDeviceType(SystemTypeCodeEnum.snap.getCode());
  407. SasWebSocket.sendAll(info);
  408. log.info("抓拍事件保存并推送成功, eventId={}", bean.getEventId());
  409. } else if ("heart".equals(method)) {
  410. handleHeartbeat(payload, SystemTypeCodeEnum.snap.getCode());
  411. }
  412. } catch (Exception e) {
  413. log.error("处理抓拍事件失败", e);
  414. }
  415. }
  416. @Transactional(rollbackFor = Exception.class)
  417. public void handleEntranceEvent(String payload, String topic) {
  418. try {
  419. JsonNode root = mapper.readTree(payload);
  420. String method = root.path("method").asText();
  421. if ("event".equals(method)) {
  422. EntranceEventMessage message = mapper.readValue(payload, EntranceEventMessage.class);
  423. if(message.getEventCode() == null && root.get("code") != null) {
  424. message.setEventCode(root.get("code").asInt());
  425. }
  426. // 1. 检查设备是否存在
  427. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.entrance.getCode());
  428. if (device == null) {
  429. log.warn("设备不存在,丢弃门禁事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  430. return;
  431. }
  432. SasEntranceEvent event = new SasEntranceEvent();
  433. String eventId = message.getId() != null && !message.getId().isEmpty()
  434. ? message.getId()
  435. : IdUtil.randomUUID();
  436. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  437. if (entranceEventService.getById(eventId) != null) {
  438. log.info("门禁事件已存在, 跳过保存, eventId={}", eventId);
  439. return;
  440. }
  441. event.setEventId(eventId);
  442. BeanUtil.copyProperties(message, event, "eventId");
  443. event.setCardId(message.getIcId());
  444. event.setTriggerTime(parseTime(message.getTriggerTime()));
  445. event.setCreateTime(LocalDateTime.now());
  446. event.setUpdateTime(LocalDateTime.now());
  447. // 处理图片信息
  448. if (message.getEntrancePic() != null) {
  449. EntranceEventMessage.PicInfo picInfo = message.getEntrancePic();
  450. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  451. SasPic pic = new SasPic();
  452. pic.setId(IdUtil.randomUUID());
  453. pic.setUrl(picInfo.getUrl());
  454. pic.setPath(picInfo.getPath());
  455. pic.setCreateTime(LocalDateTime.now());
  456. pic.setUpdateTime(LocalDateTime.now());
  457. sasPicMapper.insert(pic);
  458. // 设置事件图片ID
  459. event.setPicId(pic.getId());
  460. }
  461. }
  462. entranceEventService.save(event);
  463. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  464. info.setTriggerTime(message.getTriggerTime());
  465. info.setCreateTime(LocalDateTime.now().format(formatter));
  466. // 设置图片URL(如果有)
  467. if (event.getPicId() != null) {
  468. SasPic pic = sasPicMapper.selectById(event.getPicId());
  469. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  470. info.setEventUrl(pic.getUrl() + pic.getPath());
  471. }
  472. }
  473. SasEntranceEventCode code = entranceEventCodeMapper.selectById(event.getEventCode());
  474. if (code != null) {
  475. info.setEventTypeName(code.getName());
  476. }
  477. enrichDeviceInfo(info, device);
  478. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.entrance.getCode());
  479. if (eventTypeGroup != null) {
  480. info.setEventLevelId(eventTypeGroup.getEventLevel());
  481. }
  482. info.setDeviceType(SystemTypeCodeEnum.entrance.getCode());
  483. SasWebSocket.sendAll(info);
  484. log.info("门禁事件保存并推送成功, eventId={}", event.getEventId());
  485. } else if ("heart".equals(method)) {
  486. handleHeartbeat(payload, SystemTypeCodeEnum.entrance.getCode());
  487. }
  488. } catch (Exception e) {
  489. log.error("处理门禁事件失败", e);
  490. }
  491. }
  492. @Transactional(rollbackFor = Exception.class)
  493. public void handleParkingEvent(String payload, String topic) {
  494. try {
  495. JsonNode root = mapper.readTree(payload);
  496. String method = root.path("method").asText();
  497. if ("event".equals(method)) {
  498. ParkingEventResult eventResult = (ParkingEventResult)this.mapper.readValue(payload, ParkingEventResult.class);
  499. Map<String, Object> map = new HashMap();
  500. map.put("eventId", eventResult.getId());
  501. JsonRpcRequest getParkingEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  502. Map<String, Object> getParkingEventInfo = new HashMap();
  503. getParkingEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  504. getParkingEventInfo.put("json", getParkingEventInfoJson.toString());
  505. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getParkingEventInfo));
  506. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/parking", getParkingEventInfo);
  507. log.info("请求AG响应:{}", resultEvent);
  508. ParkingEventInfoVo parkingEventInfoVo = (ParkingEventInfoVo)this.mapper.readValue(resultEvent, ParkingEventInfoVo.class);
  509. ParkingEventInfoResult result = parkingEventInfoVo.getResult();
  510. ParkingEventInfo eventInfo = result.getEventInfo();
  511. SasParkingEvent bean = (SasParkingEvent)BeanUtil.toBean(eventInfo, SasParkingEvent.class);
  512. bean.setCreateTime(LocalDateTime.parse(eventInfo.getReceivingTime(), this.formatter));
  513. bean.setTriggerTime(LocalDateTime.parse(eventInfo.getTriggerTime(), this.formatter));
  514. bean.setEventId(eventResult.getId());
  515. bean.setAccessType(eventInfo.getIo());
  516. bean.setCarType(eventInfo.getCarCode());
  517. bean.setPlateType(eventInfo.getPlateCode());
  518. if (eventInfo.getEventPic() != null) {
  519. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  520. pic.setId(IdUtil.randomUUID());
  521. this.sasPicMapper.insert(pic);
  522. bean.setEventPicId(pic.getId());
  523. }
  524. if (eventInfo.getPlacePic() != null) {
  525. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getPlacePic(), SasPic.class);
  526. pic.setId(IdUtil.randomUUID());
  527. this.sasPicMapper.insert(pic);
  528. bean.setPlatePicId(pic.getId());
  529. }
  530. if(eventInfo.getEventCode() == null && root.get("code") != null) {
  531. eventInfo.setEventCode(root.get("code").asInt());
  532. }
  533. // 1. 检查设备是否存在
  534. SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.parking.getCode());
  535. if (device == null) {
  536. log.warn("设备不存在,丢弃停车场事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel());
  537. return;
  538. }
  539. SasParkingEvent event = new SasParkingEvent();
  540. String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty()
  541. ? eventResult.getId()
  542. : IdUtil.randomUUID();
  543. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  544. if (parkingEventService.getById(eventId) != null) {
  545. log.info("停车场事件已存在, 跳过保存, eventId={}", eventId);
  546. return;
  547. }
  548. event.setEventId(eventId);
  549. BeanUtil.copyProperties(eventInfo, event, "eventId");
  550. event.setTriggerTime(parseTime(eventInfo.getTriggerTime()));
  551. event.setCreateTime(LocalDateTime.now());
  552. event.setUpdateTime(LocalDateTime.now());
  553. parkingEventService.save(event);
  554. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  555. info.setTriggerTime(eventInfo.getTriggerTime());
  556. info.setCreateTime(LocalDateTime.now().format(formatter));
  557. // 设置图片URL(如果有)
  558. if (eventInfo.getEventPic() != null) {
  559. info.setEventUrl(eventInfo.getEventPic().getUrl() + eventInfo.getEventPic().getPath());
  560. }
  561. if (eventInfo.getPlacePic() != null) {
  562. info.setEventUrl(eventInfo.getPlacePic().getUrl() + eventInfo.getPlacePic().getPath());
  563. }
  564. SasParkingEventCode code = parkingEventCodeMapper.selectById(event.getEventCode());
  565. if (code != null) {
  566. info.setEventTypeName(code.getName());
  567. }
  568. enrichDeviceInfo(info, device);
  569. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.parking.getCode());
  570. if (eventTypeGroup != null) {
  571. info.setEventLevelId(eventTypeGroup.getEventLevel());
  572. }
  573. info.setDeviceType(SystemTypeCodeEnum.parking.getCode());
  574. SasWebSocket.sendAll(info);
  575. log.info("停车场事件保存并推送成功, eventId={}", event.getEventId());
  576. } else if ("heart".equals(method)) {
  577. handleHeartbeat(payload, SystemTypeCodeEnum.parking.getCode());
  578. }
  579. } catch (Exception e) {
  580. log.error("处理停车场事件失败", e);
  581. }
  582. }
  583. @Transactional(rollbackFor = Exception.class)
  584. public void handleRoadblockEvent(String payload, String topic) {
  585. try {
  586. JsonNode root = mapper.readTree(payload);
  587. String method = root.path("method").asText();
  588. if ("event".equals(method)) {
  589. RoadblockEventMessage message = mapper.readValue(payload, RoadblockEventMessage.class);
  590. if(message.getEventCode() == null && root.get("code") != null) {
  591. message.setEventCode(root.get("code").asInt());
  592. }
  593. // 1. 检查设备是否存在
  594. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.roadblock.getCode());
  595. if (device == null) {
  596. log.warn("设备不存在,丢弃阻车路障事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  597. return;
  598. }
  599. SasRoadblockEvent event = new SasRoadblockEvent();
  600. String eventId = message.getId() != null && !message.getId().isEmpty()
  601. ? message.getId()
  602. : IdUtil.randomUUID();
  603. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  604. if (roadblockEventService.getById(eventId) != null) {
  605. log.info("阻车路障事件已存在, 跳过保存, eventId={}", eventId);
  606. return;
  607. }
  608. event.setEventId(eventId);
  609. BeanUtil.copyProperties(message, event, "eventId");
  610. event.setTriggerTime(parseTime(message.getTriggerTime()));
  611. event.setCreateTime(LocalDateTime.now());
  612. event.setUpdateTime(LocalDateTime.now());
  613. // 处理图片信息
  614. if (message.getRoadblockPic() != null) {
  615. RoadblockEventMessage.PicInfo picInfo = message.getRoadblockPic();
  616. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  617. SasPic pic = new SasPic();
  618. pic.setId(IdUtil.randomUUID());
  619. pic.setUrl(picInfo.getUrl());
  620. pic.setPath(picInfo.getPath());
  621. pic.setCreateTime(LocalDateTime.now());
  622. pic.setUpdateTime(LocalDateTime.now());
  623. sasPicMapper.insert(pic);
  624. // 设置事件图片ID
  625. event.setPicId(pic.getId());
  626. }
  627. }
  628. roadblockEventService.save(event);
  629. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  630. info.setTriggerTime(message.getTriggerTime());
  631. info.setCreateTime(LocalDateTime.now().format(formatter));
  632. // 设置图片URL(如果有)
  633. if (event.getPicId() != null) {
  634. SasPic pic = sasPicMapper.selectById(event.getPicId());
  635. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  636. info.setEventUrl(pic.getUrl() + pic.getPath());
  637. }
  638. }
  639. SasRoadblockEventCode code = roadblockEventCodeMapper.selectById(event.getEventCode());
  640. if (code != null) {
  641. info.setEventTypeName(code.getName());
  642. }
  643. enrichDeviceInfo(info, device);
  644. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.roadblock.getCode());
  645. if (eventTypeGroup != null) {
  646. info.setEventLevelId(eventTypeGroup.getEventLevel());
  647. }
  648. info.setDeviceType(SystemTypeCodeEnum.roadblock.getCode());
  649. SasWebSocket.sendAll(info);
  650. log.info("阻车路障事件保存并推送成功, eventId={}", event.getEventId());
  651. } else if ("heart".equals(method)) {
  652. handleHeartbeat(payload, SystemTypeCodeEnum.roadblock.getCode());
  653. }
  654. } catch (Exception e) {
  655. log.error("处理阻车路障事件失败", e);
  656. }
  657. }
  658. @Transactional(rollbackFor = Exception.class)
  659. public void handleAlarsasEvent(String payload, String topic) {
  660. try {
  661. JsonNode root = mapper.readTree(payload);
  662. String method = root.path("method").asText();
  663. if ("event".equals(method)) {
  664. AlarmEventResult eventResult = (AlarmEventResult)this.mapper.readValue(payload, AlarmEventResult.class);
  665. Map<String, Object> map = new HashMap();
  666. map.put("eventId", eventResult.getId());
  667. JsonRpcRequest getAlarmEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  668. Map<String, Object> getAlarmEventInfo = new HashMap();
  669. getAlarmEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  670. getAlarmEventInfo.put("json", getAlarmEventInfoJson.toString());
  671. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getAlarmEventInfo));
  672. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/alarm", getAlarmEventInfo);
  673. log.info("请求AG响应:{}", resultEvent);
  674. AlarmEventInfoVo alarmEventInfoVo = (AlarmEventInfoVo)this.mapper.readValue(resultEvent, AlarmEventInfoVo.class);
  675. AlarmEventInfoResult result = alarmEventInfoVo.getResult();
  676. AlarmEventInfo eventInfo = result.getEventInfo();
  677. SasAlarsasEvent event = new SasAlarsasEvent();
  678. if (eventInfo.getEventPic() != null) {
  679. SasPic pic = (SasPic)BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  680. pic.setId(IdUtil.randomUUID());
  681. this.sasPicMapper.insert(pic);
  682. event.setPicId(pic.getId());
  683. }
  684. if (eventInfo.getGis() != null) {
  685. SasGis gis = (SasGis)BeanUtil.toBean(eventInfo.getGis(), SasGis.class);
  686. gis.setId(IdUtil.randomUUID());
  687. this.gisMapper.insert(gis);
  688. event.setGisId(gis.getId());
  689. }
  690. if(eventInfo.getEventCode() == null && root.get("code") != null) {
  691. eventInfo.setEventCode(root.get("code").asInt());
  692. }
  693. // 1. 检查设备是否存在
  694. SasDevice device = findDevice(eventInfo.getDeviceId(), eventInfo.getChannel(), SystemTypeCodeEnum.alarm.getCode());
  695. if (device == null) {
  696. log.warn("设备不存在,丢弃入侵报警事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel());
  697. return;
  698. }
  699. String eventId = eventResult.getId() != null && !eventResult.getId().isEmpty()
  700. ? eventResult.getId()
  701. : IdUtil.randomUUID();
  702. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  703. // if (alarsasEventService.getById(eventId) != null) {
  704. // log.info("入侵报警事件已存在, 跳过保存, eventId={}", eventId);
  705. // return;
  706. // }
  707. event.setEventId(eventId);
  708. BeanUtil.copyProperties(eventResult, event, "eventId");
  709. event.setTriggerTime(parseTime(eventInfo.getTriggerTime()));
  710. event.setCreateTime(LocalDateTime.now());
  711. event.setUpdateTime(LocalDateTime.now());
  712. alarsasEventService.saveOrUpdate(event);
  713. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  714. info.setTriggerTime(eventInfo.getTriggerTime());
  715. info.setCreateTime(LocalDateTime.now().format(formatter));
  716. // 设置图片URL(如果有)
  717. if (event.getPicId() != null) {
  718. SasPic pic = sasPicMapper.selectById(event.getPicId());
  719. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  720. info.setEventUrl(pic.getUrl() + pic.getPath());
  721. }
  722. }
  723. SasAlarsasEventCode code = alarsasEventCodeMapper.selectById(event.getEventCode());
  724. if (code != null) {
  725. info.setEventTypeName(code.getName());
  726. }
  727. enrichDeviceInfo(info, device);
  728. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.alarm.getCode());
  729. if (eventTypeGroup != null) {
  730. info.setEventLevelId(eventTypeGroup.getEventLevel());
  731. }
  732. info.setDeviceType(SystemTypeCodeEnum.alarm.getCode());
  733. SasWebSocket.sendAll(info);
  734. log.info("入侵报警事件保存并推送成功, eventId={}", event.getEventId());
  735. } else if ("heart".equals(method)) {
  736. handleHeartbeat(payload, SystemTypeCodeEnum.alarm.getCode());
  737. }
  738. } catch (Exception e) {
  739. log.error("处理入侵报警事件失败", e);
  740. }
  741. }
  742. @Transactional(rollbackFor = Exception.class)
  743. public void handlePatrolEvent(String payload, String topic) {
  744. try {
  745. JsonNode root = mapper.readTree(payload);
  746. String method = root.path("method").asText();
  747. if ("event".equals(method)) {
  748. PatrolEventResult eventResult = (PatrolEventResult)this.mapper.readValue(payload, PatrolEventResult.class);
  749. Map<String, Object> map = new HashMap();
  750. map.put("eventId", eventResult.getId());
  751. JsonRpcRequest getPatrolEventInfoJson = new JsonRpcRequest("getEvent", map, (Long)null);
  752. Map<String, Object> getPatrolEventInfo = new HashMap();
  753. getPatrolEventInfo.put("key", this.sasConfigService.getConfig().getKeyds());
  754. getPatrolEventInfo.put("json", getPatrolEventInfoJson.toString());
  755. log.info("请求AG报文:{}", JSONUtil.toJsonStr(getPatrolEventInfo));
  756. String resultEvent = HttpUtil.post(this.sasConfigService.getConfig().getHost() + "/agbox/device/patrol", getPatrolEventInfo);
  757. log.info("请求AG响应:{}", resultEvent);
  758. PatrolEventInfoVo patrolEventInfoVo = (PatrolEventInfoVo)this.mapper.readValue(resultEvent, PatrolEventInfoVo.class);
  759. PatrolEventInfoResult result = patrolEventInfoVo.getResult();
  760. PatrolEventInfo eventInfo = result.getEventInfo();
  761. // 1. 检查设备是否存在
  762. SasDevice device = findDevice(eventInfo.getDeviceId(), 0, SystemTypeCodeEnum.patrol.getCode());
  763. if (device == null) {
  764. log.warn("设备不存在,丢弃巡检事件: deviceId={}, channel={}", eventInfo.getDeviceId(), eventInfo.getChannel());
  765. return;
  766. }
  767. SasPatrolEvent event = new SasPatrolEvent();
  768. String eventId = patrolEventInfoVo.getId() != null && !patrolEventInfoVo.getId().isEmpty()
  769. ? patrolEventInfoVo.getId()
  770. : IdUtil.randomUUID();
  771. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  772. if (patrolEventService.getById(eventId) != null) {
  773. log.info("巡检事件已存在, 跳过保存, eventId={}", eventId);
  774. return;
  775. }
  776. event.setEventId(eventId);
  777. BeanUtil.copyProperties(eventInfo, event, "eventId");
  778. // 关键:清洗空格 + 指定格式化器解析
  779. String triggerTimeStr = eventInfo.getTriggerTime().replaceAll("\\s", " ");
  780. event.setTriggerTime(LocalDateTime.parse(triggerTimeStr, formatter));
  781. event.setCreateTime(LocalDateTime.now());
  782. event.setUpdateTime(LocalDateTime.now());
  783. // 处理图片信息
  784. if (eventInfo.getEventPic() != null) {
  785. SasPic pic = (SasPic) BeanUtil.toBean(eventInfo.getEventPic(), SasPic.class);
  786. pic.setId(IdUtil.randomUUID());
  787. this.sasPicMapper.insert(pic);
  788. event.setPicId(pic.getId());
  789. }
  790. if (eventInfo.getUserParams() != null) {
  791. SasPatrolUserParam userParam = (SasPatrolUserParam)BeanUtil.toBean(eventInfo.getUserParams(), SasPatrolUserParam.class);
  792. userParam.setId(IdUtil.randomUUID());
  793. this.sasPatrolUserParamService.save(userParam);
  794. event.setUserParasasId(userParam.getId());
  795. }
  796. patrolEventService.save(event);
  797. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  798. info.setTriggerTime(eventInfo.getTriggerTime());
  799. info.setCreateTime(LocalDateTime.now().format(formatter));
  800. // 设置图片URL(如果有)
  801. if (event.getPicId() != null) {
  802. SasPic pic = sasPicMapper.selectById(event.getPicId());
  803. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  804. info.setEventUrl(pic.getUrl() + pic.getPath());
  805. }
  806. }
  807. SasPatrolEventCode code = patrolEventCodeMapper.selectById(event.getEventCode());
  808. if (code != null) {
  809. info.setEventTypeName(code.getName());
  810. }
  811. enrichDeviceInfo(info, device);
  812. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.patrol.getCode());
  813. if (eventTypeGroup != null) {
  814. info.setEventLevelId(eventTypeGroup.getEventLevel());
  815. }
  816. info.setDeviceType(SystemTypeCodeEnum.patrol.getCode());
  817. SasWebSocket.sendAll(info);
  818. log.info("巡检事件保存并推送成功, eventId={}", event.getEventId());
  819. } else if ("heart".equals(method)) {
  820. handleHeartbeat(payload, SystemTypeCodeEnum.patrol.getCode());
  821. }
  822. } catch (Exception e) {
  823. log.error("处理巡检事件失败", e);
  824. }
  825. }
  826. @Transactional(rollbackFor = Exception.class)
  827. public void handleAcquisitionEvent(String payload, String topic) {
  828. try {
  829. JsonNode root = mapper.readTree(payload);
  830. String method = root.path("method").asText();
  831. if ("event".equals(method)) {
  832. AcquisitionEventMessage message = mapper.readValue(payload, AcquisitionEventMessage.class);
  833. if(message.getEventCode() == null && root.get("code") != null) {
  834. message.setEventCode(root.get("code").asInt());
  835. }
  836. // 1. 检查设备是否存在
  837. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.acquisition.getCode());
  838. if (device == null) {
  839. log.warn("设备不存在,丢弃数据采集事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  840. return;
  841. }
  842. SasAcquisitionEvent event = new SasAcquisitionEvent();
  843. String eventId = message.getId() != null && !message.getId().isEmpty()
  844. ? message.getId()
  845. : IdUtil.randomUUID();
  846. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  847. if (acquisitionEventService.getById(eventId) != null) {
  848. log.info("数据采集事件已存在, 跳过保存, eventId={}", eventId);
  849. return;
  850. }
  851. event.setEventId(eventId);
  852. BeanUtil.copyProperties(message, event, "eventId");
  853. event.setTriggerTime(parseTime(message.getTriggerTime()));
  854. event.setCreateTime(LocalDateTime.now());
  855. event.setUpdateTime(LocalDateTime.now());
  856. // 处理图片信息
  857. if (message.getAcquisitionPic() != null) {
  858. AcquisitionEventMessage.PicInfo picInfo = message.getAcquisitionPic();
  859. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  860. SasPic pic = new SasPic();
  861. pic.setId(IdUtil.randomUUID());
  862. pic.setUrl(picInfo.getUrl());
  863. pic.setPath(picInfo.getPath());
  864. pic.setCreateTime(LocalDateTime.now());
  865. pic.setUpdateTime(LocalDateTime.now());
  866. sasPicMapper.insert(pic);
  867. // 设置事件图片ID
  868. event.setPicId(pic.getId());
  869. }
  870. }
  871. acquisitionEventService.save(event);
  872. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  873. info.setTriggerTime(message.getTriggerTime());
  874. info.setCreateTime(LocalDateTime.now().format(formatter));
  875. // 设置图片URL(如果有)
  876. if (event.getPicId() != null) {
  877. SasPic pic = sasPicMapper.selectById(event.getPicId());
  878. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  879. info.setEventUrl(pic.getUrl() + pic.getPath());
  880. }
  881. }
  882. SasAcquisitionEventCode code = acquisitionEventCodeMapper.selectById(event.getEventCode());
  883. if (code != null) {
  884. info.setEventTypeName(code.getName());
  885. }
  886. enrichDeviceInfo(info, device);
  887. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.acquisition.getCode());
  888. if (eventTypeGroup != null) {
  889. info.setEventLevelId(eventTypeGroup.getEventLevel());
  890. }
  891. info.setDeviceType(SystemTypeCodeEnum.acquisition.getCode());
  892. SasWebSocket.sendAll(info);
  893. log.info("数据采集事件保存并推送成功, eventId={}", event.getEventId());
  894. } else if ("heart".equals(method)) {
  895. handleHeartbeat(payload, SystemTypeCodeEnum.acquisition.getCode());
  896. }
  897. } catch (Exception e) {
  898. log.error("处理数据采集事件失败", e);
  899. }
  900. }
  901. @Transactional(rollbackFor = Exception.class)
  902. public void handlePerceptionEvent(String payload, String topic) {
  903. try {
  904. JsonNode root = mapper.readTree(payload);
  905. String method = root.path("method").asText();
  906. if ("event".equals(method)) {
  907. PerceptionEventMessage message = mapper.readValue(payload, PerceptionEventMessage.class);
  908. if(message.getEventCode() == null && root.get("code") != null) {
  909. message.setEventCode(root.get("code").asInt());
  910. }
  911. // 1. 检查设备是否存在
  912. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.perception.getCode());
  913. if (device == null) {
  914. log.warn("设备不存在,丢弃状态感知事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  915. return;
  916. }
  917. SasPerceptionEvent event = new SasPerceptionEvent();
  918. String eventId = message.getId() != null && !message.getId().isEmpty()
  919. ? message.getId()
  920. : IdUtil.randomUUID();
  921. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  922. if (perceptionEventService.getById(eventId) != null) {
  923. log.info("状态感知事件已存在, 跳过保存, eventId={}", eventId);
  924. return;
  925. }
  926. event.setEventId(eventId);
  927. BeanUtil.copyProperties(message, event, "eventId");
  928. event.setTriggerTime(parseTime(message.getTriggerTime()));
  929. event.setCreateTime(LocalDateTime.now());
  930. event.setUpdateTime(LocalDateTime.now());
  931. // 处理图片信息
  932. if (message.getPerceptionPic() != null) {
  933. PerceptionEventMessage.PicInfo picInfo = message.getPerceptionPic();
  934. if (picInfo != null && StrUtil.isNotBlank(picInfo.getUrl()) && StrUtil.isNotBlank(picInfo.getPath())) {
  935. SasPic pic = new SasPic();
  936. pic.setId(IdUtil.randomUUID());
  937. pic.setUrl(picInfo.getUrl());
  938. pic.setPath(picInfo.getPath());
  939. pic.setCreateTime(LocalDateTime.now());
  940. pic.setUpdateTime(LocalDateTime.now());
  941. sasPicMapper.insert(pic);
  942. // 设置事件图片ID
  943. event.setPicId(pic.getId());
  944. }
  945. }
  946. perceptionEventService.save(event);
  947. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  948. info.setTriggerTime(message.getTriggerTime());
  949. info.setCreateTime(LocalDateTime.now().format(formatter));
  950. // 设置图片URL(如果有)
  951. if (event.getPicId() != null) {
  952. SasPic pic = sasPicMapper.selectById(event.getPicId());
  953. if (pic != null && StrUtil.isNotBlank(pic.getUrl()) && StrUtil.isNotBlank(pic.getPath())) {
  954. info.setEventUrl(pic.getUrl() + pic.getPath());
  955. }
  956. }
  957. SasPerceptionEventCode code = perceptionEventCodeMapper.selectById(event.getEventCode());
  958. if (code != null) {
  959. info.setEventTypeName(code.getName());
  960. }
  961. enrichDeviceInfo(info, device);
  962. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.perception.getCode());
  963. if (eventTypeGroup != null) {
  964. info.setEventLevelId(eventTypeGroup.getEventLevel());
  965. }
  966. info.setDeviceType(SystemTypeCodeEnum.perception.getCode());
  967. SasWebSocket.sendAll(info);
  968. log.info("状态感知事件保存并推送成功, eventId={}", event.getEventId());
  969. } else if ("heart".equals(method)) {
  970. handleHeartbeat(payload, SystemTypeCodeEnum.perception.getCode());
  971. }
  972. } catch (Exception e) {
  973. log.error("处理状态感知事件失败", e);
  974. }
  975. }
  976. @Transactional(rollbackFor = Exception.class)
  977. public void handleCollectionEvent(String payload, String topic) {
  978. try {
  979. JsonNode root = mapper.readTree(payload);
  980. String method = root.path("method").asText();
  981. if ("event".equals(method)) {
  982. CollectionEventMessage message = mapper.readValue(payload, CollectionEventMessage.class);
  983. if(message.getEventCode() == null && root.get("code") != null) {
  984. message.setEventCode(root.get("code").asInt());
  985. }
  986. // 1. 检查设备是否存在
  987. SasDevice device = findDevice(message.getDeviceId(), message.getChannel(), SystemTypeCodeEnum.collection.getCode());
  988. if (device == null) {
  989. log.warn("设备不存在,丢弃状态采集事件: deviceId={}, channel={}", message.getDeviceId(), message.getChannel());
  990. return;
  991. }
  992. SasCollectionEvent event = new SasCollectionEvent();
  993. String eventId = message.getId() != null && !message.getId().isEmpty()
  994. ? message.getId()
  995. : IdUtil.randomUUID();
  996. // 如果该事件已存在(设备重发/重复消息),避免主键冲突,直接跳过
  997. if (collectionEventService.getById(eventId) != null) {
  998. log.info("状态采集事件已存在, 跳过保存, eventId={}", eventId);
  999. return;
  1000. }
  1001. event.setEventId(eventId);
  1002. BeanUtil.copyProperties(message, event, "eventId");
  1003. event.setTriggerTime(parseTime(message.getTriggerTime()));
  1004. event.setCreateTime(LocalDateTime.now());
  1005. event.setUpdateTime(LocalDateTime.now());
  1006. // 处理图片信息
  1007. if (message.getScenePicInfo() != null) {
  1008. SasPic bean = (SasPic)BeanUtil.toBean(message.getScenePicInfo(), SasPic.class);
  1009. bean.setId(IdUtil.randomUUID());
  1010. this.sasPicMapper.insert(bean);
  1011. event.setScenePicId(bean.getId());
  1012. }
  1013. if (message.getEventPicInfo() != null) {
  1014. SasPic bean = (SasPic)BeanUtil.toBean(message.getEventPicInfo(), SasPic.class);
  1015. bean.setId(IdUtil.randomUUID());
  1016. this.sasPicMapper.insert(bean);
  1017. event.setPicId(bean.getId());
  1018. }
  1019. collectionEventService.save(event);
  1020. BrieflyEventInfo info = BeanUtil.toBean(event, BrieflyEventInfo.class);
  1021. info.setTriggerTime(message.getTriggerTime());
  1022. info.setCreateTime(LocalDateTime.now().format(formatter));
  1023. // 设置图片URL(如果有)
  1024. if (message.getEventPicInfo() != null) {
  1025. info.setEventUrl(message.getEventPicInfo().getUrl() + message.getEventPicInfo().getPath());
  1026. }
  1027. if (message.getScenePicInfo() != null) {
  1028. info.setEventUrl(message.getScenePicInfo().getUrl() + message.getScenePicInfo().getPath());
  1029. }
  1030. SasCollectionEventCode code = collectionEventCodeMapper.selectById(event.getEventCode());
  1031. if (code != null) {
  1032. info.setEventTypeName(code.getName());
  1033. }
  1034. enrichDeviceInfo(info, device);
  1035. SasEventTypeGroup eventTypeGroup = findEventTypeGroup(SystemTypeCodeEnum.collection.getCode());
  1036. if (eventTypeGroup != null) {
  1037. info.setEventLevelId(eventTypeGroup.getEventLevel());
  1038. }
  1039. info.setDeviceType(SystemTypeCodeEnum.collection.getCode());
  1040. SasWebSocket.sendAll(info);
  1041. log.info("状态采集事件保存并推送成功, eventId={}", event.getEventId());
  1042. } else if ("heart".equals(method)) {
  1043. handleHeartbeat(payload, SystemTypeCodeEnum.collection.getCode());
  1044. }
  1045. } catch (Exception e) {
  1046. log.error("处理状态采集事件失败", e);
  1047. }
  1048. }
  1049. /**
  1050. * 统一处理心跳逻辑
  1051. */
  1052. private void handleHeartbeat(String payload, int deviceTypeCode) {
  1053. try {
  1054. GlobalDeviceHeartInfo heartInfo = mapper.readValue(payload, GlobalDeviceHeartInfo.class);
  1055. List<SasDevice> devices = deviceService.list(new LambdaQueryWrapper<SasDevice>()
  1056. .eq(SasDevice::getDeviceId, heartInfo.getDeviceId())
  1057. .eq(SasDevice::getDeviceType, deviceTypeCode));
  1058. if (CollUtil.isNotEmpty(devices)) {
  1059. GisHeartInfo heartInfoGis = heartInfo.getGis();
  1060. List<SasDevice> updateList = devices.stream().map(device -> {
  1061. device.setTriggerTime(LocalDateTime.now());
  1062. device.setUpdateTime(LocalDateTime.now());
  1063. // 更新 GIS 信息
  1064. if (heartInfoGis != null && StrUtil.isNotBlank(device.getGisId())) {
  1065. SasGis gis = gisMapper.selectById(device.getGisId());
  1066. if (gis != null) {
  1067. // GisHeartInfo: longitude, latitude, altitude
  1068. // SasGis: lon, lat, alt
  1069. if (heartInfoGis.getLongitude() != null) gis.setLon(heartInfoGis.getLongitude());
  1070. if (heartInfoGis.getLatitude() != null) gis.setLat(heartInfoGis.getLatitude());
  1071. if (heartInfoGis.getAltitude() != null) gis.setAlt(heartInfoGis.getAltitude());
  1072. gisMapper.updateById(gis);
  1073. }
  1074. }
  1075. return device;
  1076. }).collect(Collectors.toList());
  1077. deviceService.updateBatchById(updateList);
  1078. log.debug("更新设备心跳成功, deviceId={}, count={}", heartInfo.getDeviceId(), updateList.size());
  1079. }
  1080. } catch (Exception e) {
  1081. log.error("处理设备心跳失败", e);
  1082. }
  1083. }
  1084. /**
  1085. * 补充设备信息到 BrieflyEventInfo
  1086. */
  1087. private void enrichDeviceInfo(BrieflyEventInfo info, SasDevice device) {
  1088. if (device != null) {
  1089. info.setAddress(device.getAddress());
  1090. if (StrUtil.isBlank(info.getNote())) {
  1091. info.setNote(device.getNote());
  1092. }
  1093. }
  1094. }
  1095. /**
  1096. * 补充设备信息到 BrieflyEventInfo (重载,用于兼容)
  1097. */
  1098. private void enrichDeviceInfo(BrieflyEventInfo info, String deviceId, Integer channel, Integer deviceTypeCode) {
  1099. SasDevice device = findDevice(deviceId, channel, deviceTypeCode);
  1100. enrichDeviceInfo(info, device);
  1101. }
  1102. private String extractDeviceId(String topic) {
  1103. String[] parts = topic.split("/");
  1104. return parts.length > 0 ? parts[parts.length - 1] : "unknown";
  1105. }
  1106. private LocalDateTime parseTime(String timeStr) {
  1107. if (timeStr == null || timeStr.isEmpty()) {
  1108. return LocalDateTime.now();
  1109. }
  1110. try {
  1111. return LocalDateTime.parse(timeStr, formatter);
  1112. } catch (Exception e) {
  1113. log.warn("时间解析失败: {}, 使用当前时间", timeStr);
  1114. return LocalDateTime.now();
  1115. }
  1116. }
  1117. public void subscribe(String topic) {
  1118. try {
  1119. if (client != null && client.isConnected()) {
  1120. client.subscribe(topic);
  1121. log.info("订阅主题成功: {}", topic);
  1122. }
  1123. } catch (MqttException e) {
  1124. log.error("订阅主题失败: {}", topic, e);
  1125. }
  1126. }
  1127. public void pauseListening() {
  1128. this.isListening = false;
  1129. log.info("MQTT监听已暂停");
  1130. }
  1131. public void resumeListening() {
  1132. this.isListening = true;
  1133. log.info("MQTT监听已恢复");
  1134. }
  1135. public boolean getConnectionStatus() {
  1136. return client != null && client.isConnected();
  1137. }
  1138. public boolean getListeningStatus() {
  1139. return isListening;
  1140. }
  1141. public void reconnect() {
  1142. try {
  1143. if (client != null) {
  1144. if (client.isConnected()) {
  1145. client.disconnect();
  1146. }
  1147. client.close();
  1148. client = null;
  1149. }
  1150. isListening = true;
  1151. init();
  1152. } catch (Exception e) {
  1153. log.error("MQTT重连失败", e);
  1154. isListening = false;
  1155. }
  1156. }
  1157. @PreDestroy
  1158. public void disconnect() {
  1159. try {
  1160. if (client != null) {
  1161. if (client.isConnected()) {
  1162. client.disconnect();
  1163. log.info("MQTT连接已断开");
  1164. }
  1165. client.close();
  1166. client = null;
  1167. }
  1168. } catch (MqttException e) {
  1169. log.error("断开MQTT连接失败", e);
  1170. }
  1171. }
  1172. }