فهرست منبع

Merge branch 'han' of uskycloud/usky-modules into master

fuyuchuan 6 روز پیش
والد
کامیت
84ba5a6348
16فایلهای تغییر یافته به همراه643 افزوده شده و 81 حذف شده
  1. 8 1
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/RemotecdiTaskService.java
  2. 6 1
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/RemotecdiTaskFactory.java
  3. 2 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java
  4. 7 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java
  5. 61 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java
  6. 3 11
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttBaseConfig.java
  7. 1 6
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttInConfig.java
  8. 39 11
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java
  9. 10 1
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/EnvMonitorMqttTopic.java
  10. 356 12
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java
  11. 26 21
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java
  12. 14 12
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java
  13. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/CrackVO.java
  14. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/DeviationVO.java
  15. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/TiltVO.java
  16. 8 3
      service-job/src/main/java/com/ruoyi/job/task/RyTask.java

+ 8 - 1
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/RemotecdiTaskService.java

@@ -9,5 +9,12 @@ import org.springframework.web.bind.annotation.RequestParam;
 public interface RemotecdiTaskService {
     @GetMapping("/synchronizeDeviceData")
     void synchronizeDeviceData(@RequestParam("tenantId") Integer tenantId,
-                               @RequestParam("engineeringId") Long engineeringId);
+                               @RequestParam("engineeringId") Long engineeringId,
+                               @RequestParam("username") String username,
+                               @RequestParam("password") String password);
+
+    @GetMapping("/allData")
+    void allData(@RequestParam("engineeringId") Long engineeringId,
+                               @RequestParam("username") String username,
+                               @RequestParam("password") String password);
 }

+ 6 - 1
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/RemotecdiTaskFactory.java

@@ -16,7 +16,12 @@ public class RemotecdiTaskFactory implements FallbackFactory<RemotecdiTaskServic
         log.error("用户服务调用失败:{}", throwable.getMessage());
         return new RemotecdiTaskService() {
             @Override
-            public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
+            public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+                throw new FeignBadRequestException(500, "人防设备数据定时推送异常(带租户)" + throwable.getMessage());
+            }
+
+            @Override
+            public void allData(Long engineeringId, String username, String password) {
                 throw new FeignBadRequestException(500, "人防设备数据定时推送异常" + throwable.getMessage());
             }
 

+ 2 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java

@@ -11,6 +11,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.core.env.Environment;
+import org.springframework.integration.config.EnableIntegration;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.net.InetAddress;
@@ -27,6 +28,7 @@ import java.net.UnknownHostException;
 @EnableFeignClients(basePackages = "com.usky")
 @MapperScan(value = "com.usky.cdi.mapper")
 @ComponentScan("com.usky")
