Просмотр исходного кода

修改定时任务逻辑可配置租户和mqtt推送数据

fuyuchuan 6 дней назад
Родитель
Сommit
2497c5a627
15 измененных файлов с 550 добавлено и 68 удалено
  1. 8 1
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/RemotecdiTaskService.java
  2. 6 1
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/RemotecdiTaskFactory.java
  3. 7 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java
  4. 9 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java
  5. 3 11
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttBaseConfig.java
  6. 1 3
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttInConfig.java
  7. 3 5
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java
  8. 10 1
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/EnvMonitorMqttTopic.java
  9. 361 12
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java
  10. 20 17
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java
  11. 12 12
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java
  12. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/CrackVO.java
  13. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/DeviationVO.java
  14. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/TiltVO.java
  15. 8 3
      service-job/src/main/java/com/ruoyi/job/task/RyTask.java

+ 8 - 1
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/RemotecdiTaskService.java

@@ -9,5 +9,12 @@ import org.springframework.web.bind.annotation.RequestParam;
 public interface RemotecdiTaskService {
     @GetMapping("/synchronizeDeviceData")
     void synchronizeDeviceData(@RequestParam("tenantId") Integer tenantId,
-                               @RequestParam("engineeringId") Long engineeringId);
+                               @RequestParam("engineeringId") Long engineeringId,
+                               @RequestParam("username") String username,
+                               @RequestParam("password") String password);
+
+    @GetMapping("/allData")
+    void allData(@RequestParam("engineeringId") Long engineeringId,
+                               @RequestParam("username") String username,
+                               @RequestParam("password") String password);
 }

+ 6 - 1
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/RemotecdiTaskFactory.java

@@ -16,7 +16,12 @@ public class RemotecdiTaskFactory implements FallbackFactory<RemotecdiTaskServic
         log.error("用户服务调用失败:{}", throwable.getMessage());
         return new RemotecdiTaskService() {
             @Override
-            public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
+            public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+                throw new FeignBadRequestException(500, "人防设备数据定时推送异常(带租户)" + throwable.getMessage());
+            }
+
+            @Override
+            public void allData(Long engineeringId, String username, String password) {
                 throw new FeignBadRequestException(500, "人防设备数据定时推送异常" + throwable.getMessage());
             }
 

+ 7 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java

@@ -19,7 +19,12 @@ public class RemotecdiTaskApi implements RemotecdiTaskService {
     private IotDataTransferService iotDataTransferService;
 
     @Override
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
-        iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
+
+    @Override
+    public void allData(Long engineeringId, String username, String password) {
+        iotDataTransferService.allData(engineeringId, username, password);
     }
 }

+ 9 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java

@@ -47,6 +47,15 @@ public class DeviceFieldConfig {
         // 一氧化碳传感器(711)
         fieldMapping.put("711", "co");
 
+        // 倾斜传感器
+        fieldMapping.put("712", "qx");
+
+        // 裂缝传感器
+        fieldMapping.put("713", "cd");
+
+        // 位移传感器
+        fieldMapping.put("714", "wy");
+
         return fieldMapping;
     }
 }

+ 3 - 11
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttBaseConfig.java

