#840 万象城增加告警查询,外滩27号增加数据转发

Fechado
hanzhengyi quer mesclar 7 commits de uskycloud/han em uskycloud/server-165
32 arquivos alterados com 2039 adições e 621 exclusões
  1. 15 0
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/AlarmDataSyncTaskService.java
  2. 29 0
      service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/AlarmDataSyncTaskFactory.java
  3. 30 2
      service-cdi/service-cdi-biz/pom.xml
  4. 2 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java
  5. 4 7
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/AlarmDataController.java
  6. 34 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/AlarmDataSyncTaskApi.java
  7. 22 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/web/BaseAlarmController.java
  8. 1 1
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/web/DmpDeviceInfoController.java
  9. 127 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/domain/BaseAlarm.java
  10. 190 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/domain/enums/AlarmType.java
  11. 18 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/mapper/BaseAlarmMapper.java
  12. 16 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/BaseAlarmService.java
  13. 5 5
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java
  14. 5 3
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java
  15. 551 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataSyncService.java
  16. 27 24
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataTransferService.java
  17. 35 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseAlarmServiceImpl.java
  18. 2 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseDataTransferService.java
  19. 213 204
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java
  20. 230 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/mqtt/MqttConnectionTool.java
  21. 42 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceAlarmDataSyncService.java
  22. 6 6
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java
  23. 117 86
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/WeatherFetcher.java
  24. 5 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/IotDataTransferVO.java
  25. 0 69
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage1VO.java
  26. 0 69
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage2VO.java
  27. 0 79
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage3VO.java
  28. 107 35
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessageVO.java
  29. 29 0
      service-cdi/service-cdi-biz/src/main/resources/mapper/cdi/BaseAlarmMapper.xml
  30. 148 25
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionRecordServiceImpl.java
  31. 20 2
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/util/ExcelUtilImage.java
  32. 9 0
      service-job/src/main/java/com/ruoyi/job/task/RyTask.java

+ 15 - 0
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/AlarmDataSyncTaskService.java

@@ -0,0 +1,15 @@
+package com.usky.cdi;
+
+import com.usky.cdi.factory.AlarmDataSyncTaskFactory;
+import org.springframework.cloud.openfeign.FeignClient;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/29
+ */
+@FeignClient(contextId = "AlarmDataSyncTaskService", value = "service-cdi", fallbackFactory = AlarmDataSyncTaskFactory.class)
+public interface AlarmDataSyncTaskService {
+    void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status);
+}

+ 29 - 0
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/AlarmDataSyncTaskFactory.java

@@ -0,0 +1,29 @@
+package com.usky.cdi.factory;
+
+import com.usky.cdi.AlarmDataSyncTaskService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.openfeign.FallbackFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/29
+ */
+@Component
+public class AlarmDataSyncTaskFactory implements FallbackFactory<AlarmDataSyncTaskService> {
+    private static final Logger log = LoggerFactory.getLogger(AlarmDataSyncTaskFactory.class);
+
+    @Override
+    public AlarmDataSyncTaskService create(Throwable throwable) {
+        log.error("用户服务调用失败:{}", throwable.getMessage());
+        return new AlarmDataSyncTaskService() {
+            @Override
+            public void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status) {
+                throw new RuntimeException(throwable);
+            }
+        };
+    }
+}

+ 30 - 2
service-cdi/service-cdi-biz/pom.xml

@@ -1,5 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>service-cdi</artifactId>
         <groupId>com.usky</groupId>
@@ -10,6 +11,22 @@
     <artifactId>service-cdi-biz</artifactId>
 
     <dependencies>
+        <!--                <dependency>-->
+        <!--                    <groupId>com.usky</groupId>-->
+        <!--                    <artifactId>common-cloud-starter</artifactId>-->
+        <!--                    <exclusions>-->
+        <!--                        &lt;!&ndash; 排除Nacos相关依赖 &ndash;&gt;-->
+        <!--                        <exclusion>-->
+        <!--                            <groupId>com.alibaba.cloud</groupId>-->
+        <!--                            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>-->
+        <!--                        </exclusion>-->
+        <!--                        <exclusion>-->
+        <!--                            <groupId>com.alibaba.cloud</groupId>-->
+        <!--                            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>-->
+        <!--                        </exclusion>-->
+        <!--                    </exclusions>-->
+        <!--                </dependency>-->
+
         <dependency>
             <groupId>com.usky</groupId>
             <artifactId>common-cloud-starter</artifactId>
@@ -61,7 +78,7 @@
             <artifactId>spring-websocket</artifactId>
             <version>5.2.8.RELEASE</version>
         </dependency>
-        
+
         <!--Redis依赖-->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -81,6 +98,17 @@
             <version>0.2.15</version>
             <scope>compile</scope>
         </dependency>
+
+        <!--        <dependency>-->
+        <!--            <groupId>com.alibaba</groupId>-->
+        <!--            <artifactId>druid-spring-boot-starter</artifactId>-->
+        <!--            <version>1.2.20</version>-->
+        <!--        </dependency>-->
+        <!--        <dependency>-->
+        <!--            <groupId>com.alibaba.nacos</groupId>-->
+        <!--            <artifactId>nacos-client</artifactId>-->
+        <!--        </dependency>-->
+
     </dependencies>
 
     <build>

+ 2 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java

@@ -1,7 +1,5 @@
 package com.usky.cdi;
 
-
-
 import org.mybatis.spring.annotation.MapperScan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,6 +10,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.core.env.Environment;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.net.InetAddress;
@@ -25,6 +24,7 @@ import java.net.UnknownHostException;
 
 //@EnableSwagger2
 @EnableScheduling
+@EnableAsync // 启用异步执行支持
 @EnableFeignClients(basePackages = "com.usky")
 @MapperScan(value = "com.usky.cdi.mapper")
 @ComponentScan("com.usky")

+ 4 - 7
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/AlarmDataController.java

@@ -1,11 +1,7 @@
 package com.usky.cdi.controller;
 
 import com.usky.cdi.service.impl.AlarmDataTransferService;
-import com.usky.cdi.service.vo.alarm.AlarmMessage1VO;
-import com.usky.cdi.service.vo.alarm.AlarmMessage2VO;
-import com.usky.cdi.service.vo.alarm.AlarmMessage3VO;
 import com.usky.cdi.service.vo.alarm.AlarmMessageVO;
