#840 万象城增加告警查询,外滩27号增加数据转发

Chiuso
hanzhengyi vorrebbe unire 2 commit da uskycloud/han a uskycloud/server-165

+ 5 - 5
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java

@@ -47,11 +47,11 @@ public class DeviceFieldConfig {
         // 一氧化碳传感器(711)
         fieldMapping.put("711", "co");
 
-        // 倾斜传感器
-        fieldMapping.put("712", "qx");
-
-        // 裂缝传感器
-        fieldMapping.put("713", "cd");
+        // // 倾斜传感器
+        // fieldMapping.put("712", "qx");
+        //
+        // // 裂缝传感器
+        // fieldMapping.put("713", "cd");
 
         // 位移传感器
         fieldMapping.put("714", "wy");

+ 5 - 3
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -73,10 +73,12 @@ public class MqttOutConfig {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
 
         // 设置默认的MqttConnectOptions,确保serverURIs不为null
-        // 实际使用时,会在createMqttConnection方法中重新配置
+        // 使用时,会在createMqttConnection方法中重新配置
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
-        options.setKeepAliveInterval(60); // 设置默认的心跳间隔
+        // 设置默认的服务器地址
+        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"});
+        // 设置默认的心跳间隔
+        options.setKeepAliveInterval(60);
         factory.setConnectionOptions(options);
 
         return factory;

+ 2 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseDataTransferService.java

@@ -224,8 +224,8 @@ public class BaseDataTransferService {
             userIdToName.put(709, 15);
             userIdToName.put(710, 16);
             userIdToName.put(711, 2);
-            userIdToName.put(712, 34);
-            userIdToName.put(713, 36);
+            //userIdToName.put(712, 34);
+            //userIdToName.put(713, 36);
             userIdToName.put(714, 37);
 
             HashMap<String, Object> map = new HashMap<>();

+ 213 - 204
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -7,9 +7,9 @@ import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
-import com.usky.cdi.service.config.mqtt.MqttBaseConfig;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
 import com.usky.cdi.service.util.DeviceDataQuery;
 import com.usky.cdi.service.util.SnowflakeIdGenerator;
 import com.usky.cdi.service.vo.IotDataTransferVO;
@@ -18,24 +18,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.stereotype.Service;
-import org.springframework.web.context.ContextLoader;
 
 import javax.annotation.PostConstruct;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -49,6 +49,9 @@ public class IotDataTransferService {
 
     private MqttOutConfig.MqttGateway mqttGateway;
 
+    @Autowired
+    private MqttConnectionTool mqttConnectionTool;
+
     // 注入ApplicationContext,确保总是能获取到
     @Autowired
     private ApplicationContext context;
@@ -59,6 +62,10 @@ public class IotDataTransferService {
     private static final int KEEP_ALIVE_INTERVAL = 60;
     private static final int COMPLETION_TIMEOUT = 5000;
 
+    // 存储每个任务的MQTT客户端工厂和网关
+    private final Map<String, MqttConnectionTool.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
+    private final Map<String, DefaultMqttPahoClientFactory> mqttClientFactoryMap = new ConcurrentHashMap<>();
+
     private SnowflakeIdGenerator idGenerator;
 
     @Autowired
@@ -110,7 +117,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -154,7 +161,7 @@ public class IotDataTransferService {
                 vo.setDataEndTime(dataEndTime);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
@@ -184,7 +191,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -229,19 +236,19 @@ public class IotDataTransferService {
                 try {
                     switch (deviceType) {
                         case 707:
-                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 708:
-                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 709:
-                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 710:
-                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 711:
-                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                     }
                 } catch (Exception e) {
@@ -278,7 +285,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -316,7 +323,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(0);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
@@ -345,7 +352,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -395,7 +402,7 @@ public class IotDataTransferService {
                         deviceDataItem.getFloat("active_power"));
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
@@ -421,8 +428,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
             log.warn("设备{}的温度数据为空", deviceId);
@@ -435,7 +443,7 @@ public class IotDataTransferService {
         tempVO.setPublishTime(getCurrentTime());
         tempVO.setSensorValue(value);
         tempVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息", username);
     }
 
     /**
@@ -445,8 +453,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
             log.warn("设备{}的湿度数据为空", deviceId);
@@ -459,7 +468,7 @@ public class IotDataTransferService {
         humidityVO.setPublishTime(getCurrentTime());
         humidityVO.setSensorValue(value);
         humidityVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息", username);
     }
 
     /**
@@ -469,8 +478,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
             log.warn("设备{}的氧气浓度数据为空", deviceId);
@@ -483,7 +493,7 @@ public class IotDataTransferService {
         oxygenVO.setPublishTime(getCurrentTime());
         oxygenVO.setSensorValue(value);
         oxygenVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息", username);
     }
 
     /**
@@ -493,8 +503,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
             log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
@@ -507,7 +518,7 @@ public class IotDataTransferService {
         coVO.setPublishTime(getCurrentTime());
         coVO.setSensorValue(value);
         coVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息", username);
     }
 
     /**
@@ -517,8 +528,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
             log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
@@ -531,7 +543,7 @@ public class IotDataTransferService {
         co2VO.setPublishTime(getCurrentTime());
         co2VO.setSensorValue(value);
         co2VO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息", username);
     }
 
     /**
@@ -539,144 +551,144 @@ public class IotDataTransferService {
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("qx");
-                if (value == null) {
-                    log.warn("设备{}的倾斜数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                TiltVO vo = new TiltVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("倾斜数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("qx");
+    //             if (value == null) {
+    //                 log.warn("设备{}的倾斜数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             TiltVO vo = new TiltVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("倾斜数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送裂缝数据(713)
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("cd");
-                if (value == null) {
-                    log.warn("设备{}的裂缝数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                CrackVO vo = new CrackVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("裂缝数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("cd");
+    //             if (value == null) {
+    //                 log.warn("设备{}的裂缝数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             CrackVO vo = new CrackVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("裂缝数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送位移数据(714)
@@ -688,7 +700,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -731,7 +743,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(value == 0 ? 0 : 1);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
@@ -758,6 +770,7 @@ public class IotDataTransferService {
      * @param password MQTT密码
      */
     public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        log.info("用户名:{},密码:{}", username, password);
         // 参数校验
         if (engineeringId == null || username == null || password == null) {
             log.error("工程ID、MQTT用户名或密码不能为空");
@@ -789,6 +802,7 @@ public class IotDataTransferService {
             transferVO.setDeviceType(deviceType);
             transferVO.setDevices(devices);
             transferVO.setEngineeringId(engineeringId);
+            transferVO.setUsername(username); // 保存当前任务的用户名
             transferList.add(transferVO);
         });
 
@@ -839,12 +853,12 @@ public class IotDataTransferService {
                 case 704:
                     result = sendElectricityLoad(transferVO);
                     break;
-                case 712:
-                    result = sendTiltData(transferVO);
-                    break;
-                case 713:
-                    result = sendCrackData(transferVO);
-                    break;
+                // case 712:
+                //     result = sendTiltData(transferVO);
+                //     break;
+                // case 713:
+                //     result = sendCrackData(transferVO);
+                //     break;
                 case 714:
                     result = sendDeviationData(transferVO);
                     break;
@@ -899,43 +913,25 @@ public class IotDataTransferService {
     }
 
     /**
-     * 手动创建MQTT连接
+     * 手动创建/刷新 MQTT 连接(含动态 clientId)
      * @param username MQTT用户名
      * @param password MQTT密码
      */
-    public void createMqttConnection(String username, String password) {
+    public synchronized void createMqttConnection(String username, String password) {
+        log.info("手动创建/刷新 MQTT 连接(含动态 clientId),用户名:{},密码:{}", username, password);
         try {
-            // 使用注入的ApplicationContext获取已有的mqttGateway实例
-            // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
-            if (this.context == null) {
-                throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
+            // 检查MqttConnectionTool是否已注入
+            if (this.mqttConnectionTool == null) {
+                throw new IllegalStateException("MqttConnectionTool未注入,无法获取MQTT Gateway");
             }
 
-            // 1. 获取mqttGateway实例
-            this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
-            if (this.mqttGateway == null) {
-                throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
-            }
+            // 使用MqttConnectionTool创建或刷新MQTT连接
+            MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(username, password);
 
-            // 2. 获取现有的mqttClientFactory实例
-            DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
-            if (mqttClientFactory == null) {
-                throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
-            }
-
-            // 3. 创建并配置MqttConnectOptions
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setServerURIs(new String[]{MQTT_URL});
-            options.setUserName(username);
-            options.setPassword(password.toCharArray());
-            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-
-            // 4. 更新mqttClientFactory的连接选项
-            mqttClientFactory.setConnectionOptions(options);
-
-            log.info("MQTT Gateway初始化成功,用户名:{}", username);
-            log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
-        } catch (Exception e) {
+            // 存储到映射中
+            mqttGatewayMap.put(username, gateway);
+            log.info("MQTT连接创建/刷新成功,用户名:{}", username);
+            } catch (Exception e) {
             log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
             throw new RuntimeException("初始化MQTT连接失败", e);
         }
@@ -943,11 +939,18 @@ public class IotDataTransferService {
 
     /**
      * 验证MQTT网关是否初始化
+     * @param username 用户名
      * @return 是否初始化
      */
-    private boolean validateMqttGateway() {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
+    private boolean validateMqttGateway(String username) {
+        if (username == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:null");
+            return false;
+        }
+        // 一次性获取网关实例,避免竞态条件
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:{}", username);
             return false;
         }
         return true;
@@ -973,12 +976,18 @@ public class IotDataTransferService {
      * @param topicEnum 主题枚举
      * @param vo 消息对象
      * @param messageType 消息类型描述
+     * @param username 用户名
      */
-    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType) {
+    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType, String username) {
         String json = JSON.toJSONString(vo);
         String topic = topicEnum.getTopic();
         // 不再记录每条数据的详情,只记录发送操作
-        mqttGateway.sendToMqtt(topic, json);
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway != null) {
+            gateway.sendToMqtt(topic, json);
+        } else {
+            log.warn("MQTT Gateway未找到,无法发送消息,用户名:{}", username);
+        }
     }
 
     public void allData(Long engineeringId, String username, String password) {

+ 230 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/mqtt/MqttConnectionTool.java

@@ -0,0 +1,230 @@
+package com.usky.cdi.service.mqtt;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.endpoint.EventDrivenConsumer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/22
+ * 动态 MQTT 连接工具类
+ * 用法:注入后调用 connectOrRefresh(...) 即可
+ *
+ */
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MqttConnectionTool {
+
+    private final GenericApplicationContext context;
+
+    /* 默认 topic,可外部再 set */
+    private String defaultTopic = "testTopic";
+
+    /* 默认 keep-alive,可外部再 set */
+    private int keepAlive = 60;
+
+    private static final String MQTT_URL = "ssl://114.80.201.143:8883";
+
+    /**
+     * 存储每个用户名对应的连接信息
+     */
+    private final Map<String, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
+
+    /**
+     * 连接信息内部类
+     */
+    private static class ConnectionInfo {
+        private final String handlerBeanName;
+        private final String consumerBeanName;
+        private final String factoryBeanName;
+        private final String gatewayBeanName;
+
+        public ConnectionInfo(String username) {
+            this.handlerBeanName = "mqttHandler_" + username;
+            this.consumerBeanName = "mqttConsumer_" + username;
+            this.factoryBeanName = "mqttFactory_" + username;
+            this.gatewayBeanName = "mqttGateway_" + username;
+        }
+    }
+
+    /**
+     * 一键创建/刷新连接
+     *
+     * @param username  用户名
+     * @param password  密码
+     * @return 可直接发消息的 MqttGateway
+     */
+    public synchronized MqttGateway connectOrRefresh(String username, String password) {
+        Assert.notNull(username, "username cannot be null");
+        Assert.notNull(password, "password cannot be null");
+
+        String clientId = "mqttx-" + username;
+        try {
+            /* 1. 获取或创建连接信息 */
+            ConnectionInfo connectionInfo = connectionMap.computeIfAbsent(username, ConnectionInfo::new);
+
+            /* 2. 创建或更新专属工厂 */
+            DefaultMqttPahoClientFactory factory;
+            if (context.containsBean(connectionInfo.factoryBeanName)) {
+                factory = context.getBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class);
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                log.info("已更新 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            } else {
+                factory = new DefaultMqttPahoClientFactory();
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                context.registerBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class, () -> factory);
+                log.info("已创建 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            }
+
+            /* 3. 移除旧的 Handler 和 Consumer */
+            removeOldConnection(connectionInfo);
+
+            /* 4. 创建新的 Handler */
+            MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, factory);
+            handler.setAsync(true);
+            handler.setDefaultTopic(defaultTopic);
+            handler.afterPropertiesSet();
+
+            /* 5. 注册新的 Handler */
+            context.registerBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class, () -> handler);
+
+            /* 6. 创建并注册新的专属网关 */
+            // 创建一个简单的Gateway实现,直接使用Handler发送消息
+            MqttGateway gateway = new MqttGateway() {
+                @Override
+                public void sendToMqtt(String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, defaultTopic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                                        @Header(MqttHeaders.QOS) int qos, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .setHeader(MqttHeaders.QOS, qos)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+            };
+
+            log.info("MQTT 连接刷新完成 -> {} / {}", username, clientId);
+            return gateway;
+        } catch (Exception e) {
+            log.error("MQTT 连接失败 -> {}", clientId, e);
+            throw new RuntimeException("MQTT 连接失败", e);
+        }
+    }
+
+    /* ---------- 私有辅助 ---------- */
+
+    private org.eclipse.paho.client.mqttv3.MqttConnectOptions
+    buildOptions(String u, String p, String url) {
+        org.eclipse.paho.client.mqttv3.MqttConnectOptions opt = 
+                new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
+        opt.setServerURIs(new String[]{url});
+        opt.setUserName(u);
+        if (p != null) opt.setPassword(p.toCharArray());
+        opt.setKeepAliveInterval(keepAlive);
+        return opt;
+    }
+
+    /**
+     * 移除旧的连接实例
+     * @param connectionInfo 连接信息
+     */
+    private void removeOldConnection(ConnectionInfo connectionInfo) {
+        // 移除旧的 Handler
+        if (context.containsBeanDefinition(connectionInfo.handlerBeanName)) {
+            try {
+                MqttPahoMessageHandler oldHandler = context.getBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class);
+                oldHandler.stop();
+            } catch (Exception e) {
+                log.warn("停止旧的MQTT处理器时出错: {}", e.getMessage(), e);
+            }
+            context.removeBeanDefinition(connectionInfo.handlerBeanName);
+            log.info("已移除旧的 MQTT 处理器 -> {}", connectionInfo.handlerBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Handler
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.handlerBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.handlerBeanName);
+        }
+
+        // 移除旧的 Factory
+        if (context.containsBeanDefinition(connectionInfo.factoryBeanName)) {
+            context.removeBeanDefinition(connectionInfo.factoryBeanName);
+            log.info("已移除旧的 MQTT 工厂 -> {}", connectionInfo.factoryBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Factory
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.factoryBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.factoryBeanName);
+        }
+    }
+
+    /* ---------- 对外可调用的 setter ---------- */
+
+    public MqttConnectionTool defaultTopic(String topic) {
+        this.defaultTopic = topic;
+        return this;
+    }
+
+    public MqttConnectionTool keepAlive(int keepAlive) {
+        this.keepAlive = keepAlive;
+        return this;
+    }
+
+    /* ---------- 复用原来的 Gateway 接口 ---------- */
+    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+    public interface MqttGateway {
+        void sendToMqtt(String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                        @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 6 - 6
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,13 +26,13 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
+    // @Scheduled(fixedDelay = 26 * 60 * 1000, initialDelay = 0)
     // public void scheduledDeviceDataSync() {
-    //     Integer tenantId = 1208;
-    //     Long engineeringId = 3101130019L;
-    //     String username = "3101130019";
-    //     String password = "ptrEQZK2";
-    //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
+    //     log.info("开始执行桃浦象屿人防设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
     //
     //     try {
     //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);

+ 117 - 86
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/WeatherFetcher.java

@@ -2,7 +2,7 @@ package com.usky.cdi.service.util;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
@@ -33,107 +33,138 @@ public class WeatherFetcher {
     private static final double DEFAULT_TEMPERATURE = 15.0;
     private static final int DEFAULT_HUMIDITY = 40;
 
+    // 异步执行的线程池
+    private static final ExecutorService WEATHER_FETCHER_POOL = Executors.newFixedThreadPool(3);
+    private static final int ASYNC_TIMEOUT = 5000; // 异步调用超时时间:5秒
+
     /**
      * 获取天气数据
      * 1. 优先使用缓存数据(如果缓存有效)
-     * 2. 缓存无效时,调用API获取新数据
-     * 3. API调用失败时,使用默认值
+     * 2. 缓存无效时,异步调用API获取新数据
+     * 3. API调用失败或超时,使用默认值
      *
      * @return 包含温度和湿度的Map
      */
     public static Map<String, Double> fetchWeather() {
         // 1. 检查缓存是否有效
         long currentTime = System.currentTimeMillis();
-        if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
-            log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
-                    weatherCache.get("temperature"), weatherCache.get("humidity"));
-            return new HashMap<>(weatherCache);
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
+                log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
+                        weatherCache.get("temperature"), weatherCache.get("humidity"));
+                return new HashMap<>(weatherCache);
+            }
+        }
+
+        // 2. 如果缓存无效,返回当前缓存值(如果有)或默认值
+        Map<String, Double> fallbackData = new HashMap<>();
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty()) {
+                fallbackData.putAll(weatherCache);
+                log.debug("缓存已过期,使用过期缓存作为临时数据");
+            } else {
+                fallbackData.put("temperature", DEFAULT_TEMPERATURE);
+                fallbackData.put("humidity", (double) DEFAULT_HUMIDITY);
+                log.debug("缓存为空,使用默认天气数据作为临时数据");
+            }
         }
 
-        double tempCelsius = DEFAULT_TEMPERATURE;
-        int humidity = DEFAULT_HUMIDITY;
+        // 3. 异步调用API更新缓存,不阻塞主线程
+        CompletableFuture.supplyAsync(() -> {
+            long asyncStartTime = System.currentTimeMillis();
+            double tempCelsius = DEFAULT_TEMPERATURE;
+            int humidity = DEFAULT_HUMIDITY;
+
+            try {
+                log.info("开始异步调用OpenWeatherMap API获取天气数据");
+                // 1. 构造请求URL
+                URL url = new URL(API_URL);
+
+                // 2. 建立连接并发送请求
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                conn.setRequestMethod("GET");
+                conn.setConnectTimeout(10000); // 减少超时时间到10秒
+                conn.setReadTimeout(10000);
+
+                // 3. 读取响应
+                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+                StringBuilder response = new StringBuilder();
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    response.append(line);
+                }
+                reader.close();
+
+                // 4. 解析JSON数据(使用org.json库)
+                JSONObject jsonResponse = new JSONObject(response.toString());
+                JSONObject main = jsonResponse.getJSONObject("main");
+
+                // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
+                double tempKelvin = main.getDouble("temp");
+                tempCelsius = tempKelvin - 273.15;
+                humidity = main.getInt("humidity");
+                double feelsLikeKelvin = main.getDouble("feels_like");
+                double feelsLikeCelsius = feelsLikeKelvin - 273.15;
+
+                // 提取天气状况描述
+                JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
+                String description = weather.getString("description");
+
+                // 记录日志
+                log.info("=== 天气解析结果 ===");
+                log.info("城市: {}", jsonResponse.getString("name"));
+                log.info("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
+                log.info("体感温度: {:.2f}°C", feelsLikeCelsius);
+                log.info("湿度: {}%", humidity);
+                log.info("天气状况: {}", description);
+                log.info("时区偏移: {}小时", (jsonResponse.getInt("timezone") / 3600));
+
+                // 检查是否包含臭氧数据
+                if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
+                    log.info("包含空气质量数据");
+                } else {
+                    log.info("当前数据不包含臭氧浓度等空气质量指标");
+                }
+
+                // 更新缓存
+                synchronized (WeatherFetcher.class) {
+                    weatherCache.clear();
+                    weatherCache.put("temperature", tempCelsius);
+                    weatherCache.put("humidity", (double) humidity);
+                    lastUpdateTime = System.currentTimeMillis();
+                    log.info("天气数据缓存更新成功");
+                }
+
+            } catch (Exception e) {
+                log.error("异步获取天气数据失败:{}", e.getMessage());
+                // 异常时使用默认值,但不更新缓存
+            } finally {
+                // 打印API调用结束时间和时长
+                long asyncEndTime = System.currentTimeMillis();
+                long duration = asyncEndTime - asyncStartTime;
+                log.info("OpenWeatherMap API异步调用结束,时长: {}ms", duration);
+            }
 
-        log.debug("开始调用OpenWeatherMap API获取天气数据");
-        long startTime = System.currentTimeMillis();
+            return null;
+        }, WEATHER_FETCHER_POOL);
 
+        // 立即返回临时数据,不等待异步调用完成
+        return fallbackData;
+    }
+
+    /**
+     * 关闭线程池(仅用于测试或应用关闭时)
+     */
+    public static void shutdown() {
+        WEATHER_FETCHER_POOL.shutdown();
         try {
-            // 1. 构造请求URL
-            URL url = new URL(API_URL);
-
-            // 2. 建立连接并发送请求
-            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-            conn.setRequestMethod("GET");
-            conn.setConnectTimeout(15000);
-            conn.setReadTimeout(15000);
-
-            // 3. 读取响应
-            BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
-            StringBuilder response = new StringBuilder();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                response.append(line);
+            if (!WEATHER_FETCHER_POOL.awaitTermination(5, TimeUnit.SECONDS)) {
+                WEATHER_FETCHER_POOL.shutdownNow();
             }
-            reader.close();
-
-            // 4. 解析JSON数据(使用org.json库)
-            JSONObject jsonResponse = new JSONObject(response.toString());
-            JSONObject main = jsonResponse.getJSONObject("main");
-
-            // 提取基础信息
-            String cityName = jsonResponse.getString("name");
-            int timezoneOffset = jsonResponse.getInt("timezone");
-
-            // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
-            double tempKelvin = main.getDouble("temp");
-            tempCelsius = tempKelvin - 273.15;
-            humidity = main.getInt("humidity");
-            double feelsLikeKelvin = main.getDouble("feels_like");
-            double feelsLikeCelsius = feelsLikeKelvin - 273.15;
-
-            // 提取天气状况描述
-            JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
-            String description = weather.getString("description");
-
-            // 记录日志,不输出到控制台
-            log.debug("=== 天气解析结果 ===");
-            log.debug("城市: {}", cityName);
-            log.debug("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
-            System.out.println("体感温度: {:.2f}°C (原始: {}K)" + feelsLikeCelsius + "==========" + feelsLikeKelvin);
-            log.debug("体感温度: {:.2f}°C", feelsLikeCelsius);
-            log.debug("湿度: {}%", humidity);
-            log.debug("天气状况: {}", description);
-            log.debug("时区偏移: {}小时", (timezoneOffset / 3600));
-
-            // 检查是否包含臭氧数据
-            if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
-                log.debug("包含空气质量数据");
-            } else {
-                log.debug("当前数据不包含臭氧浓度等空气质量指标");
-            }
-
-            // 更新缓存
-            weatherCache.clear();
-            weatherCache.put("temperature", tempCelsius);
-            weatherCache.put("humidity", (double) humidity);
-            lastUpdateTime = currentTime;
-            log.debug("天气数据缓存更新成功");
-
-        } catch (Exception e) {
-            log.error("获取天气数据失败:{}", e.getMessage());
-            // 异常时使用默认值
-            log.warn("使用默认天气数据,温度:{}°C,湿度:{}%", DEFAULT_TEMPERATURE, DEFAULT_HUMIDITY);
-        } finally {
-            // 打印API调用结束时间和时长
-            long endTime = System.currentTimeMillis();
-            long duration = endTime - startTime;
-            System.out.println("第三方天气API调用时长" + duration + "毫秒");
-            log.info("OpenWeatherMap API调用结束,开始时间: {}, 结束时间: {}, 时长: {}ms", startTime, endTime, duration);
+        } catch (InterruptedException e) {
+            WEATHER_FETCHER_POOL.shutdownNow();
+            Thread.currentThread().interrupt();
         }
-
-        Map<String, Double> resultMap = new HashMap<>();
-        resultMap.put("temperature", tempCelsius);
-        resultMap.put("humidity", (double) humidity);
-        return resultMap;
     }
 
     public static void main(String[] args) {

+ 5 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/IotDataTransferVO.java

@@ -25,4 +25,9 @@ public class IotDataTransferVO {
      * 产品ID
      */
     private Integer deviceType;
+    
+    /**
+     * MQTT用户名
+     */
+    private String username;
 }