@@ -10,33 +10,24 @@ import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
 import org.springframework.stereotype.Component;
 
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 @Data
-@Component
-@ConfigurationProperties(prefix = "mqtt")
 public class MqttBaseConfig {
 
-    @Value("${mqtt.username}")
     private String username;
 
-    @Value("${mqtt.password}")
     private String password;
 
-    @Value("${mqtt.url}")
     private String hostUrl;
 
-    @Value("${mqtt.sub-topics}")
     private String msgTopic;
 
-    @Value("${mqtt.keep-alive-interval}")
     //心跳间隔
     private int keepAliveInterval;
-    @Value("${mqtt.completionTimeout}")
-    //心跳间隔
+    
+    //完成超时
     private int completionTimeout;
 
 
-    @Bean
     public MqttPahoClientFactory mqttClientFactory() {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();
@@ -45,6 +36,7 @@ public class MqttBaseConfig {
         if (this.getPassword() != null) {
             options.setPassword(this.getPassword().toCharArray());
         }
+        options.setKeepAliveInterval(this.getKeepAliveInterval());
         factory.setConnectionOptions(options);
         return factory;
     }

+ 1 - 3
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttInConfig.java

@@ -14,11 +14,9 @@ import org.springframework.messaging.MessageChannel;
  * @author han
  * @date 2025/03/20 14:30
  */
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 @Configuration
 public class MqttInConfig {
-    @Autowired
-    private MqttBaseConfig mqttBaseConfig;
+    public MqttBaseConfig mqttBaseConfig;
 
     public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
 

+ 3 - 5
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -20,10 +20,8 @@ import java.util.Map;
  * @author han
  * @date 2025/03/20 14:31
  */
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 @Configuration
 public class MqttOutConfig {
-    @Autowired
     public MqttBaseConfig mqttBaseConfig;
 
     public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
@@ -50,9 +48,9 @@ public class MqttOutConfig {
     @Bean(name = MESSAGE_NAME)
     @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
     public MessageHandler outbound() {
-        // 在这里进行mqttOutboundChannel的相关设置
-        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
-        MqttPahoMessageHandler messageHandler =
+        // 根据username动态生成client-id,格式:mqttx-username
+        String clientId = "mqttx-" + mqttBaseConfig.getUsername();
+        MqttPahoMessageHandler messageHandler = 
                 new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
         //如果设置成true,发送消息时将不会阻塞。
         messageHandler.setAsync(true);

+ 10 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/EnvMonitorMqttTopic.java

@@ -31,7 +31,16 @@ public enum EnvMonitorMqttTopic {
     PERSON_PRESENCE("iotInfo/personPresence"),
 
     // 用电负荷(后续用电负荷用)
-    ELECTRICITY_LOAD("iotInfo/electricityLoad");
+    ELECTRICITY_LOAD("iotInfo/electricityLoad"),
+
+    // 倾斜
+    TILT("iotInfo/tilt"),
+
+    // 裂缝
+    CRACK("iotInfo/crack"),
+
+    // 位移
+    DEVIATION("iotInfo/deviation");
 
     private final String topic;
 

+ 361 - 12
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -7,7 +7,7 @@ import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
-//import com.usky.cdi.service.config.mqtt.MqttGateway;
+import com.usky.cdi.service.config.mqtt.MqttBaseConfig;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
 import com.usky.cdi.service.util.DeviceDataQuery;
@@ -15,14 +15,18 @@ import com.usky.cdi.service.util.SnowflakeIdGenerator;
 import com.usky.cdi.service.vo.IotDataTransferVO;
 import com.usky.cdi.service.vo.base.*;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.stereotype.Service;
+import org.springframework.web.context.ContextLoader;
 
 import javax.annotation.PostConstruct;
 
-import javax.annotation.Resource;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -39,12 +43,16 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 @Service
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 public class IotDataTransferService {
 
-    @Resource
     private MqttOutConfig.MqttGateway mqttGateway;
 
+    // MQTT连接相关配置
+    private static final String MQTT_URL = "ssl://114.80.201.143:8883";
+    private static final String MQTT_TOPIC = "iotInfo/+";
+    private static final int KEEP_ALIVE_INTERVAL = 60;
+    private static final int COMPLETION_TIMEOUT = 5000;
+
     private SnowflakeIdGenerator idGenerator;
 
     @Autowired
@@ -178,8 +186,20 @@ public class IotDataTransferService {
             List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
             Integer totalDevices = transferVO.getDevices().size();
 
+            // 统计各类型设备数据数量
+            int tempCount = 0, humidityCount = 0, coCount = 0, o2Count = 0, co2Count = 0;
+            for (JSONObject dataItem : deviceData) {
+                if (dataItem.containsKey("wd") && dataItem.getFloat("wd") != null) tempCount++;
+                if (dataItem.containsKey("sd") && dataItem.getFloat("sd") != null) humidityCount++;
+                if (dataItem.containsKey("co") && dataItem.getFloat("co") != null) coCount++;
+                if (dataItem.containsKey("o2") && dataItem.getFloat("o2") != null) o2Count++;
+                if (dataItem.containsKey("co2") && dataItem.getFloat("co2") != null) co2Count++;
+            }
+
             log.info("开始推送温湿度及气体浓度数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
                     deviceType, totalDevices, deviceData.size());
+            log.info("各类型设备数据数量:温度{}条,湿度{}条,一氧化碳{}条,氧气{}条,二氧化碳{}条",
+                    tempCount, humidityCount, coCount, o2Count, co2Count);
 
             if (deviceData.isEmpty()) {
                 log.warn("没有获取到空气质量数据!设备类型:{}", deviceType);
@@ -387,7 +407,14 @@ public class IotDataTransferService {
         }
     }
 
-    // 提取公共发送方法,减少代码冗余
+    /**
+     * 推送温度信息(701)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
@@ -404,6 +431,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
     }
 
+    /**
+     * 推送湿度信息(702)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
@@ -420,6 +455,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
     }
 
+    /**
+     * 推送氧气浓度信息(705)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
@@ -436,6 +479,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
     }
 
+    /**
+     * 推送一氧化碳浓度信息(706)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
@@ -452,6 +503,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
     }
 
+    /**
+     * 推送二氧化碳浓度信息(707)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
@@ -468,15 +527,233 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
     }
 
+    /**
+     * 发送倾斜数据(712)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("qx");
+                if (value == null) {
+                    log.warn("设备{}的倾斜数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                TiltVO vo = new TiltVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("倾斜数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
+    /**
+     * 发送裂缝数据(713)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("cd");
+                if (value == null) {
+                    log.warn("设备{}的裂缝数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                CrackVO vo = new CrackVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("裂缝数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
+    /**
+     * 发送位移数据(714)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendDeviationData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送位移数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到位移数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("wy");
+                if (value == null) {
+                    log.warn("设备{}的位移数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                DeviationVO vo = new DeviationVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("位移数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("位移数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
     /**
      * 同步设备数据
      * @param tenantId 租户ID
      * @param engineeringId 工程ID
+     * @param username MQTT用户名
+     * @param password MQTT密码
      */
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
         // 参数校验
-        if (tenantId == null || engineeringId == null) {
-            log.error("租户ID或工程ID不能为空");
+        if (engineeringId == null || username == null || password == null) {
+            log.error("工程ID、MQTT用户名或密码不能为空");
             return;
         }
 
@@ -488,7 +765,7 @@ public class IotDataTransferService {
         }
 
         // 查询设备列表
-        List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList, tenantId);
+        List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList);
         if (deviceList.isEmpty()) {
             log.warn("租户{}不存在任何设备", tenantId);
             return;
@@ -514,6 +791,9 @@ public class IotDataTransferService {
                 .collect(Collectors.groupingBy(DmpDevice::getProductCode,
                         Collectors.mapping(DmpDevice::getDeviceUuid, Collectors.toList())));
 
+        // 创建MQTT连接
+        createMqttConnection(username, password);
+
         // 任务开始日志
         Integer totalDevices = deviceList.size();
         Integer totalProductTypes = codeDeviceUuidsMap.size();
@@ -552,6 +832,16 @@ public class IotDataTransferService {
                 case 704:
                     result = sendElectricityLoad(transferVO);
                     break;
+                case 712:
+                    result = sendTiltData(transferVO);
+                    break;
+                case 713:
+                    result = sendCrackData(transferVO);
+                    break;
+                case 714:
+                    result = sendDeviationData(transferVO);
+                    break;
+
                 default:
                     log.debug("不支持的设备类型:{}", deviceType);
                     continue;
@@ -577,7 +867,7 @@ public class IotDataTransferService {
     private List<String> getDeviceTypeListByTenant(Integer tenantId) {
         LambdaQueryWrapper<DmpProduct> productQueryWrapper = new LambdaQueryWrapper<>();
         productQueryWrapper.select(DmpProduct::getProductCode)
-                .eq(DmpProduct::getTenantId, tenantId)
+                .eq(tenantId != null && tenantId > 0, DmpProduct::getTenantId, tenantId)
                 .eq(DmpProduct::getDeleteFlag, 0);
         List<DmpProduct> productList = dmpProductMapper.selectList(productQueryWrapper);
         return productList.stream()
@@ -591,7 +881,7 @@ public class IotDataTransferService {
      * @param productCodeList 设备类型列表
      * @return 设备列表
      */
-    private List<DmpDevice> getDeviceListByType(List<String> productCodeList, Integer tenantId) {
+    private List<DmpDevice> getDeviceListByType(List<String> productCodeList) {
         LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.select(DmpDevice::getDeviceUuid, DmpDevice::getDeviceType, DmpDevice::getDeviceId, DmpDevice::getProductCode)
                 .in(DmpDevice::getProductCode, productCodeList)
@@ -601,6 +891,60 @@ public class IotDataTransferService {
         return dmpDeviceMapper.selectList(queryWrapper);
     }
 
+    /**
+     * 手动创建MQTT连接
+     * @param username MQTT用户名
+     * @param password MQTT密码
+     */
+    private void createMqttConnection(String username, String password) {
+        try {
+            // 创建MqttBaseConfig实例并设置参数
+            MqttBaseConfig mqttBaseConfig = new MqttBaseConfig();
+            // 手动设置所有MQTT配置参数
+            mqttBaseConfig.setUsername(username);
+            mqttBaseConfig.setPassword(password);
+            mqttBaseConfig.setHostUrl(MQTT_URL);
+            mqttBaseConfig.setMsgTopic(MQTT_TOPIC);
+            mqttBaseConfig.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+            mqttBaseConfig.setCompletionTimeout(COMPLETION_TIMEOUT);
+
+            // 创建MqttOutConfig实例
+            MqttOutConfig mqttOutConfig = new MqttOutConfig();
+            mqttOutConfig.mqttBaseConfig = mqttBaseConfig;
+
+            // 使用Spring的ApplicationContext手动创建Bean
+            ApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
+            if (context == null) {
+                throw new IllegalStateException("ApplicationContext未找到,无法创建MQTT相关Bean");
+            }
+
+            // 注册MqttBaseConfig为Bean
+            ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) context).getBeanFactory();
+            beanFactory.registerSingleton("mqttBaseConfig", mqttBaseConfig);
+
+            // 创建并注册mqttClientFactory
+            MqttConnectOptions options = new MqttConnectOptions();
+            options.setServerURIs(new String[]{mqttBaseConfig.getHostUrl()});
+            options.setUserName(mqttBaseConfig.getUsername());
+            options.setPassword(mqttBaseConfig.getPassword().toCharArray());
+            options.setKeepAliveInterval(mqttBaseConfig.getKeepAliveInterval());
+
+            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+            factory.setConnectionOptions(options);
+            beanFactory.registerSingleton("mqttClientFactory", factory);
+
+            // 注册MqttOutConfig为Bean
+            beanFactory.registerSingleton("mqttOutConfig", mqttOutConfig);
+
+            // 获取mqttGateway
+            this.mqttGateway = context.getBean(MqttOutConfig.MqttGateway.class);
+            log.info("MQTT连接创建成功");
+        } catch (Exception e) {
+            log.error("创建MQTT连接失败: {}", e.getMessage(), e);
+            throw new RuntimeException("创建MQTT连接失败", e);
+        }
+    }
+
     /**
      * 验证MQTT网关是否初始化
      * @return 是否初始化
@@ -639,4 +983,9 @@ public class IotDataTransferService {
         // 不再记录每条数据的详情,只记录发送操作
         mqttGateway.sendToMqtt(topic, json);
     }
+
+    public void allData(Long engineeringId, String username, String password) {
+        Integer tenantId = 0;
+        synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
 }

+ 20 - 17
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java

@@ -346,6 +346,21 @@ public class DeviceDataQuery {
                     simulationData.put("leakageCurrent", formatNumber(ThreadLocalRandom.current().nextDouble(LEAKAGE_CURRENT_RANGE_MIN, LEAKAGE_CURRENT_RANGE_MAX), FORMAT_4_2));
                     break;
 
+                // 倾斜(712)
+                case 712:
+                    simulationData.put("qx", 0);
+                    break;
+
+                // 裂缝(713)
+                case 713:
+                    simulationData.put("cd", 0);
+                    break;
+
+                // 位移(714)
+                case 714:
+                    simulationData.put("wy", 0);
+                    break;
+
                 default:
                     log.warn("未知设备类型:{},无法生成模拟数据", deviceType);
                     break;
@@ -373,31 +388,19 @@ public class DeviceDataQuery {
 
     // ========== 获取带缓存的电流值 ==========
     private double getCurrentValue(String deviceId, String phase) {
+        String cacheKey = (deviceId == null ? "dummy" : deviceId) + "_" + phase;
+        int count = currentReuseCountCache.getOrDefault(cacheKey, 0);
 
-        // 构建设备+相的唯一缓存Key(核心!)
-        String cacheKey = deviceId + "_" + phase;
-        // 后续逻辑不变,只是把所有phase替换为cacheKey
-        int currentReuseCount = currentReuseCountCache.getOrDefault(cacheKey, 0);
-
-        System.out.println("cacheKey=" + cacheKey +
-                " 命中=" + currentValueCache.containsKey(cacheKey) +
-                " 复用次数=" + currentReuseCountCache.getOrDefault(cacheKey, 0) +
-                "值=" + currentValueCache.get(cacheKey));
-
-        if (!currentValueCache.containsKey(cacheKey) || currentReuseCount >= MAX_REUSE_COUNT) {
+        if (!currentValueCache.containsKey(cacheKey) || count >= MAX_REUSE_COUNT) {
             return generateNewCurrent(cacheKey);
         }
 
         boolean reuse = ThreadLocalRandom.current().nextDouble() < REUSE_PROBABILITY;
         if (reuse) {
-            int newReuseCount = currentReuseCount + 1;
-            currentReuseCountCache.put(cacheKey, newReuseCount);
-            System.out.println("复用值=" + currentValueCache.get(cacheKey));
+            currentReuseCountCache.put(cacheKey, count + 1);
             return currentValueCache.get(cacheKey);
         } else {
-            double newValue = generateNewCurrent(cacheKey);
-            System.out.println("生成新值=" + newValue);
-            return newValue;
+            return generateNewCurrent(cacheKey);
         }
     }
 

+ 12 - 12
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,16 +26,16 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    @Scheduled(fixedDelay = 25 * 60 * 1000, initialDelay = 0)
-    public void scheduledDeviceDataSync() {
-        Integer tenantId = 1205;
-        Long engineeringId = 3101070011L;
-        log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
-
-        try {
-            iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
-        } catch (Exception e) {
-            log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
-        }
-    }
+    // @Scheduled(fixedDelay = 25 * 60 * 1000, initialDelay = 0)
+    // public void scheduledDeviceDataSync() {
+    //     Integer tenantId = 1206;
+    //     Long engineeringId = 3101170019L;
+    //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //
+    //     try {
+    //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
+    //     } catch (Exception e) {
+    //         log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
+    //     }
+    // }
 }

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/CrackVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 裂缝数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class CrackVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 裂缝宽度值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 裂缝宽度值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("裂缝宽度值(sensorValue)为必填项");
+        }
+    }
+}

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/DeviationVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 位移数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class DeviationVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 位移值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 位移值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("位移值(sensorValue)为必填项");
+        }
+    }
+}

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/TiltVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 倾斜数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class TiltVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 倾斜角度值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 倾斜角度值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("倾斜角度值(sensorValue)为必填项");
+        }
+    }
+}

+ 8 - 3
service-job/src/main/java/com/ruoyi/job/task/RyTask.java

@@ -85,9 +85,14 @@ public class RyTask {
         remotePmService.reportSubmissionReminder();
     }
 
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
-        System.out.println("synchronizeDeviceData start......");
-        remoteCdiTaskService.synchronizeDeviceData(tenantId, engineeringId);
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        System.out.println("租户: " + tenantId + "的人防监测数据推送定时任务开始执行......");
+        remoteCdiTaskService.synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
+
+    public void allData(Long engineeringId, String username, String password) {
+        System.out.println("人防监测数据推送定时任务开始执行......");
+        remoteCdiTaskService.allData(engineeringId, username, password);
     }
 
 }