|
@@ -0,0 +1,299 @@
|
|
|
+package com.usky.ai.service.listener;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.usky.ai.controller.web.BoardPingController;
|
|
|
+import com.usky.ai.domain.AiDevice;
|
|
|
+import com.usky.ai.dto.*;
|
|
|
+import com.usky.ai.mapper.AiDeviceMapper;
|
|
|
+import com.usky.ai.service.mqtt.MqttInConfig;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
+import org.springframework.integration.annotation.ServiceActivator;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@ConditionalOnProperty(prefix = "mqtt", value = "enabled", havingValue = "true")
|
|
|
+public class BoardPingListener {
|
|
|
+
|
|
|
+ private static final Map<String, Long> LAST_PING = new ConcurrentHashMap<>();
|
|
|
+ private static final long TIMEOUT = 6_000;
|
|
|
+
|
|
|
+ private final AiDeviceMapper aiDeviceMapper;
|
|
|
+
|
|
|
+ private final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ public BoardPingListener(AiDeviceMapper aiDeviceMapper) {
|
|
|
+ this.aiDeviceMapper = aiDeviceMapper;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class ReplyCache {
|
|
|
+ public static final Map<String, EdgeAppControllerReplyDTO> REPLY = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, EdgeAppFetchResponseDTO> FETCH = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, EdgeAppDeleteResponseDTO> DELETE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, EdgeAlgTaskResponseDTO> TASK = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, EdgeAlgTaskListResponseDTO> ALG_TASK_LIST = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgAbilityFetchResponseDTO> ALG_ABILITY = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, RepositoriesInfoResponseDTO> REPOSITORIES_INFO = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, RepositoryCreateResponseDTO> REPOSITORY_CREATE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, RepositoryUpdateResponseDTO> REPOSITORY_UPDATE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, RepositoryDeleteResponseDTO> REPOSITORY_DELETE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgTaskSnapResponseDTO> TASK_PREVIEW = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgScheduleCreateResponseDTO> SCHEDULE_CREATE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgScheduleFetchResponseDTO> SCHEDULE_FETCH = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgScheduleDeleteResponseDTO> SCHEDULE_DELETE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgSuitCreateResponseDTO> SUIT_CREATE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgSuitGroupRemoveResponseDTO> SUIT_REMOVE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgSuitAppendResponseDTO> SUIT_APPEND = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgSuitRemoveResponseDTO> SUIT_REMOVE_TEMPLATE = new ConcurrentHashMap<>();
|
|
|
+ public static final Map<String, AlgSuitFetchResponseDTO> SUIT_FETCH = new ConcurrentHashMap<>();
|
|
|
+ }
|
|
|
+
|
|
|
+ @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
|
|
|
+ public void handleHeartbeat(Message<?> message) {
|
|
|
+ String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
|
|
|
+ if (!"/board_ping".equals(topic)) return;
|
|
|
+
|
|
|
+ String payload = (String) message.getPayload();
|
|
|
+ try {
|
|
|
+ if (payload == null) {
|
|
|
+ log.error("接收到的 payload 为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ BoardPingDTO dto = objectMapper.readValue(payload, BoardPingDTO.class);
|
|
|
+ if (dto == null) {
|
|
|
+ log.error("解析心跳消息失败,结果为 null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ BoardPingController.CACHE.put(dto.getBoardId(), dto);
|
|
|
+ log.info("收到心跳:{}", dto.getBoardId());
|
|
|
+
|
|
|
+ // 将 BoardPingDTO 转换为 AiDevice
|
|
|
+ AiDevice aiDevice = new AiDevice();
|
|
|
+ aiDevice.setBoardId(dto.getBoardId());
|
|
|
+ aiDevice.setBoardIp(dto.getBoardIp());
|
|
|
+ aiDevice.setDescription(null);
|
|
|
+ aiDevice.setStatus(dto.getKey().equals("运行中") ? 1 : 0);
|
|
|
+ aiDevice.setCreateTime(new java.sql.Timestamp(System.currentTimeMillis()));
|
|
|
+ aiDevice.setCreateBy("system");
|
|
|
+ aiDevice.setUpdateTime(new java.sql.Timestamp(System.currentTimeMillis()));
|
|
|
+ aiDevice.setUpdateBy("system");
|
|
|
+
|
|
|
+ aiDeviceMapper.saveOrUpdateDevice(aiDevice);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析心跳失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
|
|
|
+ public void handleAllReplies(Message<?> message) {
|
|
|
+ String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
|
|
|
+ if (!"/edge_app_controller_reply".equals(topic)) return;
|
|
|
+
|
|
|
+ String payload = (String) message.getPayload();
|
|
|
+ log.info("收到 /edge_app_controller_reply 原始消息: {}", payload);
|
|
|
+ try {
|
|
|
+ if (payload == null) {
|
|
|
+ log.error("接收到的 payload 为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ JsonNode root = objectMapper.readTree(payload);
|
|
|
+ if (root == null) {
|
|
|
+ log.error("解析 payload 为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ JsonNode eventNode = root.get("Event");
|
|
|
+ if (eventNode == null) {
|
|
|
+ log.error("JSON 中缺少 'Event' 字段");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String event = eventNode.asText();
|
|
|
+ if (event == null || event.isEmpty()) {
|
|
|
+ log.error("Event 字段为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (event) {
|
|
|
+ case "/alg_media_config":
|
|
|
+ EdgeAppControllerReplyDTO configResp = objectMapper.readValue(payload, EdgeAppControllerReplyDTO.class);
|
|
|
+ String configKey = configResp.getBoardId() + "-" + configResp.getMediaName();
|
|
|
+ ReplyCache.REPLY.put(configKey, configResp);
|
|
|
+ log.info("收到通道配置响应:{}", configKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_media_fetch":
|
|
|
+ EdgeAppFetchResponseDTO fetchResp = objectMapper.readValue(payload, EdgeAppFetchResponseDTO.class);
|
|
|
+ ReplyCache.FETCH.put(fetchResp.getBoardId(), fetchResp);
|
|
|
+ log.info("收到通道查询响应:{}", fetchResp.getBoardId());
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_media_delete":
|
|
|
+ EdgeAppDeleteResponseDTO deleteResp = objectMapper.readValue(payload, EdgeAppDeleteResponseDTO.class);
|
|
|
+ String deleteKey = deleteResp.getBoardId() + "-" + deleteResp.getMediaName();
|
|
|
+ ReplyCache.DELETE.put(deleteKey, deleteResp);
|
|
|
+ log.info("收到通道删除响应:{}", deleteKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_task_config":
|
|
|
+ case "/alg_task_control":
|
|
|
+ case "/alg_task_delete":
|
|
|
+ EdgeAlgTaskResponseDTO taskResp = objectMapper.readValue(payload, EdgeAlgTaskResponseDTO.class);
|
|
|
+ String taskKey = taskResp.getBoardId() + "-" + taskResp.getAlgTaskSession();
|
|
|
+ ReplyCache.TASK.put(taskKey, taskResp);
|
|
|
+ log.info("收到任务响应:{}-{}", taskResp.getBoardId(), taskResp.getAlgTaskSession());
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_task_fetch":
|
|
|
+ EdgeAlgTaskListResponseDTO taskListResp = objectMapper.readValue(payload, EdgeAlgTaskListResponseDTO.class);
|
|
|
+ ReplyCache.ALG_TASK_LIST.put(taskListResp.getBoardId(), taskListResp);
|
|
|
+ log.info("✅ 收到任务列表响应:{},内容数量:{}", taskListResp.getBoardId(), taskListResp.getContent().size());
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_ability_fetch":
|
|
|
+ AlgAbilityFetchResponseDTO abilityResp = objectMapper.readValue(payload, AlgAbilityFetchResponseDTO.class);
|
|
|
+ String abilityKey = abilityResp.getBoardId() + "-alg_ability_fetch";
|
|
|
+ ReplyCache.ALG_ABILITY.put(abilityKey, abilityResp);
|
|
|
+ log.info("收到算法能力获取响应:{}", abilityKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/api_repositories_info":
|
|
|
+ RepositoriesInfoResponseDTO repositoriesInfoResp = objectMapper.readValue(payload, RepositoriesInfoResponseDTO.class);
|
|
|
+ ReplyCache.REPOSITORIES_INFO.put("repositories_info", repositoriesInfoResp);
|
|
|
+ log.info("收到人脸库列表响应");
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/api_repository_create":
|
|
|
+ RepositoryCreateResponseDTO createResp = objectMapper.readValue(payload, RepositoryCreateResponseDTO.class);
|
|
|
+ String createKey = "repository_create-" + createResp.getAlbumName();
|
|
|
+ ReplyCache.REPOSITORY_CREATE.put(createKey, createResp);
|
|
|
+ log.info("收到人脸库创建响应:{}", createKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/api_repository_update":
|
|
|
+ RepositoryUpdateResponseDTO updateResp = objectMapper.readValue(payload, RepositoryUpdateResponseDTO.class);
|
|
|
+ String updateKey = "repository_update-" + updateResp.getAlbumName();
|
|
|
+ ReplyCache.REPOSITORY_UPDATE.put(updateKey, updateResp);
|
|
|
+ log.info("收到人脸库更新响应:{}", updateKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/api_repository_delete":
|
|
|
+ RepositoryDeleteResponseDTO repoDeleteResp = objectMapper.readValue(payload, RepositoryDeleteResponseDTO.class);
|
|
|
+ String repoDeleteKey = "repository_delete-" + repoDeleteResp.getAlbumName();
|
|
|
+ ReplyCache.REPOSITORY_DELETE.put(repoDeleteKey, repoDeleteResp);
|
|
|
+ log.info("收到人脸库删除响应:{}", repoDeleteKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_task_snap":
|
|
|
+ AlgTaskSnapResponseDTO snapResp = objectMapper.readValue(payload, AlgTaskSnapResponseDTO.class);
|
|
|
+ String snapKey = snapResp.getBoardId() + "-" + snapResp.getAlgTaskSession();
|
|
|
+ ReplyCache.TASK_PREVIEW.put(snapKey, snapResp);
|
|
|
+ log.info("收到任务预览图响应:{}", snapKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_schedule_create":
|
|
|
+ AlgScheduleCreateResponseDTO scheduleResp = objectMapper.readValue(payload, AlgScheduleCreateResponseDTO.class);
|
|
|
+ String scheduleKey = scheduleResp.getBoardId() + "-schedule-create";
|
|
|
+ ReplyCache.SCHEDULE_CREATE.put(scheduleKey, scheduleResp);
|
|
|
+ log.info("收到计划模板创建响应:{}", scheduleKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_schedule_fetch":
|
|
|
+ AlgScheduleFetchResponseDTO scfetchResp = objectMapper.readValue(payload, AlgScheduleFetchResponseDTO.class);
|
|
|
+ String scfetchKey = scfetchResp.getBoardId() + "-schedule-fetch";
|
|
|
+ ReplyCache.SCHEDULE_FETCH.put(scfetchKey, scfetchResp);
|
|
|
+ log.info("收到计划模板查询响应:{}", scfetchKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_schedule_delete":
|
|
|
+ AlgScheduleDeleteResponseDTO scheduleDeleteResp = objectMapper.readValue(payload, AlgScheduleDeleteResponseDTO.class);
|
|
|
+ String scheduleDeleteKey = scheduleDeleteResp.getBoardId() + "-schedule-delete-" + scheduleDeleteResp.getScheduleId();
|
|
|
+ ReplyCache.SCHEDULE_DELETE.put(scheduleDeleteKey, scheduleDeleteResp);
|
|
|
+ log.info("收到计划模板删除响应:{}", scheduleDeleteKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_suit_create":
|
|
|
+ AlgSuitCreateResponseDTO suitCreateResp = objectMapper.readValue(payload, AlgSuitCreateResponseDTO.class);
|
|
|
+ String suitCreateKey = suitCreateResp.getBoardId() + "-suit-create";
|
|
|
+ ReplyCache.SUIT_CREATE.put(suitCreateKey, suitCreateResp);
|
|
|
+ log.info("收到创建工装组响应:{}", suitCreateKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_suit_group_remove":
|
|
|
+ AlgSuitGroupRemoveResponseDTO suitRemoveResp = objectMapper.readValue(payload, AlgSuitGroupRemoveResponseDTO.class);
|
|
|
+ String suitRemoveKey = suitRemoveResp.getBoardId() + "-suit-remove";
|
|
|
+ ReplyCache.SUIT_REMOVE.put(suitRemoveKey, suitRemoveResp);
|
|
|
+ log.info("收到删除工装组响应:{}", suitRemoveKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_suit_append":
|
|
|
+ AlgSuitAppendResponseDTO suitAppendResp = objectMapper.readValue(payload, AlgSuitAppendResponseDTO.class);
|
|
|
+ String suitAppendKey = suitAppendResp.getBoardId() + "-suit-append";
|
|
|
+ ReplyCache.SUIT_APPEND.put(suitAppendKey, suitAppendResp);
|
|
|
+ log.info("收到上传工装模板响应:{}", suitAppendKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_suit_remove":
|
|
|
+ AlgSuitRemoveResponseDTO suitRemoveTemplateResp = objectMapper.readValue(payload, AlgSuitRemoveResponseDTO.class);
|
|
|
+ String suitRemoveTemplateKey = suitRemoveTemplateResp.getBoardId() + "-suit-remove-template";
|
|
|
+ ReplyCache.SUIT_REMOVE_TEMPLATE.put(suitRemoveTemplateKey, suitRemoveTemplateResp);
|
|
|
+ log.info("收到删除工装模板响应:{}", suitRemoveTemplateKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/alg_suit_fetch":
|
|
|
+ AlgSuitFetchResponseDTO suitFetchResp = objectMapper.readValue(payload, AlgSuitFetchResponseDTO.class);
|
|
|
+ String suitFetchKey = suitFetchResp.getBoardId() + "-suit-fetch";
|
|
|
+ ReplyCache.SUIT_FETCH.put(suitFetchKey, suitFetchResp);
|
|
|
+ log.info("收到工装组信息响应:{}", suitFetchKey);
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ log.warn("未知事件类型:{}", event);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析 /edge_app_controller_reply 失败,原始消息: {}", payload, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedDelay = 300_000) // 每 5 分钟
|
|
|
+ public void cleanExpiredCache() {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ long timeout = 300_000;
|
|
|
+
|
|
|
+ ReplyCache.REPLY.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.FETCH.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.DELETE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.TASK.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.ALG_TASK_LIST.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.ALG_ABILITY.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.REPOSITORIES_INFO.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.REPOSITORY_CREATE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.REPOSITORY_UPDATE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.REPOSITORY_DELETE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.TASK_PREVIEW.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SCHEDULE_CREATE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SCHEDULE_FETCH.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SCHEDULE_DELETE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SUIT_CREATE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SUIT_REMOVE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SUIT_APPEND.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SUIT_REMOVE_TEMPLATE.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+ ReplyCache.SUIT_FETCH.entrySet().removeIf(e -> now - e.getValue().getTime() > timeout);
|
|
|
+
|
|
|
+ log.info("清理过期缓存:REPLY={} FETCH={} DELETE={} TASK={} TASK_FETCH={}",
|
|
|
+ ReplyCache.REPLY.size(), ReplyCache.FETCH.size(), ReplyCache.DELETE.size(),
|
|
|
+ ReplyCache.TASK.size(), ReplyCache.ALG_TASK_LIST.size());
|
|
|
+ }
|
|
|
+}
|