+@EnableIntegration // 启用Spring Integration
 @SpringBootApplication
 public class RuoYiSystemApplication
 {

+ 7 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java

@@ -19,7 +19,12 @@ public class RemotecdiTaskApi implements RemotecdiTaskService {
     private IotDataTransferService iotDataTransferService;
 
     @Override
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
-        iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
+
+    @Override
+    public void allData(Long engineeringId, String username, String password) {
+        iotDataTransferService.allData(engineeringId, username, password);
     }
 }

+ 61 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java

@@ -0,0 +1,61 @@
+package com.usky.cdi.service.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 设备字段映射配置类
+ * 替代配置文件中的device.field-mapping配置
+ * @author fyc
+ * @date 2025/12/20
+ */
+@Configuration
+public class DeviceFieldConfig {
+
+    /**
+     * 设备类型与字段映射配置
+     * @return 设备字段映射Map
+     */
+    @Bean
+    public Map<String, String> deviceFieldMapping() {
+        Map<String, String> fieldMapping = new HashMap<>();
+
+        // 水浸传感器(702)
+        fieldMapping.put("702", "leach_status");
+
+        // 人员统计(703)
+        fieldMapping.put("703", "amount_into,amount_out,day_into,day_out");
+
+        // 电气火灾(704)
+        fieldMapping.put("704", "voltage_a,voltage_b,voltage_c,current_a,current_b,current_c,temperature_a,temperature_b,temperature_c,current_residual,active_power");
+
+        // 温度传感器(707)
+        fieldMapping.put("707", "wd");
+
+        // 湿度传感器(708)
+        fieldMapping.put("708", "sd");
+
+        // 氧气传感器(709)
+        fieldMapping.put("709", "o2");
+
+        // 二氧化碳传感器(710)
+        fieldMapping.put("710", "co2");
+
+        // 一氧化碳传感器(711)
+        fieldMapping.put("711", "co");
+
+        // 倾斜传感器
+        fieldMapping.put("712", "qx");
+
+        // 裂缝传感器
+        fieldMapping.put("713", "cd");
+
+        // 位移传感器
+        fieldMapping.put("714", "wy");
+
+        return fieldMapping;
+    }
+}

+ 3 - 11
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttBaseConfig.java

@@ -10,33 +10,24 @@ import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
 import org.springframework.stereotype.Component;
 
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 @Data
-@Component
-@ConfigurationProperties(prefix = "mqtt")
 public class MqttBaseConfig {
 
-    @Value("${mqtt.username}")
     private String username;
 
-    @Value("${mqtt.password}")
     private String password;
 
-    @Value("${mqtt.url}")
     private String hostUrl;
 
-    @Value("${mqtt.sub-topics}")
     private String msgTopic;
 
-    @Value("${mqtt.keep-alive-interval}")
     //心跳间隔
     private int keepAliveInterval;
-    @Value("${mqtt.completionTimeout}")
-    //心跳间隔
+    
+    //完成超时
     private int completionTimeout;
 
 
-    @Bean
     public MqttPahoClientFactory mqttClientFactory() {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         MqttConnectOptions options = new MqttConnectOptions();
@@ -45,6 +36,7 @@ public class MqttBaseConfig {
         if (this.getPassword() != null) {
             options.setPassword(this.getPassword().toCharArray());
         }
+        options.setKeepAliveInterval(this.getKeepAliveInterval());
         factory.setConnectionOptions(options);
         return factory;
     }

+ 1 - 6
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttInConfig.java

@@ -14,15 +14,11 @@ import org.springframework.messaging.MessageChannel;
  * @author han
  * @date 2025/03/20 14:30
  */
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
-@Configuration
 public class MqttInConfig {
-    @Autowired
-    private MqttBaseConfig mqttBaseConfig;
+    public MqttBaseConfig mqttBaseConfig;
 
     public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
 
-    @Bean(name = CHANNEL_NAME_INPUT)
     public MessageChannel mqttInputChannel() {
         return new DirectChannel();
     }
@@ -33,7 +29,6 @@ public class MqttInConfig {
      *
      * @return
      */
-    @Bean
     public MessageProducer inbound() {
         String msgTopic = mqttBaseConfig.getMsgTopic();
         if (msgTopic == null || msgTopic.trim().isEmpty()) {

+ 39 - 11
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -1,6 +1,7 @@
 package com.usky.cdi.service.config.mqtt;
 
 import com.alibaba.fastjson.JSONObject;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -8,11 +9,13 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.integration.annotation.MessagingGateway;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
 
 import java.util.Map;
 
@@ -20,10 +23,8 @@ import java.util.Map;
  * @author han
  * @date 2025/03/20 14:31
  */
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
-@Configuration
+@Component
 public class MqttOutConfig {
-    @Autowired
     public MqttBaseConfig mqttBaseConfig;
 
     public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
@@ -42,24 +43,51 @@ public class MqttOutConfig {
         return new DirectChannel();
     }
 
+
     /**
-     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
-     *
-     * @return
+     * MQTT消息发送处理器
+     * 注意:这个方法会被Spring自动创建,用于处理mqttOutboundChannel通道上的消息
+     * 
+     * @param factory MQTT客户端工厂
+     * @return MessageHandler实例
      */
     @Bean(name = MESSAGE_NAME)
     @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler outbound() {
-        // 在这里进行mqttOutboundChannel的相关设置
-        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
-        MqttPahoMessageHandler messageHandler =
-                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+    public MessageHandler outbound(DefaultMqttPahoClientFactory factory) {
+        // 注意:这里的client-id暂时使用固定值,因为username在启动时还不可用
+        // 实际使用时,会在createMqttConnection方法中重新设置
+        String clientId = "mqttx-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler = 
+                new MqttPahoMessageHandler(clientId, factory);
         //如果设置成true,发送消息时将不会阻塞。
         messageHandler.setAsync(true);
         messageHandler.setDefaultTopic(DEFAULT_TOPIC);
         return messageHandler;
     }
+    
+    /**
+     * MQTT客户端工厂
+     * 注意:这个方法会被Spring自动创建,用于创建MQTT客户端
+     * 
+     * @return DefaultMqttPahoClientFactory实例
+     */
+    @Bean
+    public DefaultMqttPahoClientFactory mqttClientFactory() {
+        // 创建默认的MqttPahoClientFactory
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        
+        // 设置默认的MqttConnectOptions,确保serverURIs不为null
+        // 实际使用时,会在createMqttConnection方法中重新配置
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
+        options.setKeepAliveInterval(60); // 设置默认的心跳间隔
+        factory.setConnectionOptions(options);
+        
+        return factory;
+    }
 
+    // 注意:这个接口需要被Spring扫描到,所以我们保留@MessagingGateway注解
+    // Spring会自动创建这个接口的实现类
     @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
     public interface MqttGateway {
         /**

+ 10 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/EnvMonitorMqttTopic.java

@@ -31,7 +31,16 @@ public enum EnvMonitorMqttTopic {
     PERSON_PRESENCE("iotInfo/personPresence"),
 
     // 用电负荷(后续用电负荷用)
-    ELECTRICITY_LOAD("iotInfo/electricityLoad");
+    ELECTRICITY_LOAD("iotInfo/electricityLoad"),
+
+    // 倾斜
+    TILT("iotInfo/tilt"),
+
+    // 裂缝
+    CRACK("iotInfo/crack"),
+
+    // 位移
+    DEVIATION("iotInfo/deviation");
 
     private final String topic;
 

+ 356 - 12
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -7,7 +7,7 @@ import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
-//import com.usky.cdi.service.config.mqtt.MqttGateway;
+import com.usky.cdi.service.config.mqtt.MqttBaseConfig;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
 import com.usky.cdi.service.util.DeviceDataQuery;
@@ -15,14 +15,20 @@ import com.usky.cdi.service.util.SnowflakeIdGenerator;
 import com.usky.cdi.service.vo.IotDataTransferVO;
 import com.usky.cdi.service.vo.base.*;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.stereotype.Service;
+import org.springframework.web.context.ContextLoader;
 
 import javax.annotation.PostConstruct;
 
-import javax.annotation.Resource;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -39,11 +45,19 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 @Service
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 public class IotDataTransferService {
 
-    @Resource
     private MqttOutConfig.MqttGateway mqttGateway;
+    
+    // 注入ApplicationContext,确保总是能获取到
+    @Autowired
+    private ApplicationContext context;
+    
+    // MQTT连接相关配置
+    private static final String MQTT_URL = "ssl://114.80.201.143:8883";
+    private static final String MQTT_TOPIC = "iotInfo/+";
+    private static final int KEEP_ALIVE_INTERVAL = 60;
+    private static final int COMPLETION_TIMEOUT = 5000;
 
     private SnowflakeIdGenerator idGenerator;
 
@@ -178,8 +192,20 @@ public class IotDataTransferService {
             List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
             Integer totalDevices = transferVO.getDevices().size();
 
+            // 统计各类型设备数据数量
+            int tempCount = 0, humidityCount = 0, coCount = 0, o2Count = 0, co2Count = 0;
+            for (JSONObject dataItem : deviceData) {
+                if (dataItem.containsKey("wd") && dataItem.getFloat("wd") != null) tempCount++;
+                if (dataItem.containsKey("sd") && dataItem.getFloat("sd") != null) humidityCount++;
+                if (dataItem.containsKey("co") && dataItem.getFloat("co") != null) coCount++;
+                if (dataItem.containsKey("o2") && dataItem.getFloat("o2") != null) o2Count++;
+                if (dataItem.containsKey("co2") && dataItem.getFloat("co2") != null) co2Count++;
+            }
+
             log.info("开始推送温湿度及气体浓度数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
                     deviceType, totalDevices, deviceData.size());
+            log.info("各类型设备数据数量:温度{}条,湿度{}条,一氧化碳{}条,氧气{}条,二氧化碳{}条",
+                    tempCount, humidityCount, coCount, o2Count, co2Count);
 
             if (deviceData.isEmpty()) {
                 log.warn("没有获取到空气质量数据!设备类型:{}", deviceType);
@@ -387,7 +413,14 @@ public class IotDataTransferService {
         }
     }
 
-    // 提取公共发送方法,减少代码冗余
+    /**
+     * 推送温度信息(701)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
@@ -404,6 +437,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
     }
 
+    /**
+     * 推送湿度信息(702)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
@@ -420,6 +461,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
     }
 
+    /**
+     * 推送氧气浓度信息(705)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
@@ -436,6 +485,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
     }
 
+    /**
+     * 推送一氧化碳浓度信息(706)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
@@ -452,6 +509,14 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
     }
 
+    /**
+     * 推送二氧化碳浓度信息(707)
+     *
+     * @param deviceDataItem 设备数据
+     * @param deviceId 设备ID
+     * @param dataEndTime 数据结束时间
+     * @param engineeringID 工程ID
+     **/
     private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
@@ -468,15 +533,233 @@ public class IotDataTransferService {
         sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
     }
 
+    /**
+     * 发送倾斜数据(712)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("qx");
+                if (value == null) {
+                    log.warn("设备{}的倾斜数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                TiltVO vo = new TiltVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("倾斜数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
+    /**
+     * 发送裂缝数据(713)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("cd");
+                if (value == null) {
+                    log.warn("设备{}的裂缝数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                CrackVO vo = new CrackVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("裂缝数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
+    /**
+     * 发送位移数据(714)
+     *
+     * @return 推送结果,包含成功数和失败数
+     **/
+    public Map<String, Integer> sendDeviationData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
+        }
+
+        try {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送位移数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到位移数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("wy");
+                if (value == null) {
+                    log.warn("设备{}的位移数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                DeviationVO vo = new DeviationVO();
+                vo.setDataPacketID(generateDataPacketID());
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
+                vo.setPublishTime(getCurrentTime());
+                vo.setDataEndTime(dataEndTime);
+                vo.setSensorValue(value == 0 ? 0 : 1);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
+            }
+
+            log.info("位移数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
+        } catch (Exception e) {
+            log.error("位移数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
+        }
+    }
+
     /**
      * 同步设备数据
      * @param tenantId 租户ID
      * @param engineeringId 工程ID
+     * @param username MQTT用户名
+     * @param password MQTT密码
      */
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
         // 参数校验
-        if (tenantId == null || engineeringId == null) {
-            log.error("租户ID或工程ID不能为空");
+        if (engineeringId == null || username == null || password == null) {
+            log.error("工程ID、MQTT用户名或密码不能为空");
             return;
         }
 
@@ -488,7 +771,7 @@ public class IotDataTransferService {
         }
 
         // 查询设备列表
-        List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList, tenantId);
+        List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList);
         if (deviceList.isEmpty()) {
             log.warn("租户{}不存在任何设备", tenantId);
             return;
@@ -514,6 +797,9 @@ public class IotDataTransferService {
                 .collect(Collectors.groupingBy(DmpDevice::getProductCode,
                         Collectors.mapping(DmpDevice::getDeviceUuid, Collectors.toList())));
 
+        // 创建MQTT连接
+        createMqttConnection(username, password);
+
         // 任务开始日志
         Integer totalDevices = deviceList.size();
         Integer totalProductTypes = codeDeviceUuidsMap.size();
@@ -552,6 +838,16 @@ public class IotDataTransferService {
                 case 704:
                     result = sendElectricityLoad(transferVO);
                     break;
+                case 712:
+                    result = sendTiltData(transferVO);
+                    break;
+                case 713:
+                    result = sendCrackData(transferVO);
+                    break;
+                case 714:
+                    result = sendDeviationData(transferVO);
+                    break;
+
                 default:
                     log.debug("不支持的设备类型:{}", deviceType);
                     continue;
@@ -577,7 +873,7 @@ public class IotDataTransferService {
     private List<String> getDeviceTypeListByTenant(Integer tenantId) {
         LambdaQueryWrapper<DmpProduct> productQueryWrapper = new LambdaQueryWrapper<>();
         productQueryWrapper.select(DmpProduct::getProductCode)
-                .eq(DmpProduct::getTenantId, tenantId)
+                .eq(tenantId != null && tenantId > 0, DmpProduct::getTenantId, tenantId)
                 .eq(DmpProduct::getDeleteFlag, 0);
         List<DmpProduct> productList = dmpProductMapper.selectList(productQueryWrapper);
         return productList.stream()
@@ -591,7 +887,7 @@ public class IotDataTransferService {
      * @param productCodeList 设备类型列表
      * @return 设备列表
      */
-    private List<DmpDevice> getDeviceListByType(List<String> productCodeList, Integer tenantId) {
+    private List<DmpDevice> getDeviceListByType(List<String> productCodeList) {
         LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.select(DmpDevice::getDeviceUuid, DmpDevice::getDeviceType, DmpDevice::getDeviceId, DmpDevice::getProductCode)
                 .in(DmpDevice::getProductCode, productCodeList)
@@ -601,6 +897,49 @@ public class IotDataTransferService {
         return dmpDeviceMapper.selectList(queryWrapper);
     }
 
+    /**
+     * 手动创建MQTT连接
+     * @param username MQTT用户名
+     * @param password MQTT密码
+     */
+    private void createMqttConnection(String username, String password) {
+        try {
+            // 使用注入的ApplicationContext获取已有的mqttGateway实例
+            // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
+            if (this.context == null) {
+                throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
+            }
+            
+            // 1. 获取mqttGateway实例
+            this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
+            if (this.mqttGateway == null) {
+                throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
+            }
+            
+            // 2. 获取现有的mqttClientFactory实例
+            DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
+            if (mqttClientFactory == null) {
+                throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
+            }
+            
+            // 3. 创建并配置MqttConnectOptions
+            MqttConnectOptions options = new MqttConnectOptions();
+            options.setServerURIs(new String[]{MQTT_URL});
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
+            
+            // 4. 更新mqttClientFactory的连接选项
+            mqttClientFactory.setConnectionOptions(options);
+            
+            log.info("MQTT Gateway初始化成功,用户名:{}", username);
+            log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
+        } catch (Exception e) {
+            log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
+            throw new RuntimeException("初始化MQTT连接失败", e);
+        }
+    }
+
     /**
      * 验证MQTT网关是否初始化
      * @return 是否初始化
@@ -639,4 +978,9 @@ public class IotDataTransferService {
         // 不再记录每条数据的详情,只记录发送操作
         mqttGateway.sendToMqtt(topic, json);
     }
+
+    public void allData(Long engineeringId, String username, String password) {
+        Integer tenantId = 0;
+        synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
 }

+ 26 - 21
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.mapper.DmpDeviceMapper;
+import com.usky.cdi.service.config.DeviceFieldConfig;
 import com.usky.cdi.service.vo.IotDataTransferVO;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -28,7 +29,6 @@ import java.util.stream.Collectors;
 @Data
 @Slf4j
 @Component
-@ConfigurationProperties(prefix = "device")
 public class DeviceDataQuery {
     @Autowired
     private DmpDeviceMapper dmpDeviceMapper;
@@ -36,7 +36,9 @@ public class DeviceDataQuery {
     private String baseUrl;
     @Value("${device.data.simulation}")
     private boolean simulation;
-    private Map<String, String> deviceFieldMapping;
+
+    @Autowired
+    private DeviceFieldConfig deviceFieldConfig;
 
     // 定义各参数的格式化器(整数位,小数位)
     private static final DecimalFormat FORMAT_2_2 = new DecimalFormat("00.00"); // 2位整数+2位小数
@@ -218,7 +220,7 @@ public class DeviceDataQuery {
             return Collections.singletonList("time");
         }
 
-        String fieldsStr = deviceFieldMapping.get(deviceType.toString());
+        String fieldsStr = deviceFieldConfig.deviceFieldMapping().get(deviceType.toString());
         if (fieldsStr == null || fieldsStr.trim().isEmpty()) {
             log.warn("获取目标字段失败:设备类型{}对应的字段映射不存在", deviceType);
             return Collections.singletonList("time");
@@ -344,6 +346,21 @@ public class DeviceDataQuery {
                     simulationData.put("leakageCurrent", formatNumber(ThreadLocalRandom.current().nextDouble(LEAKAGE_CURRENT_RANGE_MIN, LEAKAGE_CURRENT_RANGE_MAX), FORMAT_4_2));
                     break;
 
+                // 倾斜(712)
+                case 712:
+                    simulationData.put("qx", 0);
+                    break;
+
+                // 裂缝(713)
+                case 713:
+                    simulationData.put("cd", 0);
+                    break;
+
+                // 位移(714)
+                case 714:
+                    simulationData.put("wy", 0);
+                    break;
+
                 default:
                     log.warn("未知设备类型:{},无法生成模拟数据", deviceType);
                     break;
@@ -371,31 +388,19 @@ public class DeviceDataQuery {
 
     // ========== 获取带缓存的电流值 ==========
     private double getCurrentValue(String deviceId, String phase) {
+        String cacheKey = (deviceId == null ? "dummy" : deviceId) + "_" + phase;
+        int count = currentReuseCountCache.getOrDefault(cacheKey, 0);
 
-        // 构建设备+相的唯一缓存Key(核心!)
-        String cacheKey = deviceId + "_" + phase;
-        // 后续逻辑不变,只是把所有phase替换为cacheKey
-        int currentReuseCount = currentReuseCountCache.getOrDefault(cacheKey, 0);
-
-        System.out.println("cacheKey=" + cacheKey +
-                " 命中=" + currentValueCache.containsKey(cacheKey) +
-                " 复用次数=" + currentReuseCountCache.getOrDefault(cacheKey, 0) +
-                "值=" + currentValueCache.get(cacheKey));
-
-        if (!currentValueCache.containsKey(cacheKey) || currentReuseCount >= MAX_REUSE_COUNT) {
+        if (!currentValueCache.containsKey(cacheKey) || count >= MAX_REUSE_COUNT) {
             return generateNewCurrent(cacheKey);
         }
 
         boolean reuse = ThreadLocalRandom.current().nextDouble() < REUSE_PROBABILITY;
         if (reuse) {
-            int newReuseCount = currentReuseCount + 1;
-            currentReuseCountCache.put(cacheKey, newReuseCount);
-            System.out.println("复用值=" + currentValueCache.get(cacheKey));
+            currentReuseCountCache.put(cacheKey, count + 1);
             return currentValueCache.get(cacheKey);
         } else {
-            double newValue = generateNewCurrent(cacheKey);
-            System.out.println("生成新值=" + newValue);
-            return newValue;
+            return generateNewCurrent(cacheKey);
         }
     }
 
@@ -407,4 +412,4 @@ public class DeviceDataQuery {
         currentReuseCountCache.put(cacheKey, 0);
         return newValue;
     }
-}
+}

+ 14 - 12
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,16 +26,18 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    //@Scheduled(fixedDelay = 60 * 1000, initialDelay = 0)
-    public void scheduledDeviceDataSync() {
-        Integer tenantId = 1208;
-        Long engineeringId = 3101130019L;
-        log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
-
-        try {
-            iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
-        } catch (Exception e) {
-            log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
-        }
-    }
+    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
+    // public void scheduledDeviceDataSync() {
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
+    //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //
+    //     try {
+    //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);
+    //     } catch (Exception e) {
+    //         log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
+    //     }
+    // }
 }

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/CrackVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 裂缝数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class CrackVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 裂缝宽度值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 裂缝宽度值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("裂缝宽度值(sensorValue)为必填项");
+        }
+    }
+}

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/DeviationVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 位移数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class DeviationVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 位移值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 位移值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("位移值(sensorValue)为必填项");
+        }
+    }
+}

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/TiltVO.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.service.vo.base;
+
+import lombok.Data;
+
+/**
+ * 倾斜数据推送VO
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/18
+ */
+@Data
+public class TiltVO extends BaseEnvMonitorPushVO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 倾斜角度值
+     * 类型:int 0、1 有、无
+     */
+    private Integer sensorValue;
+
+    @Override
+    public Number getSensorValue() {
+        return sensorValue;
+    }
+
+    @Override
+    protected void validateSensorValue() {
+        // 倾斜角度值校验规则,根据实际业务需求调整
+        if (sensorValue == null) {
+            throw new IllegalArgumentException("倾斜角度值(sensorValue)为必填项");
+        }
+    }
+}

+ 8 - 3
service-job/src/main/java/com/ruoyi/job/task/RyTask.java

@@ -85,9 +85,14 @@ public class RyTask {
         remotePmService.reportSubmissionReminder();
     }
 
-    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
-        System.out.println("synchronizeDeviceData start......");
-        remoteCdiTaskService.synchronizeDeviceData(tenantId, engineeringId);
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        System.out.println("租户: " + tenantId + "的人防监测数据推送定时任务开始执行......");
+        remoteCdiTaskService.synchronizeDeviceData(tenantId, engineeringId, username, password);
+    }
+
+    public void allData(Long engineeringId, String username, String password) {
+        System.out.println("人防监测数据推送定时任务开始执行......");
+        remoteCdiTaskService.allData(engineeringId, username, password);
     }
 
 }