Kaynağa Gözat

修复天气获取慢和代码优化

fuyuchuan 1 gün önce
ebeveyn
işleme
304fa803d0

+ 5 - 5
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java

@@ -47,11 +47,11 @@ public class DeviceFieldConfig {
         // 一氧化碳传感器(711)
         fieldMapping.put("711", "co");
 
-        // 倾斜传感器
-        fieldMapping.put("712", "qx");
-
-        // 裂缝传感器
-        fieldMapping.put("713", "cd");
+        // // 倾斜传感器
+        // fieldMapping.put("712", "qx");
+        //
+        // // 裂缝传感器
+        // fieldMapping.put("713", "cd");
 
         // 位移传感器
         fieldMapping.put("714", "wy");

+ 5 - 3
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -73,10 +73,12 @@ public class MqttOutConfig {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
 
         // 设置默认的MqttConnectOptions,确保serverURIs不为null
-        // 实际使用时,会在createMqttConnection方法中重新配置
+        // 使用时,会在createMqttConnection方法中重新配置
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
-        options.setKeepAliveInterval(60); // 设置默认的心跳间隔
+        // 设置默认的服务器地址
+        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"});
+        // 设置默认的心跳间隔
+        options.setKeepAliveInterval(60);
         factory.setConnectionOptions(options);
 
         return factory;

+ 2 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseDataTransferService.java

@@ -224,8 +224,8 @@ public class BaseDataTransferService {
             userIdToName.put(709, 15);
             userIdToName.put(710, 16);
             userIdToName.put(711, 2);
-            userIdToName.put(712, 34);
-            userIdToName.put(713, 36);
+            //userIdToName.put(712, 34);
+            //userIdToName.put(713, 36);
             userIdToName.put(714, 37);
 
             HashMap<String, Object> map = new HashMap<>();

+ 252 - 198
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -7,7 +7,6 @@ import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
-import com.usky.cdi.service.config.mqtt.MqttBaseConfig;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
 import com.usky.cdi.service.util.DeviceDataQuery;
@@ -18,24 +17,26 @@ import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
 import org.springframework.context.ApplicationContext;
-import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.integration.endpoint.EventDrivenConsumer;
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
 import org.springframework.stereotype.Service;
-import org.springframework.web.context.ContextLoader;
 
 import javax.annotation.PostConstruct;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -59,6 +60,10 @@ public class IotDataTransferService {
     private static final int KEEP_ALIVE_INTERVAL = 60;
     private static final int COMPLETION_TIMEOUT = 5000;
 
+    // 存储每个任务的MQTT客户端工厂和网关
+    private final Map<String, MqttOutConfig.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
+    private final Map<String, DefaultMqttPahoClientFactory> mqttClientFactoryMap = new ConcurrentHashMap<>();
+
     private SnowflakeIdGenerator idGenerator;
 
     @Autowired
@@ -110,7 +115,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -154,7 +159,7 @@ public class IotDataTransferService {
                 vo.setDataEndTime(dataEndTime);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
@@ -184,7 +189,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -229,19 +234,19 @@ public class IotDataTransferService {
                 try {
                     switch (deviceType) {
                         case 707:
-                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 708:
-                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 709:
-                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 710:
-                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 711:
-                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                     }
                 } catch (Exception e) {
@@ -278,7 +283,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -316,7 +321,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(0);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
@@ -345,7 +350,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -395,7 +400,7 @@ public class IotDataTransferService {
                         deviceDataItem.getFloat("active_power"));
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
@@ -421,8 +426,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
             log.warn("设备{}的温度数据为空", deviceId);
@@ -435,7 +441,7 @@ public class IotDataTransferService {
         tempVO.setPublishTime(getCurrentTime());
         tempVO.setSensorValue(value);
         tempVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息", username);
     }
 
     /**
@@ -445,8 +451,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
             log.warn("设备{}的湿度数据为空", deviceId);
@@ -459,7 +466,7 @@ public class IotDataTransferService {
         humidityVO.setPublishTime(getCurrentTime());
         humidityVO.setSensorValue(value);
         humidityVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息", username);
     }
 
     /**
@@ -469,8 +476,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
             log.warn("设备{}的氧气浓度数据为空", deviceId);
@@ -483,7 +491,7 @@ public class IotDataTransferService {
         oxygenVO.setPublishTime(getCurrentTime());
         oxygenVO.setSensorValue(value);
         oxygenVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息", username);
     }
 
     /**
@@ -493,8 +501,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
             log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
@@ -507,7 +516,7 @@ public class IotDataTransferService {
         coVO.setPublishTime(getCurrentTime());
         coVO.setSensorValue(value);
         coVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息", username);
     }
 
     /**
@@ -517,8 +526,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
             log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
@@ -531,7 +541,7 @@ public class IotDataTransferService {
         co2VO.setPublishTime(getCurrentTime());
         co2VO.setSensorValue(value);
         co2VO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息", username);
     }
 
     /**
@@ -539,144 +549,144 @@ public class IotDataTransferService {
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("qx");
-                if (value == null) {
-                    log.warn("设备{}的倾斜数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                TiltVO vo = new TiltVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("倾斜数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("qx");
+    //             if (value == null) {
+    //                 log.warn("设备{}的倾斜数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             TiltVO vo = new TiltVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("倾斜数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送裂缝数据(713)
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("cd");
-                if (value == null) {
-                    log.warn("设备{}的裂缝数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                CrackVO vo = new CrackVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("裂缝数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("cd");
+    //             if (value == null) {
+    //                 log.warn("设备{}的裂缝数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             CrackVO vo = new CrackVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("裂缝数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送位移数据(714)
@@ -688,7 +698,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -731,7 +741,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(value == 0 ? 0 : 1);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
@@ -789,6 +799,7 @@ public class IotDataTransferService {
             transferVO.setDeviceType(deviceType);
             transferVO.setDevices(devices);
             transferVO.setEngineeringId(engineeringId);
+            transferVO.setUsername(username); // 保存当前任务的用户名
             transferList.add(transferVO);
         });
 
@@ -839,12 +850,12 @@ public class IotDataTransferService {
                 case 704:
                     result = sendElectricityLoad(transferVO);
                     break;
-                case 712:
-                    result = sendTiltData(transferVO);
-                    break;
-                case 713:
-                    result = sendCrackData(transferVO);
-                    break;
+                // case 712:
+                //     result = sendTiltData(transferVO);
+                //     break;
+                // case 713:
+                //     result = sendCrackData(transferVO);
+                //     break;
                 case 714:
                     result = sendDeviationData(transferVO);
                     break;
@@ -899,42 +910,78 @@ public class IotDataTransferService {
     }
 
     /**
-     * 手动创建MQTT连接
+     * 手动创建/刷新 MQTT 连接(含动态 clientId)
      * @param username MQTT用户名
      * @param password MQTT密码
      */
     public void createMqttConnection(String username, String password) {
         try {
-            // 使用注入的ApplicationContext获取已有的mqttGateway实例
-            // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
+            // 使用注入的ApplicationContext获取或创建MQTT连接
             if (this.context == null) {
                 throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
             }
 
-            // 1. 获取mqttGateway实例
-            this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
-            if (this.mqttGateway == null) {
-                throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
-            }
+            // 检查是否已经为该用户创建了MQTT连接
+            if (!mqttGatewayMap.containsKey(username)) {
+                synchronized (this) {
+                    // 双重检查锁定
+                    if (!mqttGatewayMap.containsKey(username)) {
+                        // 1. 创建新的MQTT客户端工厂
+                        DefaultMqttPahoClientFactory mqttClientFactory = new DefaultMqttPahoClientFactory();
 
-            // 2. 获取现有的mqttClientFactory实例
-            DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
-            if (mqttClientFactory == null) {
-                throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
-            }
+                        // 2. 创建并配置MqttConnectOptions
+                        MqttConnectOptions options = new MqttConnectOptions();
+                        options.setServerURIs(new String[]{MQTT_URL});
+                        options.setUserName(username);
+                        options.setPassword(password.toCharArray());
+                        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+
+                        // 3. 设置连接选项
+                        mqttClientFactory.setConnectionOptions(options);
+
+                        // 4. 创建唯一的客户端ID
+                        String clientId = "mqttx-" + username;
+
+                        // 5. 创建MQTT消息处理器
+                        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory);
+                        messageHandler.setAsync(true);
+                        messageHandler.setDefaultTopic("testTopic");
+
+                        // 6. 获取消息通道
+                        MessageChannel mqttOutboundChannel = context.getBean("mqttOutboundChannel", MessageChannel.class);
 
-            // 3. 创建并配置MqttConnectOptions
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setServerURIs(new String[]{MQTT_URL});
-            options.setUserName(username);
-            options.setPassword(password.toCharArray());
-            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+                        // 7. 注册消息处理器到通道
+                        IntegrationFlow flow = IntegrationFlows.from(mqttOutboundChannel)
+                                .handle(messageHandler)
+                                .get();
 
-            // 4. 更新mqttClientFactory的连接选项
-            mqttClientFactory.setConnectionOptions(options);
+                        // 8. 注册IntegrationFlow
+                        ((GenericApplicationContext) context).registerBean("mqttFlow-" + username, IntegrationFlow.class, () -> flow);
+                        ((GenericApplicationContext) context).refresh();
 
-            log.info("MQTT Gateway初始化成功,用户名:{}", username);
-            log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
+                        // 9. 获取MQTT Gateway实例
+                        MqttOutConfig.MqttGateway mqttGateway = context.getBean(MqttOutConfig.MqttGateway.class);
+
+                        // 10. 存储到映射中
+                        mqttGatewayMap.put(username, mqttGateway);
+                        mqttClientFactoryMap.put(username, mqttClientFactory);
+
+                        log.info("MQTT连接创建成功,用户名:{},客户端ID:{}", username, clientId);
+                    }
+                }
+            } else {
+                // 连接已存在,更新密码
+                DefaultMqttPahoClientFactory clientFactory = mqttClientFactoryMap.get(username);
+                if (clientFactory != null) {
+                    MqttConnectOptions options = new MqttConnectOptions();
+                    options.setServerURIs(new String[]{MQTT_URL});
+                    options.setUserName(username);
+                    options.setPassword(password.toCharArray());
+                    options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+                    clientFactory.setConnectionOptions(options);
+                    log.info("MQTT连接密码已更新,用户名:{}", username);
+                }
+            }
         } catch (Exception e) {
             log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
             throw new RuntimeException("初始化MQTT连接失败", e);
@@ -943,11 +990,12 @@ public class IotDataTransferService {
 
     /**
      * 验证MQTT网关是否初始化
+     * @param username 用户名
      * @return 是否初始化
      */
-    private boolean validateMqttGateway() {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
+    private boolean validateMqttGateway(String username) {
+        if (username == null || !mqttGatewayMap.containsKey(username) || mqttGatewayMap.get(username) == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:{}", username);
             return false;
         }
         return true;
@@ -973,12 +1021,18 @@ public class IotDataTransferService {
      * @param topicEnum 主题枚举
      * @param vo 消息对象
      * @param messageType 消息类型描述
+     * @param username 用户名
      */
-    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType) {
+    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType, String username) {
         String json = JSON.toJSONString(vo);
         String topic = topicEnum.getTopic();
         // 不再记录每条数据的详情,只记录发送操作
-        mqttGateway.sendToMqtt(topic, json);
+        MqttOutConfig.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway != null) {
+            gateway.sendToMqtt(topic, json);
+        } else {
+            log.warn("MQTT Gateway未找到,无法发送消息,用户名:{}", username);
+        }
     }
 
     public void allData(Long engineeringId, String username, String password) {

+ 117 - 86
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/WeatherFetcher.java

@@ -2,7 +2,7 @@ package com.usky.cdi.service.util;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
@@ -33,107 +33,138 @@ public class WeatherFetcher {
     private static final double DEFAULT_TEMPERATURE = 15.0;
     private static final int DEFAULT_HUMIDITY = 40;
 
+    // 异步执行的线程池
+    private static final ExecutorService WEATHER_FETCHER_POOL = Executors.newFixedThreadPool(3);
+    private static final int ASYNC_TIMEOUT = 5000; // 异步调用超时时间:5秒
+
     /**
      * 获取天气数据
      * 1. 优先使用缓存数据(如果缓存有效)
-     * 2. 缓存无效时,调用API获取新数据
-     * 3. API调用失败时,使用默认值
+     * 2. 缓存无效时,异步调用API获取新数据
+     * 3. API调用失败或超时,使用默认值
      *
      * @return 包含温度和湿度的Map
      */
     public static Map<String, Double> fetchWeather() {
         // 1. 检查缓存是否有效
         long currentTime = System.currentTimeMillis();
-        if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
-            log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
-                    weatherCache.get("temperature"), weatherCache.get("humidity"));
-            return new HashMap<>(weatherCache);
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
+                log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
+                        weatherCache.get("temperature"), weatherCache.get("humidity"));
+                return new HashMap<>(weatherCache);
+            }
+        }
+
+        // 2. 如果缓存无效,返回当前缓存值(如果有)或默认值
+        Map<String, Double> fallbackData = new HashMap<>();
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty()) {
+                fallbackData.putAll(weatherCache);
+                log.debug("缓存已过期,使用过期缓存作为临时数据");
+            } else {
+                fallbackData.put("temperature", DEFAULT_TEMPERATURE);
+                fallbackData.put("humidity", (double) DEFAULT_HUMIDITY);
+                log.debug("缓存为空,使用默认天气数据作为临时数据");
+            }
         }
 
-        double tempCelsius = DEFAULT_TEMPERATURE;
-        int humidity = DEFAULT_HUMIDITY;
+        // 3. 异步调用API更新缓存,不阻塞主线程
+        CompletableFuture.supplyAsync(() -> {
+            long asyncStartTime = System.currentTimeMillis();
+            double tempCelsius = DEFAULT_TEMPERATURE;
+            int humidity = DEFAULT_HUMIDITY;
+
+            try {
+                log.info("开始异步调用OpenWeatherMap API获取天气数据");
+                // 1. 构造请求URL
+                URL url = new URL(API_URL);
+
+                // 2. 建立连接并发送请求
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                conn.setRequestMethod("GET");
+                conn.setConnectTimeout(10000); // 减少超时时间到10秒
+                conn.setReadTimeout(10000);
+
+                // 3. 读取响应
+                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+                StringBuilder response = new StringBuilder();
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    response.append(line);
+                }
+                reader.close();
+
+                // 4. 解析JSON数据(使用org.json库)
+                JSONObject jsonResponse = new JSONObject(response.toString());
+                JSONObject main = jsonResponse.getJSONObject("main");
+
+                // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
+                double tempKelvin = main.getDouble("temp");
+                tempCelsius = tempKelvin - 273.15;
+                humidity = main.getInt("humidity");
+                double feelsLikeKelvin = main.getDouble("feels_like");
+                double feelsLikeCelsius = feelsLikeKelvin - 273.15;
+
+                // 提取天气状况描述
+                JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
+                String description = weather.getString("description");
+
+                // 记录日志
+                log.info("=== 天气解析结果 ===");
+                log.info("城市: {}", jsonResponse.getString("name"));
+                log.info("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
+                log.info("体感温度: {:.2f}°C", feelsLikeCelsius);
+                log.info("湿度: {}%", humidity);
+                log.info("天气状况: {}", description);
+                log.info("时区偏移: {}小时", (jsonResponse.getInt("timezone") / 3600));
+
+                // 检查是否包含臭氧数据
+                if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
+                    log.info("包含空气质量数据");
+                } else {
+                    log.info("当前数据不包含臭氧浓度等空气质量指标");
+                }
+
+                // 更新缓存
+                synchronized (WeatherFetcher.class) {
+                    weatherCache.clear();
+                    weatherCache.put("temperature", tempCelsius);
+                    weatherCache.put("humidity", (double) humidity);
+                    lastUpdateTime = System.currentTimeMillis();
+                    log.info("天气数据缓存更新成功");
+                }
+
+            } catch (Exception e) {
+                log.error("异步获取天气数据失败:{}", e.getMessage());
+                // 异常时使用默认值,但不更新缓存
+            } finally {
+                // 打印API调用结束时间和时长
+                long asyncEndTime = System.currentTimeMillis();
+                long duration = asyncEndTime - asyncStartTime;
+                log.info("OpenWeatherMap API异步调用结束,时长: {}ms", duration);
+            }
 
-        log.debug("开始调用OpenWeatherMap API获取天气数据");
-        long startTime = System.currentTimeMillis();
+            return null;
+        }, WEATHER_FETCHER_POOL);
 
+        // 立即返回临时数据,不等待异步调用完成
+        return fallbackData;
+    }
+
+    /**
+     * 关闭线程池(仅用于测试或应用关闭时)
+     */
+    public static void shutdown() {
+        WEATHER_FETCHER_POOL.shutdown();
         try {
-            // 1. 构造请求URL
-            URL url = new URL(API_URL);
-
-            // 2. 建立连接并发送请求
-            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-            conn.setRequestMethod("GET");
-            conn.setConnectTimeout(15000);
-            conn.setReadTimeout(15000);
-
-            // 3. 读取响应
-            BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
-            StringBuilder response = new StringBuilder();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                response.append(line);
+            if (!WEATHER_FETCHER_POOL.awaitTermination(5, TimeUnit.SECONDS)) {
+                WEATHER_FETCHER_POOL.shutdownNow();
             }
-            reader.close();
-
-            // 4. 解析JSON数据(使用org.json库)
-            JSONObject jsonResponse = new JSONObject(response.toString());
-            JSONObject main = jsonResponse.getJSONObject("main");
-
-            // 提取基础信息
-            String cityName = jsonResponse.getString("name");
-            int timezoneOffset = jsonResponse.getInt("timezone");
-
-            // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
-            double tempKelvin = main.getDouble("temp");
-            tempCelsius = tempKelvin - 273.15;
-            humidity = main.getInt("humidity");
-            double feelsLikeKelvin = main.getDouble("feels_like");
-            double feelsLikeCelsius = feelsLikeKelvin - 273.15;
-
-            // 提取天气状况描述
-            JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
-            String description = weather.getString("description");
-
-            // 记录日志,不输出到控制台
-            log.debug("=== 天气解析结果 ===");
-            log.debug("城市: {}", cityName);
-            log.debug("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
-            System.out.println("体感温度: {:.2f}°C (原始: {}K)" + feelsLikeCelsius + "==========" + feelsLikeKelvin);
-            log.debug("体感温度: {:.2f}°C", feelsLikeCelsius);
-            log.debug("湿度: {}%", humidity);
-            log.debug("天气状况: {}", description);
-            log.debug("时区偏移: {}小时", (timezoneOffset / 3600));
-
-            // 检查是否包含臭氧数据
-            if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
-                log.debug("包含空气质量数据");
-            } else {
-                log.debug("当前数据不包含臭氧浓度等空气质量指标");
-            }
-
-            // 更新缓存
-            weatherCache.clear();
-            weatherCache.put("temperature", tempCelsius);
-            weatherCache.put("humidity", (double) humidity);
-            lastUpdateTime = currentTime;
-            log.debug("天气数据缓存更新成功");
-
-        } catch (Exception e) {
-            log.error("获取天气数据失败:{}", e.getMessage());
-            // 异常时使用默认值
-            log.warn("使用默认天气数据,温度:{}°C,湿度:{}%", DEFAULT_TEMPERATURE, DEFAULT_HUMIDITY);
-        } finally {
-            // 打印API调用结束时间和时长
-            long endTime = System.currentTimeMillis();
-            long duration = endTime - startTime;
-            System.out.println("第三方天气API调用时长" + duration + "毫秒");
-            log.info("OpenWeatherMap API调用结束,开始时间: {}, 结束时间: {}, 时长: {}ms", startTime, endTime, duration);
+        } catch (InterruptedException e) {
+            WEATHER_FETCHER_POOL.shutdownNow();
+            Thread.currentThread().interrupt();
         }
-
-        Map<String, Double> resultMap = new HashMap<>();
-        resultMap.put("temperature", tempCelsius);
-        resultMap.put("humidity", (double) humidity);
-        return resultMap;
     }
 
     public static void main(String[] args) {

+ 5 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/IotDataTransferVO.java

@@ -25,4 +25,9 @@ public class IotDataTransferVO {
      * 产品ID
      */
     private Integer deviceType;
+    
+    /**
+     * MQTT用户名
+     */
+    private String username;
 }