-import com.usky.cdi.service.vo.base.EngineeringBaseVO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -28,6 +24,7 @@ import org.springframework.web.bind.annotation.RestController;
 public class AlarmDataController {
     @Autowired
     private AlarmDataTransferService alarmDataTransferService;
+
     /**
      * 上报人防工程基础信息
      */
@@ -41,7 +38,7 @@ public class AlarmDataController {
      * 上报人防工程基础信息
      */
     @PostMapping("/alarmMessage1")
-    public String sendAlarmMessage1(@RequestBody AlarmMessage1VO vo) {
+    public String sendAlarmMessage1(@RequestBody AlarmMessageVO vo) {
         boolean success = alarmDataTransferService.sendAlarmMessage1(vo);
         return success ? "上报成功" : "上报失败";
     }
@@ -50,13 +47,13 @@ public class AlarmDataController {
      * 上报倾斜、位移、裂缝监测事件
      */
     @PostMapping("/alarmMessage2")
-    public String sendAlarmMessage2(@RequestBody AlarmMessage2VO vo) {
+    public String sendAlarmMessage2(@RequestBody AlarmMessageVO vo) {
         boolean success = alarmDataTransferService.sendAlarmMessage2(vo);
         return success ? "上报成功" : "上报失败";
     }
 
     @PostMapping("/alarmMessage3")
-    public String sendAlarmMessage3(@RequestBody AlarmMessage3VO vo) {
+    public String sendAlarmMessage3(@RequestBody AlarmMessageVO vo) {
         boolean success = alarmDataTransferService.sendEngineeringBase(vo);
         return success ? "上报成功" : "上报失败";
     }

+ 34 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/AlarmDataSyncTaskApi.java

@@ -0,0 +1,34 @@
+package com.usky.cdi.controller.api;
+
+import com.usky.cdi.AlarmDataSyncTaskService;
+import com.usky.cdi.service.impl.AlarmDataSyncService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 多租户定时任务API接口
+ *
+ * @author usky
+ * @since 2025-12-29
+ */
+@Slf4j
+@RestController
+public class AlarmDataSyncTaskApi implements AlarmDataSyncTaskService {
+
+    @Autowired
+    private AlarmDataSyncService alarmDataSyncService;
+
+    /**
+     * 同步告警数据
+     * tenantId: 租户ID
+     * engineeringId: 工程ID
+     * username: mqtt用户名
+     * password: mqtt密码
+     * status: 状态 0:平时 1:战时
+     */
+    @Override
+    public void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status) {
+        alarmDataSyncService.synchronizeAlarmData(tenantId, engineeringId, username, password, status);
+    }
+}

+ 22 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/web/BaseAlarmController.java

@@ -0,0 +1,22 @@
+package com.usky.cdi.controller.web;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * <p>
+ * 统一告警表 前端控制器
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+@RestController
+@RequestMapping("/baseAlarm")
+public class BaseAlarmController {
+
+}
+

+ 1 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/web/DmpDeviceInfoController.java

@@ -23,7 +23,7 @@ import java.util.Map;
  * @since 2022-10-08
  */
 @RestController
-@RequestMapping("/dmpDeviceInfo")
+@RequestMapping("/cdiDmpDeviceInfo")
 public class DmpDeviceInfoController {
 
     @Autowired

+ 127 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/domain/BaseAlarm.java

@@ -0,0 +1,127 @@
+package com.usky.cdi.domain;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import java.time.LocalDateTime;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * <p>
+ * 统一告警表
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+public class BaseAlarm implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键ID
+     */
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * 设备ID
+     */
+    private String deviceId;
+
+    /**
+     * 告警时间
+     */
+    private LocalDateTime alarmTime;
+
+    /**
+     * 告警类型
+     */
+    private String alarmType;
+
+    /**
+     * 告警对象
+     */
+    private String alarmObject;
+
+    /**
+     * 告警值
+     */
+    private String alarmData;
+
+    /**
+     * 告警属性
+     */
+    private String alarmAttribute;
+
+    /**
+     * 告警内容
+     */
+    private String alarmContent;
+
+    /**
+     * 告警等级
+     * 1 一级,2 二级,3 三级,4 四级,5 五级
+     */
+    private Integer alarmGrade;
+
+    /**
+     * 告警位置
+     */
+    private String alarmAddress;
+
+    /**
+     * 处理人
+     */
+    private String handleBy;
+
+    /**
+     * 处理时间
+     */
+    private LocalDateTime handleTime;
+
+    /**
+     * 处理内容
+     */
+    private String handleContent;
+
+    /**
+     * 处理人电话
+     */
+    private String handlePhone;
+
+    /**
+     * 处理状态;0 未处理,1 已处理
+     */
+    private Integer handleStatus;
+
+    /**
+     * 是否误报;0 非误报,1 误报
+     */
+    private Integer alarmFalse;
+
+    /**
+     * 现场照片
+     */
+    private String sitePhoto;
+
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+    /**
+     * 组织机构ID
+     */
+    private Integer deptId;
+
+    /**
+     * 租户ID
+     */
+    private Integer tenantId;
+
+
+}

+ 190 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/domain/enums/AlarmType.java

@@ -0,0 +1,190 @@
+package com.usky.cdi.domain.enums;
+
+/**
+ * 告警类型枚举类
+ * 统一管理告警类型定义、分类和转换逻辑
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+public enum AlarmType {
+
+    // 水浸告警
+    WATER("701", "10.1", null, "有水"),
+
+    // 空气温度偏高告警
+    AIR_TEMP("702", "12.1", Category.THRESHOLD, "空气温度偏高"),
+
+    // 空气湿度偏高告警
+    AIR_HUMIDITY("703", "13.1", Category.THRESHOLD, "空气湿度偏高"),
+
+    // 氧气浓度偏低告警
+    OXYGEN_LOW("704", "14.1", Category.THRESHOLD, "氧气浓度偏低"),
+
+    // 二氧化碳浓度偏高告警
+    CO2_HIGH("705", "15.1", Category.THRESHOLD, "二氧化碳浓度偏高"),
+
+    // 一氧化碳浓度偏高告警
+    CO_HIGH("706", "16.1", Category.THRESHOLD, "一氧化碳浓度偏高"),
+
+    // 剩余电流偏大告警
+    RESIDUAL_CURRENT("707", "29.1", Category.THRESHOLD_LINE, "剩余电流偏大"),
+
+    // 供电电缆温度偏高告警
+    CABLE_TEMP("708", "29.2", Category.THRESHOLD_LINE, "供电电缆温度偏高"),
+
+    // 电流偏大
+    CURRENT_HIGH("709", "29.3", Category.THRESHOLD_LINE, "电流偏大"),
+
+    // 人员闯入告警
+    PERSON_INTRUSION("710", "33.1", Category.OTHER, "人员闯入告警"),
+
+    // 监测位置有位移告警
+    DISPLACEMENT("711", "37.1", Category.STRUCTURE_MONITORING, "有位移"),
+
+    // 监测位置有倾斜告警
+    TILT("712", "34.1", Category.STRUCTURE_MONITORING, "有倾斜"),
+
+    // 裂缝检测告警
+    CRACK("713", "36.1", Category.STRUCTURE_MONITORING, "有裂缝");
+
+    private final String originalCode;
+    private final String mappedCode;
+    private final Category category;
+    private final String description;
+
+    AlarmType(String originalCode, String mappedCode, Category category, String description) {
+        this.originalCode = originalCode;
+        this.mappedCode = mappedCode;
+        this.category = category;
+        this.description = description;
+    }
+
+    /**
+     * 根据原始告警类型代码获取对应的枚举值
+     */
+    public static AlarmType fromOriginalCode(String originalCode) {
+        for (AlarmType alarmType : values()) {
+            if (alarmType.getOriginalCode().equals(originalCode)) {
+                return alarmType;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 判断是否为结构监测告警类型(需要unitName映射)
+     */
+    public boolean isStructureMonitoringType() {
+        return category == Category.STRUCTURE_MONITORING;
+    }
+
+    /**
+     * 判断是否为阈值相关告警类型
+     */
+    public boolean isThresholdAlarmType() {
+        return category == Category.THRESHOLD || category == Category.THRESHOLD_LINE;
+    }
+
+    /**
+     * 判断是否为线路编号相关告警类型
+     */
+    public boolean isLineNoAlarmType() {
+        return category == Category.THRESHOLD_LINE;
+    }
+
+    /**
+     * 获取默认unitName值
+     */
+    public String getDefaultUnitName() {
+        // switch (this) {
+        //     case DISPLACEMENT:
+        //         return "监测位置";
+        //     case TILT:
+        //         return "倾斜监测点";
+        //     case CRACK:
+        //         return "裂缝检测点";
+        //     default:
+        return "防化单元";
+        // }
+    }
+
+    /**
+     * 获取默认阈值
+     */
+    public Float getDefaultThreshold() {
+        switch (this) {
+            case AIR_TEMP:
+                return 40.0f; // 35°C
+            case AIR_HUMIDITY:
+                return 95.0f; // 80%
+            case OXYGEN_LOW:
+                return 19.5f; // 19.5%
+            case CO2_HIGH:
+                return 1000.0f; // 0.5%
+            case CO_HIGH:
+                return 5.0f; // 10ppm
+            case RESIDUAL_CURRENT:
+                return 10.0f; // 300mA
+            case CABLE_TEMP:
+                return 55.0f; // 70°C
+            case CURRENT_HIGH:
+                return 10000.0f; // 10000mA
+            default:
+                return 0.0f;
+        }
+    }
+
+    /**
+     * 验证阈值是否在合理范围内
+     */
+    public boolean isValidThreshold(float value) {
+        switch (this) {
+            case AIR_TEMP:
+                return value >= -20 && value <= 80; // 温度范围 -50°C 到 100°C
+            case AIR_HUMIDITY:
+                return value >= 0 && value <= 100; // 湿度范围 0% 到 100%
+            case OXYGEN_LOW:
+                return value >= 0 && value <= 25; // 氧气浓度范围 0% 到 25%
+            case CO2_HIGH:
+                return value >= 0 && value <= 5; // 二氧化碳浓度范围 0% 到 10%
+            case CO_HIGH:
+                return value >= 0 && value <= 2000; // 一氧化碳浓度范围 0 到 1000ppm
+            case RESIDUAL_CURRENT:
+                return value >= 0 && value <= 500000; // 剩余电流范围 0 到 5000mA
+            case CABLE_TEMP:
+                return value >= -50 && value <= 200; // 电缆温度范围 -50°C 到 200°C
+            case CURRENT_HIGH:
+                return value >= 0 && value <= 100000; // 电流范围 0 到 1000A
+            default:
+                return true; // 其他类型不进行验证
+        }
+    }
+
+    /**
+     * 告警分类枚举
+     */
+    public enum Category {
+        STRUCTURE_MONITORING,    // 结构监测类型(需要unitName映射)
+        THRESHOLD,              // 阈值告警类型(需要thresholding)
+        THRESHOLD_LINE,         // 阈值+线路编号类型(需要thresholding和lineNo)
+        OTHER                   // 其他类型
+    }
+
+    // Getters
+    public String getOriginalCode() {
+        return originalCode;
+    }
+
+    public String getMappedCode() {
+        return mappedCode;
+    }
+
+    public Category getCategory() {
+        return category;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+}

+ 18 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/mapper/BaseAlarmMapper.java

@@ -0,0 +1,18 @@
+package com.usky.cdi.mapper;
+
+import com.usky.cdi.domain.BaseAlarm;
+import com.usky.common.mybatis.core.CrudMapper;
+import org.springframework.stereotype.Repository;
+
+/**
+ * <p>
+ * 统一告警表 Mapper 接口
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+@Repository
+public interface BaseAlarmMapper extends CrudMapper<BaseAlarm> {
+
+}

+ 16 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/BaseAlarmService.java

@@ -0,0 +1,16 @@
+package com.usky.cdi.service;
+
+import com.usky.cdi.domain.BaseAlarm;
+import com.usky.common.mybatis.core.CrudService;
+
+/**
+ * <p>
+ * 统一告警表 服务类
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+public interface BaseAlarmService extends CrudService<BaseAlarm> {
+
+}

+ 5 - 5
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/DeviceFieldConfig.java

@@ -47,11 +47,11 @@ public class DeviceFieldConfig {
         // 一氧化碳传感器(711)
         fieldMapping.put("711", "co");
 
-        // 倾斜传感器
-        fieldMapping.put("712", "qx");
-
-        // 裂缝传感器
-        fieldMapping.put("713", "cd");
+        // // 倾斜传感器
+        // fieldMapping.put("712", "qx");
+        //
+        // // 裂缝传感器
+        // fieldMapping.put("713", "cd");
 
         // 位移传感器
         fieldMapping.put("714", "wy");

+ 5 - 3
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -73,10 +73,12 @@ public class MqttOutConfig {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
 
         // 设置默认的MqttConnectOptions,确保serverURIs不为null
-        // 实际使用时,会在createMqttConnection方法中重新配置
+        // 使用时,会在createMqttConnection方法中重新配置
         MqttConnectOptions options = new MqttConnectOptions();
-        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
-        options.setKeepAliveInterval(60); // 设置默认的心跳间隔
+        // 设置默认的服务器地址
+        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"});
+        // 设置默认的心跳间隔
+        options.setKeepAliveInterval(60);
         factory.setConnectionOptions(options);
 
         return factory;

+ 551 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataSyncService.java

@@ -0,0 +1,551 @@
+package com.usky.cdi.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.usky.cdi.domain.BaseAlarm;
+import com.usky.cdi.domain.BaseBuildFacility;
+import com.usky.cdi.mapper.BaseAlarmMapper;
+import com.usky.cdi.mapper.BaseBuildFacilityMapper;
+import com.usky.cdi.mapper.DmpProductMapper;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
+import com.usky.cdi.service.util.SnowflakeIdGenerator;
+import com.usky.cdi.service.vo.alarm.AlarmMessageVO;
+import com.usky.cdi.domain.enums.AlarmType;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * 多租户定时任务服务
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AlarmDataSyncService {
+
+    @Value("${snowflake.worker-id:3}")
+    private long workerId;
+
+    @Value("${snowflake.data-center-id:3}")
+    private long dataCenterId;
+
+    private static final String PEACETIME = "peacetime";
+    private static final String WARTIME = "wartime";
+    private static final String MQTT_TOPIC = "alarm/message";
+    private static final String ALARM_DELIVERY_KEY_PREFIX = "alarm:delivery:";
+    private final MqttConnectionTool mqttConnectionTool;
+
+    @Autowired
+    private StringRedisTemplate stringRedisTemplate;
+    @Autowired
+    private BaseAlarmMapper baseAlarmMapper;
+    @Autowired
+    private DmpProductMapper dmpProductMapper;
+    @Autowired
+    private BaseBuildFacilityMapper baseBuildFacilityMapper;
+
+    private SnowflakeIdGenerator idGenerator;
+
+    @PostConstruct
+    public void init() {
+        this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
+    }
+
+    /**
+     * 获取当前时间
+     */
+    private LocalDateTime getCurrentTime() {
+        return LocalDateTime.now();
+    }
+
+    /**
+     * 生成数据包ID
+     */
+    private Long generateDataPacketID() {
+        return idGenerator.nextPacketId();
+    }
+
+    @Async
+    public void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status) {
+        log.info("租户:{}的人防告警数据推送定时任务开始执行,平战时状态:{}", tenantId, PEACETIME.equals(status) ? "平时" : "战时");
+
+        Long startTime = System.currentTimeMillis();
+        log.info("开始时间:{}", getCurrentTime());
+
+        // 1.查询 base_alarm 表中的告警数据,筛选条件为 tenant_id = tenantId
+        LambdaQueryWrapper<BaseAlarm> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.eq(BaseAlarm::getTenantId, tenantId)
+                .eq(BaseAlarm::getAlarmGrade, 1);
+        List<BaseAlarm> alarmList = baseAlarmMapper.selectList(queryWrapper);
+
+        if (alarmList.isEmpty()) {
+            log.warn("租户{}没有新的告警数据,任务结束", tenantId);
+            return;
+        }
+        log.info("查询到租户{}的告警数据总数:{}", tenantId, alarmList.size());
+
+        // 批量查询建筑设施数据,构建设备ID到设施的映射
+        Map<String, BaseBuildFacility> facilityMap = buildFacilityMap(alarmList, tenantId);
+
+        // 记录每一条告警数据
+        for (BaseAlarm alarm : alarmList) {
+            log.info("告警数据:ID={}, 设备ID={}, 告警类型={}, 告警时间={}, 处理状态={}, 是否误报={}",
+                    alarm.getId(), alarm.getDeviceId(), alarm.getAlarmType(),
+                    alarm.getAlarmTime(), alarm.getHandleStatus(), alarm.getAlarmFalse());
+        }
+
+        try {
+            // 2.创建MQTT连接
+            mqttConnectionTool.connectOrRefresh(username, password);
+
+            int successCount = 0;
+            int failureCount = 0;
+
+            // 3.遍历告警数据,转换为AlarmMessageVO并发送
+            for (BaseAlarm alarm : alarmList) {
+                try {
+                    // 根据告警类型映射关系转换
+                    String alarmType = convertAlarmType(alarm.getAlarmType());
+                    if (alarmType == null) {
+                        log.warn("不支持的告警类型:{},跳过处理", alarm.getAlarmType());
+                        failureCount++;
+                        continue;
+                    }
+
+                    // 转换告警状态
+                    Integer alarmStatus = convertAlarmStatus(alarm.getHandleStatus(), alarm.getAlarmFalse(), status);
+
+                    // 构建AlarmMessageVO对象
+                    AlarmMessageVO<Integer> alarmMessageVO = new AlarmMessageVO<>();
+                    alarmMessageVO.setDataPacketID(generateDataPacketID());
+                    alarmMessageVO.setEngineeringID(engineeringId);
+                    alarmMessageVO.setAlarmID(alarm.getId());
+                    // 固定值1,人防工程级物联平台自动发现
+                    alarmMessageVO.setAlarmSource(1);
+                    alarmMessageVO.setSensorID(alarm.getDeviceId() != null ? Integer.valueOf(alarm.getDeviceId()) : null);
+                    alarmMessageVO.setAlarmType(alarmType);
+                    alarmMessageVO.setAlarmStatus(alarmStatus);
+
+                    // 设置告警时间:新增告警取 alarm_type 字段,更新数据则取 handle_time 字段
+                    String alarmTime = alarm.getHandleTime() != null ?
+                            alarm.getHandleTime().toString() : alarm.getAlarmTime().toString();
+                    alarmMessageVO.setAlarmUpdateTime(alarmTime);
+
+                    // 设置监测对象告警描述和编号
+                    alarmMessageVO.setAlarmDesc(alarm.getAlarmAttribute());
+                    alarmMessageVO.setMonitorObjNo(facilityMap.get(alarm.getDeviceId()).getFacilityNum());
+
+                    // 设置上报时间为当前时间
+                    alarmMessageVO.setPublishTime(getCurrentTime());
+
+                    // 设置告警数据字段
+                    alarmMessageVO.setSensorValue(alarm.getAlarmData());
+
+                    // 根据告警类型填充unitName、thresholding、lineNo字段
+                    populateAlarmFields(alarmMessageVO, alarm, tenantId, facilityMap);
+
+                    // 从Redis获取缓存的投递状态,检查是否有变更
+                    String deliveryKey = alarm.getDeviceId() + "_" + alarm.getId();
+                    String cachedRecord = (String) stringRedisTemplate.opsForHash().get(ALARM_DELIVERY_KEY_PREFIX + tenantId, deliveryKey);
+                    if (cachedRecord != null && !hasAlarmChanged(alarm, cachedRecord, status)) {
+                        log.info("告警数据无变化,跳过推送:设备ID={}, 告警ID={}", alarm.getDeviceId(), alarm.getId());
+                        continue;
+                    }
+
+                    // 4.将AlarmMessageVO转换为JSON字符串并发送MQTT消息
+                    String jsonMessage = JSON.toJSONString(alarmMessageVO);
+                    MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(username, password);
+                    gateway.sendToMqtt(MQTT_TOPIC, jsonMessage);
+
+                    successCount++;
+
+                    // 更新Redis中的投递记录
+                    saveDeliveryRecordToRedis(alarm, alarmStatus, status, tenantId);
+
+                    log.info("成功发送告警消息:设备ID={}, 告警类型={}, 消息内容={}",
+                            alarm.getDeviceId(), alarm.getAlarmType(), jsonMessage);
+
+                } catch (Exception e) {
+                    failureCount++;
+                    log.error("发送告警消息失败:设备ID={}, 告警类型={}, 错误信息={}",
+                            alarm.getDeviceId(), alarm.getAlarmType(), e.getMessage(), e);
+                }
+            }
+
+            // 6.打印统计信息
+            log.info("租户{}的告警数据推送任务完成", tenantId);
+            log.info("告警数据总数:{}, 成功推送:{}, 失败:{}", alarmList.size(), successCount, failureCount);
+
+        } catch (Exception e) {
+            log.error("租户{}的告警数据推送定时任务执行失败:{}", tenantId, e.getMessage(), e);
+        } finally {
+            Long endTime = System.currentTimeMillis();
+            log.info("结束时间:{}, 耗时:{}ms", getCurrentTime(), endTime - startTime);
+        }
+    }
+
+    /**
+     * 将base_alarm的alarm_type转换为AlarmMessageVO的alarmType
+     */
+    private String convertAlarmType(String alarmType) {
+        AlarmType type = AlarmType.fromOriginalCode(alarmType);
+        return type != null ? type.getMappedCode() : null;
+    }
+
+    /**
+     * 根据平时/战时状态转换告警状态
+     * 平时:handle_status=0则alarmStatus=1,handle_status=1则alarmStatus=4,alarm_false=1则alarmStatus=2
+     * 战时:handle_status=0则alarmStatus=1,handle_status=1则alarmStatus=3,alarm_false=1则alarmStatus=5
+     */
+    private Integer convertAlarmStatus(Integer handleStatus, Integer alarmFalse, String status) {
+        // 如果是误报,优先返回对应的误报状态
+        if (alarmFalse != null && alarmFalse == 1) {
+            // 平时误报=2,战时误报=5
+            return PEACETIME.equals(status) ? 2 : 5;
+        }
+
+        // 根据处理状态和平战时状态返回对应状态
+        if (handleStatus != null) {
+            if (handleStatus == 0) {
+                // 未处理/待处置
+                return 1;
+            } else if (handleStatus == 1) {
+                // 平时已办结=4,战时已处置=3
+                return PEACETIME.equals(status) ? 4 : 3;
+            }
+        }
+
+        // 默认返回未处理状态
+        return 1;
+    }
+
+    /**
+     * 批量查询建筑设施数据,构建设备ID到设施的映射
+     */
+    private Map<String, BaseBuildFacility> buildFacilityMap(List<BaseAlarm> alarmList, Integer tenantId) {
+        Map<String, BaseBuildFacility> facilityMap = new HashMap<>();
+
+        // 收集所有设备ID
+        List<String> deviceIds = alarmList.stream()
+                .map(BaseAlarm::getDeviceId)
+                .filter(id -> id != null && !id.trim().isEmpty())
+                .distinct()
+                .collect(Collectors.toList());
+
+        if (deviceIds.isEmpty()) {
+            return facilityMap;
+        }
+
+        try {
+            LambdaQueryWrapper<BaseBuildFacility> queryWrapper = new LambdaQueryWrapper<>();
+            queryWrapper.in(BaseBuildFacility::getDeviceId, deviceIds)
+                    .eq(BaseBuildFacility::getTenantId, tenantId)
+                    .eq(BaseBuildFacility::getDeleteFlag, 0);
+
+            List<BaseBuildFacility> facilities = baseBuildFacilityMapper.selectList(queryWrapper);
+
+            for (BaseBuildFacility facility : facilities) {
+                if (facility.getDeviceId() != null) {
+                    facilityMap.put(facility.getDeviceId(), facility);
+                }
+            }
+
+            log.info("批量查询建筑设施数据完成:设备ID数量={}, 匹配到设施数量={}", deviceIds.size(), facilityMap.size());
+        } catch (Exception e) {
+            log.warn("批量查询BaseBuildFacility失败:{}", e.getMessage());
+        }
+
+        return facilityMap;
+    }
+
+    /**
+     * 将投递记录保存到Redis Hash中
+     * 使用Hash结构:key=alarm:delivery:{tenantId}, field=deviceId_alarmId, value=JSON格式的投递状态
+     */
+    private void saveDeliveryRecordToRedis(BaseAlarm alarm, Integer currentAlarmStatus, String status, Integer tenantId) {
+        try {
+            String deliveryKey = ALARM_DELIVERY_KEY_PREFIX + tenantId;
+            String fieldKey = alarm.getDeviceId() + "_" + alarm.getId();
+
+            Map<String, String> recordMap = new HashMap<>();
+            recordMap.put("alarmStatus", String.valueOf(currentAlarmStatus));
+            recordMap.put("handleStatus", String.valueOf(alarm.getHandleStatus()));
+            recordMap.put("alarmFalse", String.valueOf(alarm.getAlarmFalse()));
+            recordMap.put("handleTime", alarm.getHandleTime() != null ? alarm.getHandleTime().toString() : "");
+            recordMap.put("handleBy", alarm.getHandleBy() != null ? alarm.getHandleBy() : "");
+            recordMap.put("deliveryTime", LocalDateTime.now().toString());
+
+            stringRedisTemplate.opsForHash().put(deliveryKey, fieldKey, JSON.toJSONString(recordMap));
+
+            log.debug("保存投递记录到Redis:key={}, field={}", deliveryKey, fieldKey);
+        } catch (Exception e) {
+            log.warn("保存投递记录到Redis失败:告警ID={}, 错误={}", alarm.getId(), e.getMessage());
+        }
+    }
+
+    /**
+     * 检查告警数据是否有变更
+     * 比较handleStatus、alarmFalse、handleTime、handleBy是否发生变化
+     */
+    private boolean hasAlarmChanged(BaseAlarm alarm, String cachedRecord, String status) {
+        try {
+            Map<String, String> cachedMap = JSON.parseObject(cachedRecord, Map.class);
+
+            Integer currentAlarmStatus = convertAlarmStatus(alarm.getHandleStatus(), alarm.getAlarmFalse(), status);
+            Integer cachedAlarmStatus = parseInteger(cachedMap.get("alarmStatus"));
+            Integer cachedHandleStatus = parseInteger(cachedMap.get("handleStatus"));
+            Integer cachedAlarmFalse = parseInteger(cachedMap.get("alarmFalse"));
+
+            if (!Objects.equals(currentAlarmStatus, cachedAlarmStatus)) {
+                return true;
+            }
+            if (!Objects.equals(alarm.getHandleStatus(), cachedHandleStatus)) {
+                return true;
+            }
+            if (!Objects.equals(alarm.getAlarmFalse(), cachedAlarmFalse)) {
+                return true;
+            }
+
+            String cachedHandleTime = cachedMap.get("handleTime");
+            String currentHandleTime = alarm.getHandleTime() != null ? alarm.getHandleTime().toString() : "";
+            if (!Objects.equals(currentHandleTime, cachedHandleTime)) {
+                return true;
+            }
+
+            String cachedHandleBy = cachedMap.get("handleBy");
+            String currentHandleBy = alarm.getHandleBy() != null ? alarm.getHandleBy() : "";
+            if (!Objects.equals(currentHandleBy, cachedHandleBy)) {
+                return true;
+            }
+
+            return false;
+        } catch (Exception e) {
+            log.warn("解析缓存的投递记录失败:告警ID={}, 错误={}", alarm.getId(), e.getMessage());
+            return true;
+        }
+    }
+
+    private Integer parseInteger(String value) {
+        if (value == null || value.trim().isEmpty()) {
+            return null;
+        }
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    /**
+     * 根据告警类型填充unitName、thresholding、lineNo字段
+     */
+    private void populateAlarmFields(AlarmMessageVO<Integer> alarmMessageVO, BaseAlarm alarm, Integer tenantId,
+                                     Map<String, BaseBuildFacility> facilityMap) {
+        String originalAlarmType = alarm.getAlarmType();
+        AlarmType alarmTypeEnum = AlarmType.fromOriginalCode(originalAlarmType);
+
+        try {
+            // 根据AlarmMessageVO字段注释中的规则填充字段
+            if (alarmTypeEnum != null && alarmTypeEnum.isStructureMonitoringType()) {
+                // 倾斜/位移/裂缝告警类型 - 设置unitName
+                populateUnitName(alarmMessageVO, alarm, tenantId, facilityMap);
+            }
+            if (alarmTypeEnum != null && alarmTypeEnum.isThresholdAlarmType()) {
+                // 温湿度、气体浓度告警类型 - 设置thresholding
+                populateThresholding(alarmMessageVO, alarm);
+            }
+            if (alarmTypeEnum != null && alarmTypeEnum.isLineNoAlarmType()) {
+                // 电气相关告警类型 - 设置lineNo
+                populateLineNo(alarmMessageVO, alarm);
+            }
+        } catch (Exception e) {
+            log.warn("填充告警字段时发生异常:告警类型={}, 设备ID={}, 错误信息={}",
+                    originalAlarmType, alarm.getDeviceId(), e.getMessage());
+        }
+    }
+
+    /**
+     * 填充unitName字段(倾斜才用)
+     * 对于711、712、713类型,实现与BaseBuildFacility表中facilityDesc字段的映射
+     */
+    private void populateUnitName(AlarmMessageVO<Integer> alarmMessageVO, BaseAlarm alarm, Integer tenantId,
+                                  Map<String, BaseBuildFacility> facilityMap) {
+        String originalAlarmType = alarm.getAlarmType();
+        AlarmType alarmTypeEnum = AlarmType.fromOriginalCode(originalAlarmType);
+
+        if (alarmTypeEnum != null && alarmTypeEnum.isStructureMonitoringType()) {
+            // 从Map中获取建筑设施信息
+            BaseBuildFacility facility = facilityMap.get(alarm.getDeviceId());
+            if (facility != null && facility.getFacilityDesc() != null) {
+                alarmMessageVO.setUnitName(facility.getFacilityDesc());
+                log.debug("成功映射unitName:设备ID={}, facilityDesc={}", alarm.getDeviceId(), facility.getFacilityDesc());
+            } else {
+                // 如果没有找到对应的facilityDesc,设置默认值
+                String defaultUnitName = alarmTypeEnum.getDefaultUnitName();
+                alarmMessageVO.setUnitName(defaultUnitName);
+                log.debug("未找到facilityDesc,使用默认值:设备ID={}, 默认值={}", alarm.getDeviceId(), defaultUnitName);
+            }
+        } else {
+            // 其他情况,设置默认值
+            String defaultUnitName = alarmTypeEnum != null ? alarmTypeEnum.getDefaultUnitName() : "监测单元";
+            alarmMessageVO.setUnitName(defaultUnitName);
+        }
+    }
+
+    /**
+     * 填充thresholding字段(氧气、温湿度、一氧化碳、二氧化碳、剩余电流、供电电缆温度、电流偏大专用)
+     */
+    private void populateThresholding(AlarmMessageVO<Integer> alarmMessageVO, BaseAlarm alarm) {
+        // try {
+        //     // 从告警数据中解析阈值,如果解析失败则设置默认值
+        //     Float threshold = parseThresholdFromAlarmData(alarm.getAlarmData(), alarm.getAlarmType());
+        //     alarmMessageVO.setThresholding(threshold);
+        // } catch (Exception e) {
+        // 解析失败时设置合理的默认值
+        Float defaultThreshold = getDefaultThreshold(alarm.getAlarmType());
+        alarmMessageVO.setThresholding(defaultThreshold);
+        // log.warn("解析阈值失败,使用默认值:设备ID={}, 告警类型={}, 错误={}",
+        //         alarm.getDeviceId(), alarm.getAlarmType(), e.getMessage());
+        // }
+    }
+
+    /**
+     * 从告警数据中解析阈值
+     */
+    private Float parseThresholdFromAlarmData(String alarmData, String alarmType) {
+        if (alarmData == null || alarmData.trim().isEmpty()) {
+            return getDefaultThreshold(alarmType);
+        }
+
+        try {
+            // 尝试从告警数据中提取数值
+            String cleanedData = alarmData.replaceAll("[^\\d.]", "");
+            if (!cleanedData.isEmpty()) {
+                float parsedValue = Float.parseFloat(cleanedData);
+
+                // 添加边界检查,防止异常值
+                if (isValidThreshold(parsedValue, alarmType)) {
+                    return parsedValue;
+                } else {
+                    log.debug("阈值超出合理范围,使用默认值:alarmData={}, alarmType={}, value={}",
+                            alarmData, alarmType, parsedValue);
+                }
+            }
+        } catch (NumberFormatException e) {
+            log.debug("无法从告警数据中解析数值:alarmData={}", alarmData);
+        }
+
+        return getDefaultThreshold(alarmType);
+    }
+
+    /**
+     * 验证阈值是否在合理范围内
+     */
+    private boolean isValidThreshold(float value, String alarmType) {
+        AlarmType type = AlarmType.fromOriginalCode(alarmType);
+        return type == null || type.isValidThreshold(value);
+    }
+
+    /**
+     * 获取thresholding的默认值
+     */
+    private Float getDefaultThreshold(String alarmType) {
+        AlarmType type = AlarmType.fromOriginalCode(alarmType);
+        return type != null ? type.getDefaultThreshold() : 0.0f;
+    }
+
+    /**
+     * 填充lineNo字段(剩余电流、供电电缆温度、电流偏大专用)
+     */
+    private void populateLineNo(AlarmMessageVO<Integer> alarmMessageVO, BaseAlarm alarm) {
+        // try {
+        //     // 从设备ID或告警数据中解析线路编号
+        //     Integer lineNo = parseLineNoFromDeviceOrData(alarm.getDeviceId(), alarm.getAlarmData());
+        //     alarmMessageVO.setLineNo(lineNo);
+        // } catch (Exception e) {
+        // 解析失败时设置默认值
+        alarmMessageVO.setLineNo(1); // 默认线路1
+        // log.warn("解析线路编号失败,使用默认值:设备ID={}, 错误={}", alarm.getDeviceId(), e.getMessage());
+        // }
+    }
+
+    /**
+     * 从设备ID或告警数据中解析线路编号
+     */
+    // private Integer parseLineNoFromDeviceOrData(String deviceId, String alarmData) {
+    //     // 首先尝试从设备ID中解析
+    //     if (deviceId != null && !deviceId.trim().isEmpty()) {
+    //         // 假设设备ID格式可能包含线路信息,例如:DEV001_L1, DEV002_L2等
+    //         if (deviceId.contains("_L")) {
+    //             try {
+    //                 String linePart = deviceId.substring(deviceId.lastIndexOf("_L") + 2);
+    //                 int lineNo = Integer.parseInt(linePart);
+    //                 if (isValidLineNo(lineNo)) {
+    //                     return lineNo;
+    //                 } else {
+    //                     log.debug("线路编号超出范围,使用默认值:deviceId={}, lineNo={}", deviceId, lineNo);
+    //                 }
+    //             } catch (NumberFormatException e) {
+    //                 log.debug("无法从设备ID中解析线路编号:deviceId={}", deviceId);
+    //             }
+    //         }
+    //
+    //         // 或者尝试从设备ID末尾提取数字作为线路编号
+    //         String numbers = deviceId.replaceAll("[^\\d]", "");
+    //         if (!numbers.isEmpty()) {
+    //             try {
+    //                 int lineNo = Integer.parseInt(numbers);
+    //                 if (isValidLineNo(lineNo)) {
+    //                     return lineNo;
+    //                 } else {
+    //                     log.debug("从设备ID提取的线路编号超出范围,使用默认值:deviceId={}, lineNo={}", deviceId, lineNo);
+    //                 }
+    //             } catch (NumberFormatException e) {
+    //                 log.debug("无法从设备ID中提取有效线路编号:deviceId={}", deviceId);
+    //             }
+    //         }
+    //     }
+    //
+    //     // 然后尝试从告警数据中解析
+    //     if (alarmData != null && !alarmData.trim().isEmpty()) {
+    //         String numbers = alarmData.replaceAll("[^\\d]", "");
+    //         if (!numbers.isEmpty()) {
+    //             try {
+    //                 int lineNo = Integer.parseInt(numbers);
+    //                 if (isValidLineNo(lineNo)) {
+    //                     return lineNo;
+    //                 } else {
+    //                     log.debug("从告警数据中解析的线路编号超出范围,使用默认值:alarmData={}, lineNo={}", alarmData, lineNo);
+    //                 }
+    //             } catch (NumberFormatException e) {
+    //                 log.debug("无法从告警数据中解析线路编号:alarmData={}", alarmData);
+    //             }
+    //         }
+    //     }
+    //
+    //     // 默认返回线路1
+    //     return 1;
+    // }
+
+    /**
+     * 验证线路编号是否在有效范围内
+     */
+    private boolean isValidLineNo(int lineNo) {
+        return lineNo >= 1 && lineNo <= 20; // 限制在1-20范围内,符合实际情况
+    }
+}

+ 27 - 24
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataTransferService.java

@@ -2,23 +2,19 @@ package com.usky.cdi.service.impl;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.shaded.com.google.gson.Gson;
+// import com.alibaba.nacos.shaded.com.google.gson.Gson;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
 import com.usky.cdi.service.util.SnowflakeIdGenerator;
-import com.usky.cdi.service.vo.alarm.AlarmMessage1VO;
-import com.usky.cdi.service.vo.alarm.AlarmMessage2VO;
-import com.usky.cdi.service.vo.alarm.AlarmMessage3VO;
 import com.usky.cdi.service.vo.alarm.AlarmMessageVO;
-import com.usky.cdi.service.vo.base.EngineeringBaseVO;
-import com.usky.cdi.service.vo.base.FloorPlaneVO;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
 import java.util.Date;
 import java.util.HashMap;
 
@@ -31,9 +27,10 @@ import java.util.HashMap;
  */
 @Slf4j
 @Service
-@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
 public class AlarmDataTransferService {
 
+    @Autowired
+    private MqttConnectionTool mqttConnectionTool;
     @Resource
     private MqttOutConfig.MqttGateway mqttGateway;
 
@@ -42,7 +39,7 @@ public class AlarmDataTransferService {
 
     public AlarmDataTransferService() {
         // 使用默认的workerId和datacenterId,实际项目中可以从配置读取
-        this.idGenerator = new SnowflakeIdGenerator(1L, 1L);
+        this.idGenerator = new SnowflakeIdGenerator(3L, 3L);
     }
 
     /**
@@ -72,10 +69,10 @@ public class AlarmDataTransferService {
                 vo.setDataPacketID(generateDataPacketID());
             }
             if (vo.getPublishTime() == null) {
-                vo.setPublishTime(getCurrentTime());
+                vo.setPublishTime(LocalDateTime.now());
             }
 
-            HashMap<String, Object> map = new HashMap<>();
+//            HashMap<String, Object> map = new HashMap<>();
 //            map.put("dataPacketID", vo.getDataPacketID());
 //            map.put("engineeringID", vo.getEngineeringID());
 //            map.put("floor", vo.getFloor());
@@ -86,12 +83,13 @@ public class AlarmDataTransferService {
 //            map.put("filePixHeight", vo.getFilePixHeight());
 //            map.put("floorFile", imageBytes);
 //            map.put("publishTime", vo.getPublishTime());
-            Gson gson = new Gson();
+//            Gson gson = new Gson();
             JSONObject jsonObject = (JSONObject) JSON.toJSON(vo);
             String json = jsonObject.toJSONString();
             System.out.println(json);
+            MqttConnectionTool.MqttGateway IQeIRyXG = mqttConnectionTool.connectOrRefresh("3101070011", "5RqhJ7VG");
             String topic = "alarm/message";
-            mqttGateway.sendToMqtt(topic, json);
+            IQeIRyXG.sendToMqtt(topic, json);
 
             return true;
         } catch (Exception e) {
@@ -107,13 +105,15 @@ public class AlarmDataTransferService {
      * @param vo 楼层平面图信息
      * @return 是否发送成功
      */
-    public boolean sendAlarmMessage1(AlarmMessage1VO vo) {
+    public boolean sendAlarmMessage1(AlarmMessageVO vo) {
+        MqttConnectionTool.MqttGateway IQeIRyXG = mqttConnectionTool.connectOrRefresh("3101070011", "5RqhJ7VG");
+
         try {
             if (vo.getDataPacketID() == null) {
                 vo.setDataPacketID(generateDataPacketID());
             }
             if (vo.getPublishTime() == null) {
-                vo.setPublishTime(getCurrentTime());
+                vo.setPublishTime(LocalDateTime.now());
             }
 
             HashMap<String, Object> map = new HashMap<>();
@@ -127,12 +127,12 @@ public class AlarmDataTransferService {
 //            map.put("filePixHeight", vo.getFilePixHeight());
 //            map.put("floorFile", imageBytes);
 //            map.put("publishTime", vo.getPublishTime());
-            Gson gson = new Gson();
+//             Gson gson = new Gson();
             JSONObject jsonObject = (JSONObject) JSON.toJSON(vo);
             String json = jsonObject.toJSONString();
             System.out.println(json);
             String topic = "alarm/message";
-            mqttGateway.sendToMqtt(topic, json);
+            IQeIRyXG.sendToMqtt(topic, json);
 
             return true;
         } catch (Exception e) {
@@ -141,20 +141,21 @@ public class AlarmDataTransferService {
         }
     }
 
-    public boolean sendAlarmMessage2(AlarmMessage2VO vo) {
+    public boolean sendAlarmMessage2(AlarmMessageVO vo) {
         try {
             if (vo.getDataPacketID() == null) {
                 vo.setDataPacketID(generateDataPacketID());
             }
             if (vo.getPublishTime() == null) {
-                vo.setPublishTime(getCurrentTime());
+                vo.setPublishTime(LocalDateTime.now());
             }
 
             JSONObject jsonObject = (JSONObject) JSON.toJSON(vo);
             String json = jsonObject.toJSONString();
             System.out.println(json);
+            MqttConnectionTool.MqttGateway IQeIRyXG = mqttConnectionTool.connectOrRefresh("3101070011", "5RqhJ7VG");
             String topic = "alarm/message";
-            mqttGateway.sendToMqtt(topic, json);
+            IQeIRyXG.sendToMqtt(topic, json);
 
             return true;
         } catch (Exception e) {
@@ -163,20 +164,22 @@ public class AlarmDataTransferService {
         }
     }
 
-    public boolean sendEngineeringBase(AlarmMessage3VO vo) {
+    public boolean sendEngineeringBase(AlarmMessageVO vo) {
         try {
             if (vo.getDataPacketID() == null) {
                 vo.setDataPacketID(generateDataPacketID());
             }
             if (vo.getPublishTime() == null) {
-                vo.setPublishTime(getCurrentTime());
+                vo.setPublishTime(LocalDateTime.now());
             }
 
+            MqttConnectionTool.MqttGateway IQeIRyXG = mqttConnectionTool.connectOrRefresh("3101070011", "5RqhJ7VG");
+
             JSONObject jsonObject = (JSONObject) JSON.toJSON(vo);
             String json = jsonObject.toJSONString();
             String topic = "alarm/message";
             System.out.println("推送的数据: " + json);
-            mqttGateway.sendToMqtt(topic, json);
+            IQeIRyXG.sendToMqtt(topic, json);
 
             return true;
         } catch (Exception e) {

+ 35 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseAlarmServiceImpl.java

@@ -0,0 +1,35 @@
+package com.usky.cdi.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import com.usky.cdi.domain.BaseAlarm;
+import com.usky.cdi.mapper.BaseAlarmMapper;
+import com.usky.cdi.service.BaseAlarmService;
+import com.usky.cdi.service.config.mqtt.MqttOutConfig;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
+import com.usky.cdi.service.util.SnowflakeIdGenerator;
+import com.usky.cdi.service.vo.alarm.AlarmMessageVO;
+import com.usky.common.mybatis.core.AbstractCrudService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.util.Date;
+import java.util.HashMap;
+
+/**
+ * <p>
+ * 统一告警表 服务实现类
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-29
+ */
+@Service
+public class BaseAlarmServiceImpl extends AbstractCrudService<BaseAlarmMapper, BaseAlarm> implements BaseAlarmService {
+
+}

+ 2 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/BaseDataTransferService.java

@@ -224,8 +224,8 @@ public class BaseDataTransferService {
             userIdToName.put(709, 15);
             userIdToName.put(710, 16);
             userIdToName.put(711, 2);
-            userIdToName.put(712, 34);
-            userIdToName.put(713, 36);
+            //userIdToName.put(712, 34);
+            //userIdToName.put(713, 36);
             userIdToName.put(714, 37);
 
             HashMap<String, Object> map = new HashMap<>();

+ 213 - 204
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -7,9 +7,9 @@ import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
-import com.usky.cdi.service.config.mqtt.MqttBaseConfig;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
 import com.usky.cdi.service.util.DeviceDataQuery;
 import com.usky.cdi.service.util.SnowflakeIdGenerator;
 import com.usky.cdi.service.vo.IotDataTransferVO;
@@ -18,24 +18,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.IntegrationFlows;
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.stereotype.Service;
-import org.springframework.web.context.ContextLoader;
 
 import javax.annotation.PostConstruct;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -49,6 +49,9 @@ public class IotDataTransferService {
 
     private MqttOutConfig.MqttGateway mqttGateway;
 
+    @Autowired
+    private MqttConnectionTool mqttConnectionTool;
+
     // 注入ApplicationContext,确保总是能获取到
     @Autowired
     private ApplicationContext context;
@@ -59,6 +62,10 @@ public class IotDataTransferService {
     private static final int KEEP_ALIVE_INTERVAL = 60;
     private static final int COMPLETION_TIMEOUT = 5000;
 
+    // 存储每个任务的MQTT客户端工厂和网关
+    private final Map<String, MqttConnectionTool.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
+    private final Map<String, DefaultMqttPahoClientFactory> mqttClientFactoryMap = new ConcurrentHashMap<>();
+
     private SnowflakeIdGenerator idGenerator;
 
     @Autowired
@@ -110,7 +117,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -154,7 +161,7 @@ public class IotDataTransferService {
                 vo.setDataEndTime(dataEndTime);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
@@ -184,7 +191,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -229,19 +236,19 @@ public class IotDataTransferService {
                 try {
                     switch (deviceType) {
                         case 707:
-                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 708:
-                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 709:
-                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 710:
-                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                         case 711:
-                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
                             break;
                     }
                 } catch (Exception e) {
@@ -278,7 +285,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -316,7 +323,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(0);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
@@ -345,7 +352,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -395,7 +402,7 @@ public class IotDataTransferService {
                         deviceDataItem.getFloat("active_power"));
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况");
+                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
@@ -421,8 +428,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
             log.warn("设备{}的温度数据为空", deviceId);
@@ -435,7 +443,7 @@ public class IotDataTransferService {
         tempVO.setPublishTime(getCurrentTime());
         tempVO.setSensorValue(value);
         tempVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息", username);
     }
 
     /**
@@ -445,8 +453,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
             log.warn("设备{}的湿度数据为空", deviceId);
@@ -459,7 +468,7 @@ public class IotDataTransferService {
         humidityVO.setPublishTime(getCurrentTime());
         humidityVO.setSensorValue(value);
         humidityVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息", username);
     }
 
     /**
@@ -469,8 +478,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
             log.warn("设备{}的氧气浓度数据为空", deviceId);
@@ -483,7 +493,7 @@ public class IotDataTransferService {
         oxygenVO.setPublishTime(getCurrentTime());
         oxygenVO.setSensorValue(value);
         oxygenVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息", username);
     }
 
     /**
@@ -493,8 +503,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
             log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
@@ -507,7 +518,7 @@ public class IotDataTransferService {
         coVO.setPublishTime(getCurrentTime());
         coVO.setSensorValue(value);
         coVO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息", username);
     }
 
     /**
@@ -517,8 +528,9 @@ public class IotDataTransferService {
      * @param deviceId 设备ID
      * @param dataEndTime 数据结束时间
      * @param engineeringID 工程ID
+     * @param username 用户名
      **/
-    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
+    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
             log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
@@ -531,7 +543,7 @@ public class IotDataTransferService {
         co2VO.setPublishTime(getCurrentTime());
         co2VO.setSensorValue(value);
         co2VO.setDataEndTime(dataEndTime);
-        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
+        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息", username);
     }
 
     /**
@@ -539,144 +551,144 @@ public class IotDataTransferService {
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("qx");
-                if (value == null) {
-                    log.warn("设备{}的倾斜数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                TiltVO vo = new TiltVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("倾斜数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("qx");
+    //             if (value == null) {
+    //                 log.warn("设备{}的倾斜数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             TiltVO vo = new TiltVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("倾斜数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送裂缝数据(713)
      *
      * @return 推送结果,包含成功数和失败数
      **/
-    public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
-        Map<String, Integer> result = new HashMap<>();
-        result.put("successCount", 0);
-        result.put("failureCount", 0);
-
-        if (!validateMqttGateway()) {
-            return result;
-        }
-
-        try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
-            Integer deviceType = transferVO.getDeviceType();
-            Integer totalDevices = transferVO.getDevices().size();
-
-            log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
-                    deviceType, totalDevices, deviceData.size());
-
-            if (deviceData.isEmpty()) {
-                log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
-                result.put("failureCount", totalDevices);
-                return result;
-            }
-
-            Long engineeringId = transferVO.getEngineeringId();
-            for (JSONObject deviceDataItem : deviceData) {
-                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
-                if (dataEndTime == null) {
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                Integer deviceId = deviceDataItem.getIntValue("device_id");
-                Integer value = deviceDataItem.getInteger("cd");
-                if (value == null) {
-                    log.warn("设备{}的裂缝数据为空", deviceId);
-                    result.put("failureCount", result.get("failureCount") + 1);
-                    continue;
-                }
-
-                CrackVO vo = new CrackVO();
-                vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceId);
-                vo.setEngineeringID(engineeringId);
-                vo.setPublishTime(getCurrentTime());
-                vo.setDataEndTime(dataEndTime);
-                vo.setSensorValue(value == 0 ? 0 : 1);
-
-                try {
-                    sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
-                    result.put("successCount", result.get("successCount") + 1);
-                } catch (Exception e) {
-                    log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
-                    result.put("failureCount", result.get("failureCount") + 1);
-                }
-            }
-
-            log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
-                    deviceType, result.get("successCount"), result.get("failureCount"));
-
-            return result;
-        } catch (Exception e) {
-            log.error("裂缝数据推送发生异常", e);
-            result.put("failureCount", transferVO.getDevices().size());
-            return result;
-        }
-    }
+    // public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
+    //     Map<String, Integer> result = new HashMap<>();
+    //     result.put("successCount", 0);
+    //     result.put("failureCount", 0);
+    //
+    //     if (!validateMqttGateway()) {
+    //         return result;
+    //     }
+    //
+    //     try {
+    //         List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+    //         Integer deviceType = transferVO.getDeviceType();
+    //         Integer totalDevices = transferVO.getDevices().size();
+    //
+    //         log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+    //                 deviceType, totalDevices, deviceData.size());
+    //
+    //         if (deviceData.isEmpty()) {
+    //             log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
+    //             result.put("failureCount", totalDevices);
+    //             return result;
+    //         }
+    //
+    //         Long engineeringId = transferVO.getEngineeringId();
+    //         for (JSONObject deviceDataItem : deviceData) {
+    //             LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+    //             if (dataEndTime == null) {
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             Integer deviceId = deviceDataItem.getIntValue("device_id");
+    //             Integer value = deviceDataItem.getInteger("cd");
+    //             if (value == null) {
+    //                 log.warn("设备{}的裂缝数据为空", deviceId);
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //                 continue;
+    //             }
+    //
+    //             CrackVO vo = new CrackVO();
+    //             vo.setDataPacketID(generateDataPacketID());
+    //             vo.setSensorID(deviceId);
+    //             vo.setEngineeringID(engineeringId);
+    //             vo.setPublishTime(getCurrentTime());
+    //             vo.setDataEndTime(dataEndTime);
+    //             vo.setSensorValue(value == 0 ? 0 : 1);
+    //
+    //             try {
+    //                 sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
+    //                 result.put("successCount", result.get("successCount") + 1);
+    //             } catch (Exception e) {
+    //                 log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
+    //                 result.put("failureCount", result.get("failureCount") + 1);
+    //             }
+    //         }
+    //
+    //         log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
+    //                 deviceType, result.get("successCount"), result.get("failureCount"));
+    //
+    //         return result;
+    //     } catch (Exception e) {
+    //         log.error("裂缝数据推送发生异常", e);
+    //         result.put("failureCount", transferVO.getDevices().size());
+    //         return result;
+    //     }
+    // }
 
     /**
      * 发送位移数据(714)
@@ -688,7 +700,7 @@ public class IotDataTransferService {
         result.put("successCount", 0);
         result.put("failureCount", 0);
 
-        if (!validateMqttGateway()) {
+        if (!validateMqttGateway(transferVO.getUsername())) {
             return result;
         }
 
@@ -731,7 +743,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(value == 0 ? 0 : 1);
 
                 try {
-                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息");
+                    sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息", transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
@@ -758,6 +770,7 @@ public class IotDataTransferService {
      * @param password MQTT密码
      */
     public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        log.info("用户名:{},密码:{}", username, password);
         // 参数校验
         if (engineeringId == null || username == null || password == null) {
             log.error("工程ID、MQTT用户名或密码不能为空");
@@ -789,6 +802,7 @@ public class IotDataTransferService {
             transferVO.setDeviceType(deviceType);
             transferVO.setDevices(devices);
             transferVO.setEngineeringId(engineeringId);
+            transferVO.setUsername(username); // 保存当前任务的用户名
             transferList.add(transferVO);
         });
 
@@ -839,12 +853,12 @@ public class IotDataTransferService {
                 case 704:
                     result = sendElectricityLoad(transferVO);
                     break;
-                case 712:
-                    result = sendTiltData(transferVO);
-                    break;
-                case 713:
-                    result = sendCrackData(transferVO);
-                    break;
+                // case 712:
+                //     result = sendTiltData(transferVO);
+                //     break;
+                // case 713:
+                //     result = sendCrackData(transferVO);
+                //     break;
                 case 714:
                     result = sendDeviationData(transferVO);
                     break;
@@ -899,43 +913,25 @@ public class IotDataTransferService {
     }
 
     /**
-     * 手动创建MQTT连接
+     * 手动创建/刷新 MQTT 连接(含动态 clientId)
      * @param username MQTT用户名
      * @param password MQTT密码
      */
-    public void createMqttConnection(String username, String password) {
+    public synchronized void createMqttConnection(String username, String password) {
+        log.info("手动创建/刷新 MQTT 连接(含动态 clientId),用户名:{},密码:{}", username, password);
         try {
-            // 使用注入的ApplicationContext获取已有的mqttGateway实例
-            // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
-            if (this.context == null) {
-                throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
+            // 检查MqttConnectionTool是否已注入
+            if (this.mqttConnectionTool == null) {
+                throw new IllegalStateException("MqttConnectionTool未注入,无法获取MQTT Gateway");
             }
 
-            // 1. 获取mqttGateway实例
-            this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
-            if (this.mqttGateway == null) {
-                throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
-            }
+            // 使用MqttConnectionTool创建或刷新MQTT连接
+            MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(username, password);
 
-            // 2. 获取现有的mqttClientFactory实例
-            DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
-            if (mqttClientFactory == null) {
-                throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
-            }
-
-            // 3. 创建并配置MqttConnectOptions
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setServerURIs(new String[]{MQTT_URL});
-            options.setUserName(username);
-            options.setPassword(password.toCharArray());
-            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-
-            // 4. 更新mqttClientFactory的连接选项
-            mqttClientFactory.setConnectionOptions(options);
-
-            log.info("MQTT Gateway初始化成功,用户名:{}", username);
-            log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
-        } catch (Exception e) {
+            // 存储到映射中
+            mqttGatewayMap.put(username, gateway);
+            log.info("MQTT连接创建/刷新成功,用户名:{}", username);
+            } catch (Exception e) {
             log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
             throw new RuntimeException("初始化MQTT连接失败", e);
         }
@@ -943,11 +939,18 @@ public class IotDataTransferService {
 
     /**
      * 验证MQTT网关是否初始化
+     * @param username 用户名
      * @return 是否初始化
      */
-    private boolean validateMqttGateway() {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
+    private boolean validateMqttGateway(String username) {
+        if (username == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:null");
+            return false;
+        }
+        // 一次性获取网关实例,避免竞态条件
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:{}", username);
             return false;
         }
         return true;
@@ -973,12 +976,18 @@ public class IotDataTransferService {
      * @param topicEnum 主题枚举
      * @param vo 消息对象
      * @param messageType 消息类型描述
+     * @param username 用户名
      */
-    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType) {
+    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType, String username) {
         String json = JSON.toJSONString(vo);
         String topic = topicEnum.getTopic();
         // 不再记录每条数据的详情,只记录发送操作
-        mqttGateway.sendToMqtt(topic, json);
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway != null) {
+            gateway.sendToMqtt(topic, json);
+        } else {
+            log.warn("MQTT Gateway未找到,无法发送消息,用户名:{}", username);
+        }
     }
 
     public void allData(Long engineeringId, String username, String password) {

+ 230 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/mqtt/MqttConnectionTool.java

@@ -0,0 +1,230 @@
+package com.usky.cdi.service.mqtt;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.endpoint.EventDrivenConsumer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/22
+ * 动态 MQTT 连接工具类
+ * 用法:注入后调用 connectOrRefresh(...) 即可
+ *
+ */
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MqttConnectionTool {
+
+    private final GenericApplicationContext context;
+
+    /* 默认 topic,可外部再 set */
+    private String defaultTopic = "testTopic";
+
+    /* 默认 keep-alive,可外部再 set */
+    private int keepAlive = 60;
+
+    private static final String MQTT_URL = "ssl://114.80.201.143:8883";
+
+    /**
+     * 存储每个用户名对应的连接信息
+     */
+    private final Map<String, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
+
+    /**
+     * 连接信息内部类
+     */
+    private static class ConnectionInfo {
+        private final String handlerBeanName;
+        private final String consumerBeanName;
+        private final String factoryBeanName;
+        private final String gatewayBeanName;
+
+        public ConnectionInfo(String username) {
+            this.handlerBeanName = "mqttHandler_" + username;
+            this.consumerBeanName = "mqttConsumer_" + username;
+            this.factoryBeanName = "mqttFactory_" + username;
+            this.gatewayBeanName = "mqttGateway_" + username;
+        }
+    }
+
+    /**
+     * 一键创建/刷新连接
+     *
+     * @param username  用户名
+     * @param password  密码
+     * @return 可直接发消息的 MqttGateway
+     */
+    public synchronized MqttGateway connectOrRefresh(String username, String password) {
+        Assert.notNull(username, "username cannot be null");
+        Assert.notNull(password, "password cannot be null");
+
+        String clientId = "mqttx-" + username;
+        try {
+            /* 1. 获取或创建连接信息 */
+            ConnectionInfo connectionInfo = connectionMap.computeIfAbsent(username, ConnectionInfo::new);
+
+            /* 2. 创建或更新专属工厂 */
+            DefaultMqttPahoClientFactory factory;
+            if (context.containsBean(connectionInfo.factoryBeanName)) {
+                factory = context.getBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class);
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                log.info("已更新 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            } else {
+                factory = new DefaultMqttPahoClientFactory();
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                context.registerBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class, () -> factory);
+                log.info("已创建 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            }
+
+            /* 3. 移除旧的 Handler 和 Consumer */
+            removeOldConnection(connectionInfo);
+
+            /* 4. 创建新的 Handler */
+            MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, factory);
+            handler.setAsync(true);
+            handler.setDefaultTopic(defaultTopic);
+            handler.afterPropertiesSet();
+
+            /* 5. 注册新的 Handler */
+            context.registerBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class, () -> handler);
+
+            /* 6. 创建并注册新的专属网关 */
+            // 创建一个简单的Gateway实现,直接使用Handler发送消息
+            MqttGateway gateway = new MqttGateway() {
+                @Override
+                public void sendToMqtt(String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, defaultTopic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                                        @Header(MqttHeaders.QOS) int qos, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .setHeader(MqttHeaders.QOS, qos)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+            };
+
+            log.info("MQTT 连接刷新完成 -> {} / {}", username, clientId);
+            return gateway;
+        } catch (Exception e) {
+            log.error("MQTT 连接失败 -> {}", clientId, e);
+            throw new RuntimeException("MQTT 连接失败", e);
+        }
+    }
+
+    /* ---------- 私有辅助 ---------- */
+
+    private org.eclipse.paho.client.mqttv3.MqttConnectOptions
+    buildOptions(String u, String p, String url) {
+        org.eclipse.paho.client.mqttv3.MqttConnectOptions opt = 
+                new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
+        opt.setServerURIs(new String[]{url});
+        opt.setUserName(u);
+        if (p != null) opt.setPassword(p.toCharArray());
+        opt.setKeepAliveInterval(keepAlive);
+        return opt;
+    }
+
+    /**
+     * 移除旧的连接实例
+     * @param connectionInfo 连接信息
+     */
+    private void removeOldConnection(ConnectionInfo connectionInfo) {
+        // 移除旧的 Handler
+        if (context.containsBeanDefinition(connectionInfo.handlerBeanName)) {
+            try {
+                MqttPahoMessageHandler oldHandler = context.getBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class);
+                oldHandler.stop();
+            } catch (Exception e) {
+                log.warn("停止旧的MQTT处理器时出错: {}", e.getMessage(), e);
+            }
+            context.removeBeanDefinition(connectionInfo.handlerBeanName);
+            log.info("已移除旧的 MQTT 处理器 -> {}", connectionInfo.handlerBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Handler
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.handlerBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.handlerBeanName);
+        }
+
+        // 移除旧的 Factory
+        if (context.containsBeanDefinition(connectionInfo.factoryBeanName)) {
+            context.removeBeanDefinition(connectionInfo.factoryBeanName);
+            log.info("已移除旧的 MQTT 工厂 -> {}", connectionInfo.factoryBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Factory
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.factoryBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.factoryBeanName);
+        }
+    }
+
+    /* ---------- 对外可调用的 setter ---------- */
+
+    public MqttConnectionTool defaultTopic(String topic) {
+        this.defaultTopic = topic;
+        return this;
+    }
+
+    public MqttConnectionTool keepAlive(int keepAlive) {
+        this.keepAlive = keepAlive;
+        return this;
+    }
+
+    /* ---------- 复用原来的 Gateway 接口 ---------- */
+    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+    public interface MqttGateway {
+        void sendToMqtt(String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                        @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 42 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceAlarmDataSyncService.java

@@ -0,0 +1,42 @@
+package com.usky.cdi.service.util;
+
+import com.usky.cdi.service.impl.AlarmDataSyncService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * 设备数据同步组件(包含定时任务)
+ */
+@Slf4j
+@Component
+public class DeviceAlarmDataSyncService {
+
+    private final AlarmDataSyncService alarmDataSyncService;
+
+    @Autowired
+    public DeviceAlarmDataSyncService(AlarmDataSyncService alarmDataSyncService) {
+        this.alarmDataSyncService = alarmDataSyncService;
+    }
+
+    /**
+     * 定时任务:定期执行设备数据同步上报
+     * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
+     * initialDelay:初始化后立即执行第一次任务
+     */
+    // @Scheduled(fixedDelay = 1 * 60 * 1000, initialDelay = 0)
+    // public void scheduledDeviceDataSync() {
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
+    //     log.info("开始执行桃浦象屿人防设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //
+    //     try {
+    //         alarmDataSyncService.synchronizeAlarmData(tenantId, engineeringId, username, password, "peacetime");
+    //     } catch (Exception e) {
+    //         log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
+    //     }
+    // }
+}

+ 6 - 6
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,13 +26,13 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
+    // @Scheduled(fixedDelay = 26 * 60 * 1000, initialDelay = 0)
     // public void scheduledDeviceDataSync() {
-    //     Integer tenantId = 1208;
-    //     Long engineeringId = 3101130019L;
-    //     String username = "3101130019";
-    //     String password = "ptrEQZK2";
-    //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
+    //     log.info("开始执行桃浦象屿人防设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
     //
     //     try {
     //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);

+ 117 - 86
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/WeatherFetcher.java

@@ -2,7 +2,7 @@ package com.usky.cdi.service.util;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
@@ -33,107 +33,138 @@ public class WeatherFetcher {
     private static final double DEFAULT_TEMPERATURE = 15.0;
     private static final int DEFAULT_HUMIDITY = 40;
 
+    // 异步执行的线程池
+    private static final ExecutorService WEATHER_FETCHER_POOL = Executors.newFixedThreadPool(3);
+    private static final int ASYNC_TIMEOUT = 5000; // 异步调用超时时间:5秒
+
     /**
      * 获取天气数据
      * 1. 优先使用缓存数据(如果缓存有效)
-     * 2. 缓存无效时,调用API获取新数据
-     * 3. API调用失败时,使用默认值
+     * 2. 缓存无效时,异步调用API获取新数据
+     * 3. API调用失败或超时,使用默认值
      *
      * @return 包含温度和湿度的Map
      */
     public static Map<String, Double> fetchWeather() {
         // 1. 检查缓存是否有效
         long currentTime = System.currentTimeMillis();
-        if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
-            log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
-                    weatherCache.get("temperature"), weatherCache.get("humidity"));
-            return new HashMap<>(weatherCache);
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty() && (currentTime - lastUpdateTime) < CACHE_EXPIRE_TIME) {
+                log.debug("使用缓存的天气数据,温度:{}°C,湿度:{}%",
+                        weatherCache.get("temperature"), weatherCache.get("humidity"));
+                return new HashMap<>(weatherCache);
+            }
+        }
+
+        // 2. 如果缓存无效,返回当前缓存值(如果有)或默认值
+        Map<String, Double> fallbackData = new HashMap<>();
+        synchronized (WeatherFetcher.class) {
+            if (!weatherCache.isEmpty()) {
+                fallbackData.putAll(weatherCache);
+                log.debug("缓存已过期,使用过期缓存作为临时数据");
+            } else {
+                fallbackData.put("temperature", DEFAULT_TEMPERATURE);
+                fallbackData.put("humidity", (double) DEFAULT_HUMIDITY);
+                log.debug("缓存为空,使用默认天气数据作为临时数据");
+            }
         }
 
-        double tempCelsius = DEFAULT_TEMPERATURE;
-        int humidity = DEFAULT_HUMIDITY;
+        // 3. 异步调用API更新缓存,不阻塞主线程
+        CompletableFuture.supplyAsync(() -> {
+            long asyncStartTime = System.currentTimeMillis();
+            double tempCelsius = DEFAULT_TEMPERATURE;
+            int humidity = DEFAULT_HUMIDITY;
+
+            try {
+                log.info("开始异步调用OpenWeatherMap API获取天气数据");
+                // 1. 构造请求URL
+                URL url = new URL(API_URL);
+
+                // 2. 建立连接并发送请求
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                conn.setRequestMethod("GET");
+                conn.setConnectTimeout(10000); // 减少超时时间到10秒
+                conn.setReadTimeout(10000);
+
+                // 3. 读取响应
+                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+                StringBuilder response = new StringBuilder();
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    response.append(line);
+                }
+                reader.close();
+
+                // 4. 解析JSON数据(使用org.json库)
+                JSONObject jsonResponse = new JSONObject(response.toString());
+                JSONObject main = jsonResponse.getJSONObject("main");
+
+                // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
+                double tempKelvin = main.getDouble("temp");
+                tempCelsius = tempKelvin - 273.15;
+                humidity = main.getInt("humidity");
+                double feelsLikeKelvin = main.getDouble("feels_like");
+                double feelsLikeCelsius = feelsLikeKelvin - 273.15;
+
+                // 提取天气状况描述
+                JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
+                String description = weather.getString("description");
+
+                // 记录日志
+                log.info("=== 天气解析结果 ===");
+                log.info("城市: {}", jsonResponse.getString("name"));
+                log.info("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
+                log.info("体感温度: {:.2f}°C", feelsLikeCelsius);
+                log.info("湿度: {}%", humidity);
+                log.info("天气状况: {}", description);
+                log.info("时区偏移: {}小时", (jsonResponse.getInt("timezone") / 3600));
+
+                // 检查是否包含臭氧数据
+                if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
+                    log.info("包含空气质量数据");
+                } else {
+                    log.info("当前数据不包含臭氧浓度等空气质量指标");
+                }
+
+                // 更新缓存
+                synchronized (WeatherFetcher.class) {
+                    weatherCache.clear();
+                    weatherCache.put("temperature", tempCelsius);
+                    weatherCache.put("humidity", (double) humidity);
+                    lastUpdateTime = System.currentTimeMillis();
+                    log.info("天气数据缓存更新成功");
+                }
+
+            } catch (Exception e) {
+                log.error("异步获取天气数据失败:{}", e.getMessage());
+                // 异常时使用默认值,但不更新缓存
+            } finally {
+                // 打印API调用结束时间和时长
+                long asyncEndTime = System.currentTimeMillis();
+                long duration = asyncEndTime - asyncStartTime;
+                log.info("OpenWeatherMap API异步调用结束,时长: {}ms", duration);
+            }
 
-        log.debug("开始调用OpenWeatherMap API获取天气数据");
-        long startTime = System.currentTimeMillis();
+            return null;
+        }, WEATHER_FETCHER_POOL);
 
+        // 立即返回临时数据,不等待异步调用完成
+        return fallbackData;
+    }
+
+    /**
+     * 关闭线程池(仅用于测试或应用关闭时)
+     */
+    public static void shutdown() {
+        WEATHER_FETCHER_POOL.shutdown();
         try {
-            // 1. 构造请求URL
-            URL url = new URL(API_URL);
-
-            // 2. 建立连接并发送请求
-            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-            conn.setRequestMethod("GET");
-            conn.setConnectTimeout(15000);
-            conn.setReadTimeout(15000);
-
-            // 3. 读取响应
-            BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
-            StringBuilder response = new StringBuilder();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                response.append(line);
+            if (!WEATHER_FETCHER_POOL.awaitTermination(5, TimeUnit.SECONDS)) {
+                WEATHER_FETCHER_POOL.shutdownNow();
             }
-            reader.close();
-
-            // 4. 解析JSON数据(使用org.json库)
-            JSONObject jsonResponse = new JSONObject(response.toString());
-            JSONObject main = jsonResponse.getJSONObject("main");
-
-            // 提取基础信息
-            String cityName = jsonResponse.getString("name");
-            int timezoneOffset = jsonResponse.getInt("timezone");
-
-            // 注意:温度默认是开尔文单位,转换为摄氏度需要 -273.15
-            double tempKelvin = main.getDouble("temp");
-            tempCelsius = tempKelvin - 273.15;
-            humidity = main.getInt("humidity");
-            double feelsLikeKelvin = main.getDouble("feels_like");
-            double feelsLikeCelsius = feelsLikeKelvin - 273.15;
-
-            // 提取天气状况描述
-            JSONObject weather = jsonResponse.getJSONArray("weather").getJSONObject(0);
-            String description = weather.getString("description");
-
-            // 记录日志,不输出到控制台
-            log.debug("=== 天气解析结果 ===");
-            log.debug("城市: {}", cityName);
-            log.debug("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
-            System.out.println("体感温度: {:.2f}°C (原始: {}K)" + feelsLikeCelsius + "==========" + feelsLikeKelvin);
-            log.debug("体感温度: {:.2f}°C", feelsLikeCelsius);
-            log.debug("湿度: {}%", humidity);
-            log.debug("天气状况: {}", description);
-            log.debug("时区偏移: {}小时", (timezoneOffset / 3600));
-
-            // 检查是否包含臭氧数据
-            if (jsonResponse.has("air_quality") || jsonResponse.has("o3") || jsonResponse.has("components")) {
-                log.debug("包含空气质量数据");
-            } else {
-                log.debug("当前数据不包含臭氧浓度等空气质量指标");
-            }
-
-            // 更新缓存
-            weatherCache.clear();
-            weatherCache.put("temperature", tempCelsius);
-            weatherCache.put("humidity", (double) humidity);
-            lastUpdateTime = currentTime;
-            log.debug("天气数据缓存更新成功");
-
-        } catch (Exception e) {
-            log.error("获取天气数据失败:{}", e.getMessage());
-            // 异常时使用默认值
-            log.warn("使用默认天气数据,温度:{}°C,湿度:{}%", DEFAULT_TEMPERATURE, DEFAULT_HUMIDITY);
-        } finally {
-            // 打印API调用结束时间和时长
-            long endTime = System.currentTimeMillis();
-            long duration = endTime - startTime;
-            System.out.println("第三方天气API调用时长" + duration + "毫秒");
-            log.info("OpenWeatherMap API调用结束,开始时间: {}, 结束时间: {}, 时长: {}ms", startTime, endTime, duration);
+        } catch (InterruptedException e) {
+            WEATHER_FETCHER_POOL.shutdownNow();
+            Thread.currentThread().interrupt();
         }
-
-        Map<String, Double> resultMap = new HashMap<>();
-        resultMap.put("temperature", tempCelsius);
-        resultMap.put("humidity", (double) humidity);
-        return resultMap;
     }
 
     public static void main(String[] args) {

+ 5 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/IotDataTransferVO.java

@@ -25,4 +25,9 @@ public class IotDataTransferVO {
      * 产品ID
      */
     private Integer deviceType;
+    
+    /**
+     * MQTT用户名
+     */
+    private String username;
 }

+ 0 - 69
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage1VO.java

@@ -1,69 +0,0 @@
-package com.usky.cdi.service.vo.alarm;
-
-import lombok.Data;
-
-@Data
-public class AlarmMessage1VO {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 数据包ID
-     */
-    private Long dataPacketID;
-
-    /**
-     * 人防工程ID
-     */
-    private Long engineeringID;
-
-    /**
-     * 事件ID
-     */
-    private Integer alarmID;
-
-    /**
-     * 事件来源
-     */
-    private Integer alarmSource;
-
-    /**
-     * 物联设施ID
-     */
-    private Integer sensorID;
-
-    /**
-     * 事件类型
-     */
-    private String alarmType;
-
-    /**
-     * 事件状态
-     */
-    private Integer alarmStatus;
-
-    /**
-     * 最新水浸状态
-     */
-    private Integer sensorValue;
-
-    /**
-     * 事件发生/更新时间
-     */
-    private String alarmUpdateTime;
-
-    /**
-     * 监测对象编号
-     */
-    private String monitorObjNo;
-
-    /**
-     * 事件描述
-     */
-    private String alarmDesc;
-
-    /**
-     * 上报时间
-     */
-    private String publishTime;
-
-}

+ 0 - 69
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage2VO.java

@@ -1,69 +0,0 @@
-package com.usky.cdi.service.vo.alarm;
-
-import lombok.Data;
-
-@Data
-public class AlarmMessage2VO {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 数据包ID
-     */
-    private Long dataPacketID;
-
-    /**
-     * 人防工程ID
-     */
-    private Long engineeringID;
-
-    /**
-     * 事件ID
-     */
-    private Integer alarmID;
-
-    /**
-     * 事件来源
-     */
-    private Integer alarmSource;
-
-    /**
-     * 防护单元
-     */
-    private String unitName;
-
-    /**
-     * 事件类型
-     */
-    private String alarmType;
-
-    /**
-     * 事件状态
-     */
-    private Integer alarmStatus;
-
-    /**
-     * 倾斜、位移、裂缝监测结果
-     */
-    private Double sensorValue;
-
-    /**
-     * 事件发生/更新时间
-     */
-    private String alarmUpdateTime;
-
-    /**
-     * 监测对象编号
-     */
-    private String monitorObjNo;
-
-    /**
-     * 事件描述
-     */
-    private String alarmDesc;
-
-    /**
-     * 上报时间
-     */
-    private String publishTime;
-
-}

+ 0 - 79
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessage3VO.java

@@ -1,79 +0,0 @@
-package com.usky.cdi.service.vo.alarm;
-
-import lombok.Data;
-
-@Data
-public class AlarmMessage3VO {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 数据包ID
-     */
-    private Long dataPacketID;
-
-    /**
-     * 人防工程ID
-     */
-    private Long engineeringID;
-
-    /**
-     * 事件ID
-     */
-    private Integer alarmID;
-
-    /**
-     * 事件来源
-     */
-    private Integer alarmSource;
-
-    /**
-     * 物联设施ID
-     */
-    private Integer sensorID;
-
-    /**
-     * 事件类型
-     */
-    private String alarmType;
-
-    /**
-     * 事件状态
-     */
-    private Integer alarmStatus;
-
-    /**
-     * 告警时的电流
-     */
-    private Double sensorValue;
-
-    /**
-     * 告警阈值
-     */
-    private Double thresholding;
-
-    /**
-     * 线缆序号
-     */
-    private Integer lineNo;
-
-    /**
-     * 事件发生/更新时间
-     */
-    private String alarmUpdateTime;
-
-    /**
-     * 监测对象编号
-     */
-    private String monitorObjNo;
-
-    /**
-     * 事件描述
-     */
-    private String alarmDesc;
-
-    /**
-     * 上报时间
-     */
-    private String publishTime;
-
-}

+ 107 - 35
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/alarm/AlarmMessageVO.java

@@ -1,74 +1,146 @@
 package com.usky.cdi.service.vo.alarm;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import lombok.Data;
 
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/29
+ */
+
 @Data
-public class AlarmMessageVO {
+@JsonInclude(JsonInclude.Include.NON_NULL)   // 值为 null 的字段不序列化,向前兼容
+public class AlarmMessageVO<T extends Number> implements Serializable {
+
     private static final long serialVersionUID = 1L;
 
+    /* ========== 所有告警公有字段 ========== */
     /**
-     * 数据包ID
-     */
+     * 数据包ID 必填
+     * Int 15
+     **/
     private Long dataPacketID;
 
     /**
-     * 人防工程ID
-     */
+     * 人防工程ID 必填
+     * Int 10
+     **/
     private Long engineeringID;
 
     /**
-     * 事件ID
-     */
+     * 告警事件ID 必填
+     * Int 15
+     **/
     private Integer alarmID;
 
     /**
-     * 事件来源
-     */
+     * 事件来源 必填
+     * 1	人防工程级物联平台自动发现
+     * 2	人工巡查发现
+     **/
     private Integer alarmSource;
 
     /**
-     * 物联设施ID
-     */
+     * 物联设施ID 非必填
+     * Int 8
+     **/
     private Integer sensorID;
 
     /**
-     * 事件类型
-     */
+     * 告警类型 必填
+     * 10.1	水浸告警
+     * 12.1	空气温度偏高告警
+     * 13.1	空气湿度偏高告警
+     * 14.1	氧气浓度偏低告警
+     * 15.1	二氧化碳浓度偏高告警
+     * 16.1	一氧化碳浓度偏高告警
+     * 29.1	剩余电流偏大告警
+     * 29.2	供电电缆温度偏高告警
+     * 29.3	电流偏大
+     * 33.1	人员闯入告警
+     * 34.1	监测位置有倾斜告警
+     * 35.1	监测位置有倾斜告警
+     * 36.1	监测位置有裂缝告警
+     * 37.1	监测位置有位移告警
+     **/
     private String alarmType;
 
     /**
-     * 事件状态
-     */
+     * 告警状态 必填
+     * 平时阶段
+     * 1	未响应	新发生的告警事件的初始状态
+     * 2	已忽略	特殊情况下可以忽略本次告警事件
+     * 3	已派单	该事件以工单的形式派人进行处置
+     * 4	已办结	经人工处置后,告警事件消除
+     * 5	已消失	异常情况自动消失,系统自动消除告警事件
+     * 战时阶段
+     * 1	待处置	系统发现异常情况并上报告警事件
+     * 2	处置中	管理人员已知晓告警事件,并派人进行处
+     * 3	已处置	经过人工处置,已解决告警事件
+     * 4    已消除	未经过人工处置,但告警事件已消失(此种状态下系统会自动消除告警事件)
+     * 5	已忽略	特殊情况下可以忽略本次告警事件
+     **/
     private Integer alarmStatus;
 
     /**
-     * 最新水浸状态
-     */
-    private Double sensorValue;
-
-    /**
-     * 告警阈值
-     */
-    private Double thresholding;
-
-    /**
-     * 事件发生/更新时间
-     */
+     * 告警时间 必填
+     * 查询告警表 base_alarm 时新增告警取 alarm_type 字段,更新数据则取 handle_time 字段
+     * 时间型(带毫秒),格式为 yyyy-MM-DD hh :mm :ss.SSS
+     **/
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
     private String alarmUpdateTime;
 
     /**
-     * 监测对象编号
-     */
+     * 监测对象编号 非必填
+     * Str 50
+     **/
     private String monitorObjNo;
 
     /**
-     * 事件描述
-     */
+     * 告警描述 必填
+     * Str 200
+     **/
     private String alarmDesc;
 
     /**
-     * 上报时间
-     */
-    private String publishTime;
-
+     * 上报时间 必填
+     * 获取当前时间
+     * 时间型(带毫秒),格式为 yyyy-MM-DD hh :mm :ss.SSS
+     **/
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
+    private LocalDateTime publishTime;
+
+    /** 告警数据字段 必填、通用
+     * 水浸(Integer) 0:无水、1:有水
+     * 倾斜/位移/裂缝(Integer) 0:无位移、1:有位移
+     * 人员闯入告警 0:无闯入、1:有闯入
+     * 电流(Float)
+     **/
+    private String sensorValue;
+
+    /* ========== 私有字段 ========== */
+
+    /** 倾斜才用
+     * 单元名称 必填
+     * Str 30
+     **/
+    private String unitName;
+
+    /** 氧气、温湿度、一氧化碳、二氧化碳、剩余电流、供电电缆温度、电流偏大专用
+     * 阈值 必填
+     * Float
+     **/
+    private Float thresholding;
+
+    /** 剩余电流、供电电缆温度、电流偏大专用
+     * 线路编号 必填
+     * Int 10
+     **/
+    private Integer lineNo;
 }

+ 29 - 0
service-cdi/service-cdi-biz/src/main/resources/mapper/cdi/BaseAlarmMapper.xml

@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.usky.cdi.mapper.BaseAlarmMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.usky.cdi.domain.BaseAlarm">
+        <id column="id" property="id" />
+        <result column="device_id" property="deviceId" />
+        <result column="alarm_time" property="alarmTime" />
+        <result column="alarm_type" property="alarmType" />
+        <result column="alarm_object" property="alarmObject" />
+        <result column="alarm_data" property="alarmData" />
+        <result column="alarm_attribute" property="alarmAttribute" />
+        <result column="alarm_content" property="alarmContent" />
+        <result column="alarm_grade" property="alarmGrade" />
+        <result column="alarm_address" property="alarmAddress" />
+        <result column="handle_by" property="handleBy" />
+        <result column="handle_time" property="handleTime" />
+        <result column="handle_content" property="handleContent" />
+        <result column="handle_phone" property="handlePhone" />
+        <result column="handle_status" property="handleStatus" />
+        <result column="alarm_false" property="alarmFalse" />
+        <result column="site_photo" property="sitePhoto" />
+        <result column="product_code" property="productCode" />
+        <result column="dept_id" property="deptId" />
+        <result column="tenant_id" property="tenantId" />
+    </resultMap>
+
+</mapper>

+ 148 - 25
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/PatrolInspectionRecordServiceImpl.java

@@ -426,7 +426,7 @@ public class PatrolInspectionRecordServiceImpl extends AbstractCrudService<Patro
     }
 
     /**
-     * 导出巡检记录
+     * 导出巡检记录(优化版:使用批量查询避免N+1问题)
      * @param areaName
      * @param siteName
      * @param name
@@ -466,34 +466,157 @@ public class PatrolInspectionRecordServiceImpl extends AbstractCrudService<Patro
         }
         queryWrapper.orderByDesc(PatrolInspectionRecord::getId);
         List<PatrolInspectionRecord> patrolInspectionRecordList = this.list(queryWrapper);
+        
+        if (patrolInspectionRecordList.isEmpty()) {
+            return new ArrayList<>();
+        }
+        
+        // 提取所有记录ID
+        List<Integer> recordIds = new ArrayList<>();
+        for (PatrolInspectionRecord record : patrolInspectionRecordList) {
+            recordIds.add(record.getId());
+        }
+        
+        // 批量查询所有相关数据,避免N+1查询问题
+        // 1. 批量查询所有记录选项
+        LambdaQueryWrapper<PatrolInspectionRecordOption> recordOptionWrapper = Wrappers.lambdaQuery();
+        recordOptionWrapper.in(PatrolInspectionRecordOption::getRecordId, recordIds)
+                .orderByAsc(PatrolInspectionRecordOption::getRecordId)
+                .orderByAsc(PatrolInspectionRecordOption::getId);
+        List<PatrolInspectionRecordOption> allRecordOptions = patrolInspectionRecordOptionMapper.selectList(recordOptionWrapper);
+        
+        // 2. 批量查询所有图片
+        LambdaQueryWrapper<PatrolInspectionRecordPicture> pictureWrapper = Wrappers.lambdaQuery();
+        pictureWrapper.select(PatrolInspectionRecordPicture::getPictureUrl, PatrolInspectionRecordPicture::getRecordId)
+                .in(PatrolInspectionRecordPicture::getRecordId, recordIds)
+                .orderByAsc(PatrolInspectionRecordPicture::getRecordId);
+        List<PatrolInspectionRecordPicture> allPictures = orphansInspectionRecordPictureMapper.selectList(pictureWrapper);
+        
+        // 构建缓存Map:recordId -> List<PatrolInspectionRecordOption>
+        Map<Integer, List<PatrolInspectionRecordOption>> recordOptionMap = new HashMap<>();
+        for (PatrolInspectionRecordOption option : allRecordOptions) {
+            recordOptionMap.computeIfAbsent(option.getRecordId(), k -> new ArrayList<>()).add(option);
+        }
+        
+        // 构建缓存Map:recordId -> List<PatrolInspectionRecordPicture>
+        Map<Integer, List<PatrolInspectionRecordPicture>> pictureMap = new HashMap<>();
+        for (PatrolInspectionRecordPicture picture : allPictures) {
+            pictureMap.computeIfAbsent(picture.getRecordId(), k -> new ArrayList<>()).add(picture);
+        }
+        
+        // 收集所有需要查询的contentId和contentOptionId
+        List<Integer> allContentIds = new ArrayList<>();
+        List<Integer> allContentOptionIds = new ArrayList<>();
+        for (PatrolInspectionRecordOption option : allRecordOptions) {
+            if (option.getContentId() != null) {
+                allContentIds.add(option.getContentId());
+            }
+            if (option.getContentOptionId() != null && !" ".equals(option.getContentOptionId())) {
+                allContentOptionIds.add(option.getContentOptionId());
+            }
+        }
+        
+        // 3. 批量查询所有内容
+        Map<Integer, PatrolInspectionContent> contentMap = new HashMap<>();
+        if (!allContentIds.isEmpty()) {
+            LambdaQueryWrapper<PatrolInspectionContent> contentWrapper = Wrappers.lambdaQuery();
+            contentWrapper.in(PatrolInspectionContent::getId, allContentIds)
+                    .orderByAsc(PatrolInspectionContent::getId);
+            List<PatrolInspectionContent> allContents = patrolInspectionContentMapper.selectList(contentWrapper);
+            for (PatrolInspectionContent content : allContents) {
+                contentMap.put(content.getId(), content);
+            }
+        }
+        
+        // 4. 批量查询所有内容选项
+        Map<Integer, PatrolInspectionContentOption> contentOptionMap = new HashMap<>();
+        if (!allContentOptionIds.isEmpty()) {
+            LambdaQueryWrapper<PatrolInspectionContentOption> contentOptionWrapper = Wrappers.lambdaQuery();
+            contentOptionWrapper.in(PatrolInspectionContentOption::getId, allContentOptionIds)
+                    .orderByAsc(PatrolInspectionContentOption::getId);
+            List<PatrolInspectionContentOption> allContentOptions = patrolInspectionContentOptionMapper.selectList(contentOptionWrapper);
+            for (PatrolInspectionContentOption option : allContentOptions) {
+                contentOptionMap.put(option.getId(), option);
+            }
+        }
+        
+        // 构建结果列表
         List<PatrolInspectionRecordExportVo> list = new ArrayList<>();
         for (int i = 0; i < patrolInspectionRecordList.size(); i++) {
+            PatrolInspectionRecord record = patrolInspectionRecordList.get(i);
+            Integer recordId = record.getId();
+            
             PatrolInspectionRecordExportVo patrolInspectionRecordExportVo = new PatrolInspectionRecordExportVo();
             patrolInspectionRecordExportVo.setXh(i + 1);
-            patrolInspectionRecordExportVo.setId(patrolInspectionRecordList.get(i).getId());
-            patrolInspectionRecordExportVo.setSiteNubmber(patrolInspectionRecordList.get(i).getSiteNubmber());
-            patrolInspectionRecordExportVo.setSiteType(patrolInspectionRecordList.get(i).getSiteType());
-            patrolInspectionRecordExportVo.setAreaName(patrolInspectionRecordList.get(i).getAreaName());
-            patrolInspectionRecordExportVo.setSiteName(patrolInspectionRecordList.get(i).getSiteName());
-            patrolInspectionRecordExportVo.setName(patrolInspectionRecordList.get(i).getName());
-            patrolInspectionRecordExportVo.setPhone(patrolInspectionRecordList.get(i).getPhone());
-            patrolInspectionRecordExportVo.setPlanType(patrolInspectionRecordList.get(i).getPlanType());
-            patrolInspectionRecordExportVo.setLongitude(patrolInspectionRecordList.get(i).getLongitude());
-            patrolInspectionRecordExportVo.setLatitude(patrolInspectionRecordList.get(i).getLatitude());
-            patrolInspectionRecordExportVo.setStartDate(patrolInspectionRecordList.get(i).getStartDate().format(df));
-            patrolInspectionRecordExportVo.setEndDate(patrolInspectionRecordList.get(i).getEndDate().format(df));
-            patrolInspectionRecordExportVo.setCreateTime(patrolInspectionRecordList.get(i).getCreateTime().format(df));
-            patrolInspectionRecordExportVo.setRemarks(patrolInspectionRecordList.get(i).getRemarks());
-
-            patrolInspectionRecordExportVo.setContentTitle(getContentTitle(patrolInspectionRecordList.get(i).getId()));
-            patrolInspectionRecordExportVo.setSubmissionMethod(getSubmissionMethod(patrolInspectionRecordList.get(i).getId()));
-            patrolInspectionRecordExportVo.setOptionName(getOptionName(patrolInspectionRecordList.get(i).getId()));
-            patrolInspectionRecordExportVo.setContentRemarks(getRemarks(patrolInspectionRecordList.get(i).getId()));
-            LambdaQueryWrapper<PatrolInspectionRecordPicture> queryWrapperImage = Wrappers.lambdaQuery();//导出图片
-            queryWrapperImage.select(PatrolInspectionRecordPicture::getPictureUrl)
-                    .eq(PatrolInspectionRecordPicture::getRecordId, patrolInspectionRecordList.get(i).getId());
-            List<PatrolInspectionRecordPicture> pictures = orphansInspectionRecordPictureMapper.selectList(queryWrapperImage);
-            if (pictures.size() > 0) {
+            patrolInspectionRecordExportVo.setId(record.getId());
+            patrolInspectionRecordExportVo.setSiteNubmber(record.getSiteNubmber());
+            patrolInspectionRecordExportVo.setSiteType(record.getSiteType());
+            patrolInspectionRecordExportVo.setAreaName(record.getAreaName());
+            patrolInspectionRecordExportVo.setSiteName(record.getSiteName());
+            patrolInspectionRecordExportVo.setName(record.getName());
+            patrolInspectionRecordExportVo.setPhone(record.getPhone());
+            patrolInspectionRecordExportVo.setPlanType(record.getPlanType());
+            patrolInspectionRecordExportVo.setLongitude(record.getLongitude());
+            patrolInspectionRecordExportVo.setLatitude(record.getLatitude());
+            patrolInspectionRecordExportVo.setStartDate(record.getStartDate().format(df));
+            patrolInspectionRecordExportVo.setEndDate(record.getEndDate().format(df));
+            patrolInspectionRecordExportVo.setCreateTime(record.getCreateTime().format(df));
+            patrolInspectionRecordExportVo.setRemarks(record.getRemarks());
+
+            // 从缓存中获取数据,而不是单独查询
+            List<PatrolInspectionRecordOption> recordOptions = recordOptionMap.getOrDefault(recordId, new ArrayList<>());
+            
+            // 获取内容标题和提交方法
+            List<String> contentTitleList = new ArrayList<>();
+            List<String> submissionMethodList = new ArrayList<>();
+            List<String> optionNameList = new ArrayList<>();
+            List<String> remarksList = new ArrayList<>();
+            
+            for (PatrolInspectionRecordOption option : recordOptions) {
+                // 获取备注
+                if (option.getRemarks() != null) {
+                    remarksList.add(option.getRemarks());
+                } else {
+                    remarksList.add("");
+                }
+                
+                // 获取内容标题和提交方法
+                if (option.getContentId() != null) {
+                    PatrolInspectionContent content = contentMap.get(option.getContentId());
+                    if (content != null) {
+                        contentTitleList.add(content.getContentTitle());
+                        switch (content.getSubmissionMethod()) {
+                            case 1:
+                                submissionMethodList.add("多选");
+                                break;
+                            case 2:
+                                submissionMethodList.add("单选");
+                                break;
+                            default:
+                                submissionMethodList.add("输入框");
+                        }
+                    }
+                }
+                
+                // 获取选项名称
+                if (option.getContentOptionId() == null || " ".equals(option.getContentOptionId())) {
+                    optionNameList.add(option.getContent());
+                } else {
+                    PatrolInspectionContentOption contentOption = contentOptionMap.get(option.getContentOptionId());
+                    if (contentOption != null) {
+                        optionNameList.add(contentOption.getOptionName());
+                    }
+                }
+            }
+            
+            patrolInspectionRecordExportVo.setContentTitle(contentTitleList);
+            patrolInspectionRecordExportVo.setSubmissionMethod(submissionMethodList);
+            patrolInspectionRecordExportVo.setOptionName(optionNameList);
+            patrolInspectionRecordExportVo.setContentRemarks(remarksList);
+            
+            // 从缓存中获取图片
+            List<PatrolInspectionRecordPicture> pictures = pictureMap.getOrDefault(recordId, new ArrayList<>());
+            if (!pictures.isEmpty()) {
                 for (int j = 0; j < pictures.size() && j < 5; j++) {
                     String pictureUrl = pictures.get(j).getPictureUrl();
                     switch (j) {

+ 20 - 2
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/util/ExcelUtilImage.java

@@ -85,6 +85,8 @@ public class ExcelUtilImage<T> {
     private Map<Integer, Double> statistics = new HashMap();
     private static final DecimalFormat DOUBLE_FORMAT = new DecimalFormat("######0.00");
     public Class<T> clazz;
+    // 图片缓存:避免重复下载相同图片
+    private Map<String, byte[]> imageCache = new HashMap<>();
 
     public ExcelUtilImage(Class<T> clazz) {
         this.clazz = clazz;
@@ -274,6 +276,8 @@ public class ExcelUtilImage<T> {
             log.error("导出Excel异常{}", var6.getMessage());
         } finally {
             IOUtils.closeQuietly(this.wb);
+            // 清理图片缓存
+            this.imageCache.clear();
         }
 
     }
@@ -431,8 +435,22 @@ public class ExcelUtilImage<T> {
             ClientAnchor anchor = new XSSFClientAnchor(0, 0, 0, 0, (short) cell.getColumnIndex(), cell.getRow().getRowNum(), (short) (cell.getColumnIndex() + 1), cell.getRow().getRowNum() + 1);
             String imagePath = Convert.toStr(value);
             if (StringUtils.isNotEmpty(imagePath)) {
-                byte[] data = ImageUtils.getImage(imagePath);
-                getDrawingPatriarch(cell.getSheet()).createPicture(anchor, cell.getSheet().getWorkbook().addPicture(data, this.getImageType(data)));
+                // 使用缓存避免重复下载相同图片
+                byte[] data = imageCache.get(imagePath);
+                if (data == null) {
+                    try {
+                        data = ImageUtils.getImage(imagePath);
+                        if (data != null) {
+                            imageCache.put(imagePath, data);
+                        }
+                    } catch (Exception e) {
+                        log.warn("获取图片失败: {}", imagePath, e);
+                        return;
+                    }
+                }
+                if (data != null) {
+                    getDrawingPatriarch(cell.getSheet()).createPicture(anchor, cell.getSheet().getWorkbook().addPicture(data, this.getImageType(data)));
+                }
             }
         }
 

+ 9 - 0
service-job/src/main/java/com/ruoyi/job/task/RyTask.java

@@ -1,5 +1,6 @@
 package com.ruoyi.job.task;
 
+import com.usky.cdi.AlarmDataSyncTaskService;
 import com.usky.cdi.RemotecdiTaskService;
 import com.usky.common.core.utils.StringUtils;
 import com.usky.meeting.RemoteMeetingService;
@@ -31,6 +32,9 @@ public class RyTask {
     @Autowired
     private RemotecdiTaskService remoteCdiTaskService;
 
+    @Autowired
+    private AlarmDataSyncTaskService alarmDataSyncTaskService;
+
     public void ryMultipleParams(String s, Boolean b, Long l, Double d, Integer i) {
         System.out.println(StringUtils.format("执行多参方法: 字符串类型{},布尔类型{},长整型{},浮点型{},整形{}", s, b, l, d, i));
     }
@@ -95,4 +99,9 @@ public class RyTask {
         remoteCdiTaskService.allData(engineeringId, username, password);
     }
 
+    public void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status) {
+        System.out.println("人防告警数据推送定时任务开始执行......");
+        alarmDataSyncTaskService.synchronizeAlarmData(tenantId, engineeringId, username, password, status);
+    }
+
 }