Quellcode durchsuchen

人防代码推送优化

fuyuchuan vor 3 Tagen
Ursprung
Commit
bde2dc65f5

+ 14 - 12
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/IotDataController.java

@@ -2,7 +2,9 @@ package com.usky.cdi.controller;
 
 import com.alibaba.fastjson.JSONObject;
 import com.usky.cdi.service.impl.IotDataTransferService;
+import com.usky.cdi.service.vo.IotDataTransferVO;
 import com.usky.cdi.service.vo.base.*;
+import com.usky.common.core.bean.ApiResult;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -30,36 +32,36 @@ public class IotDataController {
      * 上报水浸状态
      */
     @PostMapping("/flooded")
-    public String sendWaterLeak(@RequestBody JSONObject jsonObject) {
-        boolean success = iotDataTransferService.sendWaterLeak(jsonObject);
-        return success ? "上报成功" : "上报失败";
+    public ApiResult<Void> sendWaterLeak(@RequestBody IotDataTransferVO jsonObject) {
+        iotDataTransferService.sendWaterLeak(jsonObject);
+        return ApiResult.success();
     }
 
     /**
      * 上报温度、湿度、氧气、一氧化碳、二氧化碳
      */
     @PostMapping("/envData")
-    public String sendEnvData(@RequestBody JSONObject jsonObject) {
-        boolean success = iotDataTransferService.sendEnvData(jsonObject);
-        return success ? "上报成功" : "上报失败";
+    public ApiResult<Void> sendEnvData(@RequestBody IotDataTransferVO jsonObject) {
+        iotDataTransferService.sendEnvData(jsonObject);
+        return ApiResult.success();
     }
 
     /**
      * 上报人员闯入
      */
     @PostMapping("/personPresence")
-    public String sendPerson(@RequestBody JSONObject jsonObject) {
-        boolean success = iotDataTransferService.sendPersonPresence(jsonObject);
-        return success ? "上报成功" : "上报失败";
+    public ApiResult<Void> sendPerson(@RequestBody IotDataTransferVO jsonObject) {
+        iotDataTransferService.sendPersonPresence(jsonObject);
+        return ApiResult.success();
     }
 
     /**
      * 上报用电负荷
      */
     @PostMapping("/electricityLoad")
-    public String sendElectricityLoad(@RequestBody JSONObject jsonObject) {
-        boolean success = iotDataTransferService.sendElectricityLoad(jsonObject);
-        return success ? "上报成功" : "上报失败";
+    public ApiResult<Void> sendElectricityLoad(@RequestBody IotDataTransferVO jsonObject) {
+        iotDataTransferService.sendElectricityLoad(jsonObject);
+        return ApiResult.success();
     }
 
 }

+ 4 - 4
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/MybatisGeneratorUtils.java

@@ -43,10 +43,10 @@ public class MybatisGeneratorUtils {
         //2、数据源配置
         //修改数据源
         DataSourceConfig dsc = new DataSourceConfig();
-        dsc.setUrl("jdbc:mysql://47.98.201.187:3306/jdxf?useUnicode=true&serverTimezone=GMT&useSSL=false&characterEncoding=utf8");
+        dsc.setUrl("jdbc:mysql://47.111.81.118:13307/usky-cloud?useUnicode=true&serverTimezone=GMT&useSSL=false&characterEncoding=utf8");
         dsc.setDriverName("com.mysql.jdbc.Driver");
-        dsc.setUsername("fuYuChuan");
-        dsc.setPassword("fuYuChuan@123");
+        dsc.setUsername("usky");
+        dsc.setPassword("Yt#75Usky");
         mpg.setDataSource(dsc);
 
         // 3、包配置
@@ -71,7 +71,7 @@ public class MybatisGeneratorUtils {
         // strategy.setTablePrefix("t_"); // 表名前缀
         strategy.setEntityLombokModel(true); //使用lombok
         //修改自己想要生成的表
-        strategy.setInclude(new String[]{"sp_rtu2017", "sp_sj2017"});  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
+        strategy.setInclude("dmp_product");  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
         mpg.setStrategy(strategy);
 
         // 关闭默认 xml 生成,调整生成 至 根目录

+ 3 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java

@@ -20,6 +20,8 @@ public class RemotecdiTaskApi implements RemotecdiTaskService {
 
     @Override
     public void synchronizeDeviceData() {
-        iotDataTransferService.synchronizeDeviceData();
+        Integer tenantId = 1;
+        Long engineeringId = 1L;
+        iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
     }
 }

+ 22 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/web/DmpProductController.java

@@ -0,0 +1,22 @@
+package com.usky.cdi.controller.web;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * <p>
+ * 产品信息表 前端控制器
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-05
+ */
+@RestController
+@RequestMapping("/dmpProduct")
+public class DmpProductController {
+
+}
+

+ 141 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/domain/DmpProduct.java

@@ -0,0 +1,141 @@
+package com.usky.cdi.domain;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import java.time.LocalDateTime;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * <p>
+ * 产品信息表
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-05
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+public class DmpProduct implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键id
+     */
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * 产品名称
+     */
+    private String productName;
+
+    /**
+     * 接入方式
+     */
+    private Integer accessMode;
+
+    /**
+     * 网络类型
+     */
+    private Integer networkType;
+
+    /**
+     * 设备类型
+     */
+    private Integer deviceType;
+
+    /**
+     * 通信协议
+     */
+    private Integer comProtocol;
+
+    /**
+     * 认证方式
+     */
+    private String authMode;
+
+    /**
+     * 设备型号
+     */
+    private String deviceModel;
+
+    /**
+     * 产品描述
+     */
+    private String productDescribe;
+
+    /**
+     * 厂家名称
+     */
+    private String factoryName;
+
+    /**
+     * 厂家联系人
+     */
+    private String factoryPerson;
+
+    /**
+     * 厂家联系电话
+     */
+    private String factoryPhone;
+
+    /**
+     * 资质证书1
+     */
+    private String certificateUrl1;
+
+    /**
+     * 资质证书2
+     */
+    private String certificateUrl2;
+
+    /**
+     * 资质证书3
+     */
+    private String certificateUrl3;
+
+    /**
+     * 协议文档
+     */
+    private String agreementUrl;
+
+    /**
+     * 删除标识
+     */
+    private Integer deleteFlag;
+
+    /**
+     * 创建人
+     */
+    private String createdBy;
+
+    /**
+     * 创建时间
+     */
+    private LocalDateTime createdTime;
+
+    /**
+     * 更新人
+     */
+    private String updatedBy;
+
+    /**
+     * 更新时间
+     */
+    private LocalDateTime updatedTime;
+
+    /**
+     * 租户号
+     */
+    private Integer tenantId;
+
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+
+}

+ 16 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/mapper/DmpProductMapper.java

@@ -0,0 +1,16 @@
+package com.usky.cdi.mapper;
+
+import com.usky.cdi.domain.DmpProduct;
+import com.usky.common.mybatis.core.CrudMapper;
+
+/**
+ * <p>
+ * 产品信息表 Mapper 接口
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-05
+ */
+public interface DmpProductMapper extends CrudMapper<DmpProduct> {
+
+}

+ 16 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/DmpProductService.java

@@ -0,0 +1,16 @@
+package com.usky.cdi.service;
+
+import com.usky.cdi.domain.DmpProduct;
+import com.usky.common.mybatis.core.CrudService;
+
+/**
+ * <p>
+ * 产品信息表 服务类
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-05
+ */
+public interface DmpProductService extends CrudService<DmpProduct> {
+
+}

+ 20 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/DmpProductServiceImpl.java

@@ -0,0 +1,20 @@
+package com.usky.cdi.service.impl;
+
+import com.usky.cdi.domain.DmpProduct;
+import com.usky.cdi.mapper.DmpProductMapper;
+import com.usky.cdi.service.DmpProductService;
+import com.usky.common.mybatis.core.AbstractCrudService;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ * 产品信息表 服务实现类
+ * </p>
+ *
+ * @author fu
+ * @since 2025-12-05
+ */
+@Service
+public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper, DmpProduct> implements DmpProductService {
+
+}

+ 395 - 163
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -4,11 +4,14 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.usky.cdi.domain.DmpDevice;
+import com.usky.cdi.domain.DmpProduct;
 import com.usky.cdi.mapper.DmpDeviceMapper;
+import com.usky.cdi.mapper.DmpProductMapper;
 import com.usky.cdi.service.config.mqtt.MqttGateway;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
 import com.usky.cdi.service.util.DeviceDataQuery;
 import com.usky.cdi.service.util.SnowflakeIdGenerator;
+import com.usky.cdi.service.vo.IotDataTransferVO;
 import com.usky.cdi.service.vo.base.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -16,6 +19,8 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -23,7 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -39,24 +43,30 @@ public class IotDataTransferService {
     @Autowired(required = false)
     private MqttGateway mqttGateway;
 
-    private final SnowflakeIdGenerator idGenerator;
+    private SnowflakeIdGenerator idGenerator;
 
     @Autowired
     private DeviceDataQuery deviceDataQuery;
 
-    @Value("${config.engineeringID}")
-    private Long engineeringID;
-
     @Autowired
     private DmpDeviceMapper dmpDeviceMapper;
 
     @Value("${device.data.simulation}")
     private boolean simulation;
 
+    @Autowired
+    private DmpProductMapper dmpProductMapper;
+
+    // 从配置文件读取Snowflake参数,默认值为2
+    @Value("${snowflake.worker-id:2}")
+    private long workerId;
 
-    public IotDataTransferService() {
-        // 使用默认的workerId和datacenterId,实际项目中可以从配置读取
-        this.idGenerator = new SnowflakeIdGenerator(2L, 2L);
+    @Value("${snowflake.data-center-id:2}")
+    private long dataCenterId;
+
+    @PostConstruct
+    public void init() {
+        this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
     }
 
     /**
@@ -77,48 +87,72 @@ public class IotDataTransferService {
      * 发送水浸状态(deviceType:702)
      * Topic: iotInfo/flooded
      *
-     * @return 是否发送成功
+     * @return 推送结果,包含成功数和失败数
      */
-    public boolean sendWaterLeak(JSONObject jsonObject) {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
-            return false;
+    public Map<String, Integer> sendWaterLeak(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
         }
+
         try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送水浸状态数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
 
             if (deviceData.isEmpty()) {
-                log.warn("没有获取到水浸数据!deviceUuids:{}", jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
-                return false;
+                log.warn("没有获取到水浸数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
             }
 
+            Long engineeringId = transferVO.getEngineeringId();
             for (JSONObject deviceDataItem : deviceData) {
-                Long dataTime = deviceDataItem.getLong("time");
-                if (dataTime == null) {
-                    log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
+                    continue;
+                }
+
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+                Integer value = deviceDataItem.getInteger("leach_status");
+                if (value == null) {
+                    log.warn("设备{}的水浸状态数据为空", deviceId);
+                    result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
-                LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
 
                 WaterLeakVO vo = new WaterLeakVO();
                 vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceDataItem.getIntValue("device_id"));
-                vo.setEngineeringID(engineeringID);
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
                 vo.setPublishTime(getCurrentTime());
-                vo.setSensorValue(deviceDataItem.getInteger("leach_status"));
+                vo.setSensorValue(value);
                 vo.setDataEndTime(dataEndTime);
 
-                String json = JSON.toJSONString(vo);
-                String topic = EnvMonitorMqttTopic.WATER_LEAK.getTopic();
-
-                log.info("发送水浸状态信息,Topic: {}, Data: {}", topic, json);
-                mqttGateway.sendToMqtt(topic, json);
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
             }
 
-            return true;
+            log.info("水浸状态数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
         } catch (Exception e) {
-            log.error("发送水浸状态信息失败", e);
-            return false;
+            log.error("水浸状态数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
         }
     }
 
@@ -126,86 +160,121 @@ public class IotDataTransferService {
      * 发送温湿度及气体浓度数据(设备类型:701,707-711)
      * 包含: wd(温度), sd(湿度), o2(氧气), co(一氧化碳), co2(二氧化碳)
      *
-     * @return 是否发送成功
+     * @return 推送结果,包含成功数和失败数
      */
-    public boolean sendEnvData(JSONObject jsonObject) {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
-            return false;
+    public Map<String, Integer> sendEnvData(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
         }
+
         try {
-            Integer deviceType = jsonObject.getInteger("deviceType");
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(deviceType, jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
+            Integer deviceType = transferVO.getDeviceType();
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer totalDevices = transferVO.getDevices().size();
 
+            log.info("开始推送温湿度及气体浓度数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
 
             if (deviceData.isEmpty()) {
-                log.warn("没有获取到空气质量数据!deviceType:{}, deviceUuids:{}", jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
-                return false;
+                log.warn("没有获取到空气质量数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
             }
 
+            Long engineeringId = transferVO.getEngineeringId();
             for (JSONObject deviceDataItem : deviceData) {
-                Long dataTime = deviceDataItem.getLong("time");
-                if (dataTime == null) {
-                    log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
-                LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
+
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
+                boolean deviceSuccess = true;
+
+                // 根据设备类型发送对应的数据
+                try {
+                    switch (deviceType) {
+                        case 707:
+                            sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            break;
+                        case 708:
+                            sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            break;
+                        case 709:
+                            sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            break;
+                        case 710:
+                            sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            break;
+                        case 711:
+                            sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId);
+                            break;
+                    }
+                } catch (Exception e) {
+                    log.warn("设备{}的环境数据推送失败:{}", deviceId, e.getMessage());
+                    deviceSuccess = false;
+                }
 
-                // 提取公共方法,减少代码冗余
-                switch (deviceType) {
-                    case 701:
-                        sendTempData(deviceId, dataEndTime, deviceDataItem);
-                        sendHumidityData(deviceId, dataEndTime, deviceDataItem);
-                        sendOxygenData(deviceId, dataEndTime, deviceDataItem);
-                        sendCoData(deviceId, dataEndTime, deviceDataItem);
-                        sendCo2Data(deviceId, dataEndTime, deviceDataItem);
-                        break;
-                    case 707:
-                        sendTempData(deviceId, dataEndTime, deviceDataItem);
-                        break;
-                    case 708:
-                        sendHumidityData(deviceId, dataEndTime, deviceDataItem);
-                        break;
-                    case 709:
-                        sendOxygenData(deviceId, dataEndTime, deviceDataItem);
-                        break;
-                    case 710:
-                        sendCo2Data(deviceId, dataEndTime, deviceDataItem);
-                        break;
-                    case 711:
-                        sendCoData(deviceId, dataEndTime, deviceDataItem);
-                        break;
+                // 统计设备推送结果
+                if (deviceSuccess) {
+                    result.put("successCount", result.get("successCount") + 1);
+                } else {
+                    result.put("failureCount", result.get("failureCount") + 1);
                 }
             }
-            return true;
+
+            log.info("温湿度及气体浓度数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
         } catch (Exception e) {
-            log.error("发送环境数据失败", e);
-            return false;
+            log.error("温湿度及气体浓度数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
         }
     }
 
     /**
-     * 发送人员闯入情况
+     * 发送人员闯入情况(703)
      *
-     * @return 是否发送成功
+     * @return 推送结果,包含成功数和失败数
      **/
-    public boolean sendPersonPresence(JSONObject jsonObject) {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
-            return false;
+    public Map<String, Integer> sendPersonPresence(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
         }
 
         try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
-            for (JSONObject deviceDataItem : deviceData) {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送人员闯入情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
 
-                Long dataTime = deviceDataItem.getLong("time");
-                if (dataTime == null) {
-                    log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到人员闯入情况数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
+
+            Long engineeringId = transferVO.getEngineeringId();
+            for (JSONObject deviceDataItem : deviceData) {
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
-                LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
+
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
 
                 PersonPresenceVO vo = new PersonPresenceVO();
@@ -213,46 +282,71 @@ public class IotDataTransferService {
                 vo.setSensorID(deviceId);
                 vo.setDataEndTime(dataEndTime);
                 vo.setPublishTime(getCurrentTime());
-                vo.setEngineeringID(engineeringID);
-                // vo.setSensorValue(deviceDataItem.getIntValue("sensorValue"));
+                vo.setEngineeringID(engineeringId);
+                // 传感器值固定为0(可能是默认值或占位符)
                 vo.setSensorValue(0);
-                String json = JSON.toJSONString(vo);
-                String topic = EnvMonitorMqttTopic.PERSON_PRESENCE.getTopic();
-                log.info("发送人员闯入情况,Topic: {}, Data: {}", topic, json);
-                mqttGateway.sendToMqtt(topic, json);
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
             }
-            return true;
+
+            log.info("人员闯入情况数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
         } catch (Exception e) {
-            log.error("发送人员闯入情况失败", e);
-            return false;
+            log.error("人员闯入情况数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
         }
     }
 
     /**
      * 发送人防用电负荷情况(704)
      *
-     * @return 是否发送成功
+     * @return 推送结果,包含成功数和失败数
      **/
-    public boolean sendElectricityLoad(JSONObject jsonObject) {
-        if (mqttGateway == null) {
-            log.warn("MQTT Gateway未初始化,无法发送消息");
-            return false;
+    public Map<String, Integer> sendElectricityLoad(IotDataTransferVO transferVO) {
+        Map<String, Integer> result = new HashMap<>();
+        result.put("successCount", 0);
+        result.put("failureCount", 0);
+
+        if (!validateMqttGateway()) {
+            return result;
         }
+
         try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            Integer deviceType = transferVO.getDeviceType();
+            Integer totalDevices = transferVO.getDevices().size();
+
+            log.info("开始推送人防用电负荷情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
+                    deviceType, totalDevices, deviceData.size());
+
+            if (deviceData.isEmpty()) {
+                log.warn("没有获取到人防用电负荷情况数据!设备类型:{}", deviceType);
+                result.put("failureCount", totalDevices);
+                return result;
+            }
 
+            Long engineeringId = transferVO.getEngineeringId();
             for (JSONObject deviceDataItem : deviceData) {
-                Long dataTime = deviceDataItem.getLong("time");
-                if (dataTime == null) {
-                    log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+                LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
+                if (dataEndTime == null) {
+                    result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
-                LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
 
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
                 ElectricityLoadVO vo = new ElectricityLoadVO();
                 vo.setDataPacketID(generateDataPacketID());
-                vo.setSensorID(deviceDataItem.getIntValue("device_id"));
-                vo.setEngineeringID(engineeringID);
+                vo.setSensorID(deviceId);
+                vo.setEngineeringID(engineeringId);
                 vo.setPublishTime(getCurrentTime());
                 vo.setDataEndTime(dataEndTime);
                 vo.setAVoltage(deviceDataItem.getFloat("aVoltage"));
@@ -265,26 +359,34 @@ public class IotDataTransferService {
                 vo.setLine2TEMP(deviceDataItem.getFloat("Line2TEMP"));
                 vo.setLine3TEMP(deviceDataItem.getFloat("Line3TEMP"));
                 vo.setLeakageCurrent(deviceDataItem.getFloat("leakageCurrent"));
-                if (simulation) {
-                    vo.setTotalPower(deviceDataItem.getFloat("totalPower"));
-                } else {
-                    vo.setTotalPower(deviceDataItem.getFloat("active_power"));
-                }
 
-                String json = JSON.toJSONString(vo);
-                String topic = EnvMonitorMqttTopic.ELECTRICITY_LOAD.getTopic();
-                log.info("发送人防用电负荷情况,Topic: {}, Data: {}", topic, json);
-                mqttGateway.sendToMqtt(topic, json);
+                // 根据模拟模式选择不同的功率字段
+                vo.setTotalPower(simulation ?
+                        deviceDataItem.getFloat("totalPower") :
+                        deviceDataItem.getFloat("active_power"));
+
+                try {
+                    sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况");
+                    result.put("successCount", result.get("successCount") + 1);
+                } catch (Exception e) {
+                    log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
+                    result.put("failureCount", result.get("failureCount") + 1);
+                }
             }
-            return true;
+
+            log.info("人防用电负荷情况数据推送完成,设备类型:{},成功:{},失败:{}",
+                    deviceType, result.get("successCount"), result.get("failureCount"));
+
+            return result;
         } catch (Exception e) {
-            log.error("发送人防用电负荷情况失败", e);
-            return false;
+            log.error("人防用电负荷情况数据推送发生异常", e);
+            result.put("failureCount", transferVO.getDevices().size());
+            return result;
         }
     }
 
     // 提取公共发送方法,减少代码冗余
-    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem) {
+    private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("wd");
         if (value == null) {
             log.warn("设备{}的温度数据为空", deviceId);
@@ -297,13 +399,10 @@ public class IotDataTransferService {
         tempVO.setPublishTime(getCurrentTime());
         tempVO.setSensorValue(value);
         tempVO.setDataEndTime(dataEndTime);
-        String json = JSON.toJSONString(tempVO);
-        String topic = EnvMonitorMqttTopic.TEMP.getTopic();
-        mqttGateway.sendToMqtt(topic, json);
-        log.info("发送温度信息,Topic: {}, Data: {}", topic, json);
+        sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
     }
 
-    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem) {
+    private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("sd");
         if (value == null) {
             log.warn("设备{}的湿度数据为空", deviceId);
@@ -316,13 +415,10 @@ public class IotDataTransferService {
         humidityVO.setPublishTime(getCurrentTime());
         humidityVO.setSensorValue(value);
         humidityVO.setDataEndTime(dataEndTime);
-        String json = JSON.toJSONString(humidityVO);
-        String topic = EnvMonitorMqttTopic.HUMIDITY.getTopic();
-        mqttGateway.sendToMqtt(topic, json);
-        log.info("发送湿度信息,Topic: {}, Data: {}", topic, json);
+        sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
     }
 
-    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem) {
+    private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("o2");
         if (value == null) {
             log.warn("设备{}的氧气浓度数据为空", deviceId);
@@ -335,13 +431,10 @@ public class IotDataTransferService {
         oxygenVO.setPublishTime(getCurrentTime());
         oxygenVO.setSensorValue(value);
         oxygenVO.setDataEndTime(dataEndTime);
-        String json = JSON.toJSONString(oxygenVO);
-        String topic = EnvMonitorMqttTopic.OXYGEN.getTopic();
-        mqttGateway.sendToMqtt(topic, json);
-        log.info("发送氧气浓度信息,Topic: {}, Data: {}", topic, json);
+        sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
     }
 
-    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem) {
+    private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co");
         if (value == null) {
             log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
@@ -354,13 +447,10 @@ public class IotDataTransferService {
         coVO.setPublishTime(getCurrentTime());
         coVO.setSensorValue(value);
         coVO.setDataEndTime(dataEndTime);
-        String json = JSON.toJSONString(coVO);
-        String topic = EnvMonitorMqttTopic.CO.getTopic();
-        mqttGateway.sendToMqtt(topic, json);
-        log.info("发送一氧化碳浓度信息,Topic: {}, Data: {}", topic, json);
+        sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
     }
 
-    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem) {
+    private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
         Float value = deviceDataItem.getFloat("co2");
         if (value == null) {
             log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
@@ -373,35 +463,177 @@ public class IotDataTransferService {
         co2VO.setPublishTime(getCurrentTime());
         co2VO.setSensorValue(value);
         co2VO.setDataEndTime(dataEndTime);
-        String json = JSON.toJSONString(co2VO);
-        String topic = EnvMonitorMqttTopic.CO2.getTopic();
-        mqttGateway.sendToMqtt(topic, json);
-        log.info("发送二氧化碳浓度信息,Topic: {}, Data: {}", topic, json);
+        sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
     }
 
-    public void synchronizeDeviceData() {
+    /**
+     * 同步设备数据
+     * @param tenantId 租户ID
+     * @param engineeringId 工程ID
+     */
+    public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
+        // 参数校验
+        if (tenantId == null || engineeringId == null) {
+            log.error("租户ID或工程ID不能为空");
+            return;
+        }
+
+        // 查询租户下的所有产品类型
+        List<String> deviceTypeList = getDeviceTypeListByTenant(tenantId);
+        if (deviceTypeList.isEmpty()) {
+            log.warn("租户{}不存在任何产品", tenantId);
+            return;
+        }
+
+        // 查询设备列表
+        List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList, tenantId);
+        if (deviceList.isEmpty()) {
+            log.warn("租户{}不存在任何设备", tenantId);
+            return;
+        }
+
+        // 按设备类型分组
+        Map<Integer, List<DmpDevice>> deviceTypeMap = deviceList.stream()
+                .collect(Collectors.groupingBy(DmpDevice::getDeviceType));
+
+        // 构建数据传输对象列表
+        List<IotDataTransferVO> transferList = new ArrayList<>();
+        deviceTypeMap.forEach((deviceType, devices) -> {
+            IotDataTransferVO transferVO = new IotDataTransferVO();
+            transferVO.setDeviceType(deviceType);
+            transferVO.setDevices(devices);
+            transferVO.setEngineeringId(engineeringId);
+            transferList.add(transferVO);
+        });
+
+        // 按产品代码分组,构建ProductCode到uuid列表的映射
+        Map<String, List<String>> codeDeviceUuidsMap = deviceList.stream()
+                .collect(Collectors.groupingBy(DmpDevice::getProductCode,
+                        Collectors.mapping(DmpDevice::getDeviceUuid, Collectors.toList())));
+
+        // 任务开始日志
+        Integer totalDevices = deviceList.size();
+        Integer totalProductTypes = codeDeviceUuidsMap.size();
+        log.info("设备数据同步任务开始,租户ID:{},工程ID:{}", tenantId, engineeringId);
+        log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
+        log.info("总共需要推送设备数量:{}个,涉及设备类型数:{}个,设备类型为:{}",
+                totalDevices, deviceTypeMap.size(), deviceTypeMap.keySet());
+
+        // 记录每种设备类型的设备数量
+        deviceTypeMap.forEach((deviceType, devices) -> {
+            log.info("设备类型:{},设备数量:{}", deviceType, devices.size());
+        });
+
+        // 按设备类型处理数据同步
+        int totalSuccessCount = 0;
+        int totalFailureCount = 0;
+
+        for (IotDataTransferVO transferVO : transferList) {
+            Integer deviceType = transferVO.getDeviceType();
+            Map<String, Integer> result = new HashMap<>();
+
+            switch (deviceType) {
+                case 707:
+                case 708:
+                case 709:
+                case 710:
+                case 711:
+                    result = sendEnvData(transferVO);
+                    break;
+                case 702:
+                    result = sendWaterLeak(transferVO);
+                    break;
+                case 703:
+                    result = sendPersonPresence(transferVO);
+                    break;
+                case 704:
+                    result = sendElectricityLoad(transferVO);
+                    break;
+                default:
+                    log.debug("不支持的设备类型:{}", deviceType);
+                    continue;
+            }
+
+            // 累加成功数和失败数
+            totalSuccessCount += result.get("successCount");
+            totalFailureCount += result.get("failureCount");
+        }
+
+        // 任务完成总结
+        log.info("设备数据同步任务完成,租户ID:{},工程ID:{}", tenantId, engineeringId);
+        log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
+        log.info("总共推送设备数量:{}个,成功:{}个,失败:{}个",
+                totalDevices, totalSuccessCount, totalFailureCount);
+    }
+
+    /**
+     * 查询租户下的设备类型列表
+     * @param tenantId 租户ID
+     * @return 设备类型列表
+     */
+    private List<String> getDeviceTypeListByTenant(Integer tenantId) {
+        LambdaQueryWrapper<DmpProduct> productQueryWrapper = new LambdaQueryWrapper<>();
+        productQueryWrapper.select(DmpProduct::getProductCode)
+                .eq(DmpProduct::getTenantId, tenantId)
+                .eq(DmpProduct::getDeleteFlag, 0);
+        List<DmpProduct> productList = dmpProductMapper.selectList(productQueryWrapper);
+        return productList.stream()
+                .map(DmpProduct::getProductCode)
+                .distinct() // 去重
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * 根据设备类型列表查询设备
+     * @param productCodeList 设备类型列表
+     * @return 设备列表
+     */
+    private List<DmpDevice> getDeviceListByType(List<String> productCodeList, Integer tenantId) {
         LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(DmpDevice::getTenantId, 1208)
-                .eq(DmpDevice::getDeleteFlag, 0);
-        List<DmpDevice> deviceList = dmpDeviceMapper.selectList(queryWrapper);
-        List<Integer> deviceTypeList = deviceList.stream().map(DmpDevice::getDeviceType).distinct().collect(Collectors.toList());
-        log.info("设备数据同步开始,涉及设备类型数:" + deviceTypeList.size() + "个,类型为:" + deviceTypeList);
-
-        Map<Integer, Consumer<JSONObject>> deviceTypeHandlerMap = new HashMap<>();
-        deviceTypeHandlerMap.put(701, this::sendEnvData);
-        deviceTypeHandlerMap.put(707, this::sendEnvData);
-        deviceTypeHandlerMap.put(708, this::sendEnvData);
-        deviceTypeHandlerMap.put(709, this::sendEnvData);
-        deviceTypeHandlerMap.put(710, this::sendEnvData);
-        deviceTypeHandlerMap.put(711, this::sendEnvData);
-        deviceTypeHandlerMap.put(702, this::sendWaterLeak);
-        deviceTypeHandlerMap.put(703, this::sendPersonPresence);
-        deviceTypeHandlerMap.put(704, this::sendElectricityLoad);
-        for (Integer deviceType : deviceTypeList) {
-            JSONObject json = new JSONObject();
-            json.put("deviceType", deviceType);
-            deviceTypeHandlerMap.get(deviceType).accept(json);
+        queryWrapper.select(DmpDevice::getDeviceUuid, DmpDevice::getDeviceType, DmpDevice::getDeviceId)
+                .in(DmpDevice::getProductCode, productCodeList)
+                .eq(DmpDevice::getDeleteFlag, 0)
+                .notIn(DmpDevice::getServiceStatus, 3)
+                .orderByAsc(DmpDevice::getProductCode);
+        return dmpDeviceMapper.selectList(queryWrapper);
+    }
+
+    /**
+     * 验证MQTT网关是否初始化
+     * @return 是否初始化
+     */
+    private boolean validateMqttGateway() {
+        if (mqttGateway == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息");
+            return false;
         }
-        log.info("设备数据同步完成,涉及设备类型数:" + deviceTypeList.size() + "个");
+        return true;
+    }
+
+    /**
+     * 解析数据时间
+     * @param deviceDataItem 设备数据
+     * @return 解析后的时间,如果解析失败返回null
+     */
+    private LocalDateTime parseDataTime(JSONObject deviceDataItem) {
+        Long dataTime = deviceDataItem.getLong("time");
+        if (dataTime == null) {
+            log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+            return null;
+        }
+        return LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
+    }
+
+    /**
+     * 发送MQTT消息
+     * @param topicEnum 主题枚举
+     * @param vo 消息对象
+     * @param messageType 消息类型描述
+     */
+    private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType) {
+        String json = JSON.toJSONString(vo);
+        String topic = topicEnum.getTopic();
+        // 不再记录每条数据的详情,只记录发送操作
+        mqttGateway.sendToMqtt(topic, json);
     }
 }

+ 160 - 135
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java

@@ -3,9 +3,9 @@ package com.usky.cdi.service.util;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.mapper.DmpDeviceMapper;
+import com.usky.cdi.service.vo.IotDataTransferVO;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -45,119 +45,157 @@ public class DeviceDataQuery {
 
     /**
      * 获取指定设备类型的设备数据
+     * @param transferVO 设备数据传输请求参数
+     * @return 设备数据列表
      */
-    public List<JSONObject> getDeviceData(Integer deviceType, List<String> deviceUuid) {
-
-        List<DmpDevice> devices = getDeviceUuids(deviceType, deviceUuid);
-        if (devices.isEmpty()) {
-            log.warn("该租户下没有注册设备!");
+    public List<JSONObject> getDeviceData(IotDataTransferVO transferVO) {
+        // 参数验证
+        if (transferVO == null || transferVO.getDevices() == null || transferVO.getDevices().isEmpty()) {
+            log.warn("获取设备数据失败:参数无效");
             return Collections.emptyList();
         }
 
-        JSONObject requestBody = new JSONObject();
-
-        if (deviceUuid != null && !deviceUuid.isEmpty()) {
-            requestBody.put("deviceUuids", deviceUuid);
-        } else {
-            List<String> deviceUuids = devices.stream().map(DmpDevice::getDeviceUuid).collect(Collectors.toList());
+        try {
+            // 构建请求参数
+            JSONObject requestBody = new JSONObject();
+            List<String> deviceUuids = transferVO.getDevices().stream()
+                    .map(DmpDevice::getDeviceUuid)
+                    .collect(Collectors.toList());
             requestBody.put("deviceUuids", deviceUuids);
-        }
 
-        String response = HttpClientUtils.doPostJson(baseUrl, String.valueOf(requestBody));
+            log.debug("请求设备数据接口,设备数量:{}", deviceUuids.size());
+            String response = HttpClientUtils.doPostJson(baseUrl, requestBody.toJSONString());
 
-        List<JSONObject> resultList = parseResponseData(response, deviceType, devices);
+            List<JSONObject> resultList = parseResponseData(response, transferVO.getDeviceType(), transferVO.getDevices());
 
-        if (resultList.isEmpty() && simulation) {
-            resultList = generateSimulationData(deviceType, devices);
-        }
-
-        return resultList;
-
-    }
+            // 若返回数据为空且开启模拟模式,则生成模拟数据
+            if (resultList.isEmpty() && simulation) {
+                log.info("接口返回数据为空,生成模拟数据,设备类型:{}", transferVO.getDeviceType());
+                resultList = generateSimulationData(transferVO.getDeviceType(), transferVO.getDevices());
+            }
 
-    /**
-     * 获取指定设备类型的设备UUID列表
-     */
-    private List<DmpDevice> getDeviceUuids(Integer deviceType, List<String> deviceUuid) {
-        LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(DmpDevice::getTenantId, 1208)
-                .eq(DmpDevice::getDeleteFlag, 0)
-                .eq(deviceType != null, DmpDevice::getDeviceType, deviceType)
-                .in(deviceUuid != null && !deviceUuid.isEmpty(), DmpDevice::getDeviceUuid, deviceUuid)
-                .orderByAsc(DmpDevice::getId);
-        return dmpDeviceMapper.selectList(queryWrapper);
+            return resultList;
+        } catch (Exception e) {
+            log.error("获取设备数据失败:{}", e.getMessage(), e);
+            // 异常情况下若开启模拟模式,则生成模拟数据
+            if (simulation) {
+                log.info("接口调用异常,生成模拟数据,设备类型:{}", transferVO.getDeviceType());
+                return generateSimulationData(transferVO.getDeviceType(), transferVO.getDevices());
+            }
+            return Collections.emptyList();
+        }
     }
 
     /**
      * 解析接口响应数据,提取指定字段
+     * @param responseJson 接口响应JSON字符串
+     * @param deviceType 设备类型
+     * @param devices 设备列表
+     * @return 解析后的设备数据列表
      */
     private List<JSONObject> parseResponseData(String responseJson, Integer deviceType, List<DmpDevice> devices) {
         List<JSONObject> resultList = new ArrayList<>();
-        if (responseJson == null) {
+        if (responseJson == null || responseJson.trim().isEmpty()) {
+            log.warn("接口响应数据为空");
             return resultList;
         }
 
-        Map<String, Integer> uuid2Id = devices.stream()
-                .collect(Collectors.toMap(DmpDevice::getDeviceUuid, DmpDevice::getId));
+        try {
+            // 构建设备UUID到ID的映射
+            Map<String, String> deviceUuidToIdMap = devices.stream()
+                    .collect(Collectors.toMap(DmpDevice::getDeviceUuid, DmpDevice::getDeviceId));
 
-        JSONObject responseObj = JSON.parseObject(responseJson);
-        if (!"SUCCESS".equals(responseObj.getString("status")) || !"0".equals(responseObj.getString("code"))) {
-            System.err.println("接口返回失败:" + responseObj.getString("msg"));
-            return resultList;
-        }
+            JSONObject responseObj = JSON.parseObject(responseJson);
 
-        JSONArray dataArray = responseObj.getJSONArray("data");
-        if (dataArray == null || dataArray.isEmpty()) {
-            return resultList;
-        }
+            // 检查响应状态
+            String status = responseObj.getString("status");
+            String code = responseObj.getString("code");
 
-        List<String> targetFields = getTargetFieldsByDeviceType(deviceType);
-
-        for (int i = 0; i < dataArray.size(); i++) {
-            JSONObject deviceData = dataArray.getJSONObject(i);
-            JSONObject metrics = deviceData.getJSONObject("metrics");
-            String deviceUuid = deviceData.getString("deviceuuid");
+            if (!"SUCCESS".equals(status) || !"0".equals(code)) {
+                log.warn("接口返回失败:状态={}, 错误码={}, 错误信息={}", status, code, responseObj.getString("msg"));
+                return resultList;
+            }
 
-            if (metrics == null) {
-                continue;
+            JSONArray dataArray = responseObj.getJSONArray("data");
+            if (dataArray == null || dataArray.isEmpty()) {
+                log.debug("接口响应数据为空数组");
+                return resultList;
             }
 
-            JSONObject targetData = new JSONObject();
-            boolean hasValidData = false;
-            for (String field : targetFields) {
-                Object value = metrics.get(field);
-                if (value != null) {
-                    targetData.put(field, value);
-                    hasValidData = true;
+            List<String> targetFields = getTargetFieldsByDeviceType(deviceType);
+
+            for (int i = 0; i < dataArray.size(); i++) {
+                JSONObject deviceData = dataArray.getJSONObject(i);
+                if (deviceData == null) {
+                    continue;
                 }
-            }
 
-            targetData.put("deviceuuid", deviceUuid);
+                JSONObject metrics = deviceData.getJSONObject("metrics");
+                String deviceUuid = deviceData.getString("deviceuuid");
 
-            Integer deviceId = uuid2Id.get(deviceUuid);
-            if (deviceId != null) {
-                targetData.put("device_id", deviceId);
-            }
+                if (metrics == null || deviceUuid == null) {
+                    continue;
+                }
+
+                JSONObject targetData = new JSONObject();
+                boolean hasValidData = false;
+
+                // 提取目标字段数据
+                for (String field : targetFields) {
+                    Object value = metrics.get(field);
+                    if (value != null) {
+                        targetData.put(field, value);
+                        hasValidData = true;
+                    }
+                }
 
-            if (hasValidData) {
-                resultList.add(targetData);
+                // 添加设备标识信息
+                targetData.put("deviceuuid", deviceUuid);
+                String deviceId = deviceUuidToIdMap.get(deviceUuid);
+                if (deviceId != null) {
+                    targetData.put("device_id", deviceId);
+                }
+
+                if (hasValidData) {
+                    resultList.add(targetData);
+                }
             }
-        }
 
-        return resultList;
+            log.debug("解析接口响应数据完成,有效设备数据数量:{}", resultList.size());
+            return resultList;
+        } catch (Exception e) {
+            log.error("解析接口响应数据失败:{}", e.getMessage(), e);
+            return Collections.emptyList();
+        }
     }
 
     /**
      * 根据设备类型获取目标字段(自动包含time)
+     * @param deviceType 设备类型
+     * @return 目标字段列表
      */
     private List<String> getTargetFieldsByDeviceType(Integer deviceType) {
+        if (deviceType == null) {
+            log.warn("获取目标字段失败:设备类型为空");
+            return Collections.singletonList("time");
+        }
+
         String fieldsStr = deviceFieldMapping.get(deviceType.toString());
+        if (fieldsStr == null || fieldsStr.trim().isEmpty()) {
+            log.warn("获取目标字段失败:设备类型{}对应的字段映射不存在", deviceType);
+            return Collections.singletonList("time");
+        }
+
         List<String> fields = Arrays.stream(fieldsStr.split(","))
                 .map(String::trim)
+                .filter(field -> !field.isEmpty())
                 .collect(Collectors.toList());
+
+        // 确保包含时间字段
         if (!fields.contains("time")) {
             fields.add("time");
         }
+
         return fields;
     }
 
@@ -171,126 +209,110 @@ public class DeviceDataQuery {
         List<JSONObject> simulationList = new ArrayList<>();
         long currentTime = System.currentTimeMillis();
 
+        // 定义模拟数据范围常量
+        final double TEMP_RANGE_MIN = 10.0;
+        final double TEMP_RANGE_MAX = 20.0;
+        final double HUMIDITY_RANGE_MIN = 40.0;
+        final double HUMIDITY_RANGE_MAX = 85.0;
+        final double OXYGEN_RANGE_MIN = 20.0;
+        final double OXYGEN_RANGE_MAX = 21.0;
+        final double CO2_RANGE_MIN = 750.0;
+        final double CO2_RANGE_MAX = 760.0;
+        final double VOLTAGE_RANGE_MIN = 220.0;
+        final double VOLTAGE_RANGE_MAX = 230.0;
+        final double CURRENT_RANGE_MIN = 0.0;
+        final double CURRENT_RANGE_MAX = 50.0;
+        final double POWER_RANGE_MIN = 1.0;
+        final double POWER_RANGE_MAX = 20.0;
+        final double TEMP_LINE_RANGE_MIN = 20.0;
+        final double TEMP_LINE_RANGE_MAX = 50.0;
+        final double LEAKAGE_CURRENT_RANGE_MIN = 0.0;
+        final double LEAKAGE_CURRENT_RANGE_MAX = 100.0;
+
         for (DmpDevice device : devices) {
             JSONObject simulationData = new JSONObject();
             simulationData.put("time", currentTime);
             simulationData.put("device_id", device.getDeviceId());
 
-            switch (deviceType) {
-                // 空气质量(701):温度/湿度/氧气/二氧化碳/一氧化碳
-                case 701:
-                    // 温度:-10~40℃ → 2位整数+2位小数(格式化时自动补零)
-                    double temperature = ThreadLocalRandom.current().nextDouble(10, 20);
-                    simulationData.put("wd", formatNumber(temperature, FORMAT_2_2));
-                    // 湿度:0~100% → 2位整数+2位小数
-                    double humidity = ThreadLocalRandom.current().nextDouble(40, 85);
-                    simulationData.put("sd", formatNumber(humidity, FORMAT_2_2));
-                    // 氧气浓度:0~21% → 2位整数+2位小数
-                    double o2 = ThreadLocalRandom.current().nextDouble(20, 21);
-                    simulationData.put("o2", formatNumber(o2, FORMAT_2_2));
-                    // 一氧化碳浓度:0~100ppm → 2位整数+2位小数
-                    double co = ThreadLocalRandom.current().nextDouble(0);
-                    simulationData.put("co", formatNumber(co, FORMAT_2_2));
-                    // 二氧化碳浓度:0~2000ppm → 0位整数+3位小数(实际范围0~2.000,对应0~2000ppm)
-                    double co2 = ThreadLocalRandom.current().nextDouble(750, 760);
-                    simulationData.put("co2", formatNumber(co2, FORMAT_0_3));
-                    break;
+            if (deviceType == null) {
+                log.warn("生成模拟数据失败:设备类型为空,设备ID:{}", device.getId());
+                continue;
+            }
 
+            switch (deviceType) {
                 // 单一温度传感器(707)
                 case 707:
-                    double temp707 = ThreadLocalRandom.current().nextDouble(10, 20);
+                    double temp707 = ThreadLocalRandom.current().nextDouble(TEMP_RANGE_MIN, TEMP_RANGE_MAX);
                     simulationData.put("wd", formatNumber(temp707, FORMAT_2_2));
                     break;
 
                 // 单一湿度传感器(708)
                 case 708:
-                    double hum708 = ThreadLocalRandom.current().nextDouble(40, 85);
+                    double hum708 = ThreadLocalRandom.current().nextDouble(HUMIDITY_RANGE_MIN, HUMIDITY_RANGE_MAX);
                     simulationData.put("sd", formatNumber(hum708, FORMAT_2_2));
                     break;
 
                 // 单一氧气传感器(709)
                 case 709:
-                    double o2709 = ThreadLocalRandom.current().nextDouble(20, 21);
+                    double o2709 = ThreadLocalRandom.current().nextDouble(OXYGEN_RANGE_MIN, OXYGEN_RANGE_MAX);
                     simulationData.put("o2", formatNumber(o2709, FORMAT_2_2));
                     break;
 
                 // 单一二氧化碳传感器(710)
                 case 710:
-                    double co2710 = ThreadLocalRandom.current().nextDouble(750, 760);
+                    double co2710 = ThreadLocalRandom.current().nextDouble(CO2_RANGE_MIN, CO2_RANGE_MAX);
                     simulationData.put("co2", formatNumber(co2710, FORMAT_0_3));
                     break;
 
                 // 单一一氧化碳传感器(711)
                 case 711:
-                    // double co711 = ThreadLocalRandom.current().nextDouble(0, 100);
                     simulationData.put("co", 0);
                     break;
 
-                // 水浸(702):保持原有逻辑
+                // 水浸(702)
                 case 702:
-                    // simulationData.put("leach_status", ThreadLocalRandom.current().nextInt(2));
                     simulationData.put("leach_status", 0);
                     break;
 
-                // 人员统计(703):保持原有逻辑(若需精度可补充格式化)
+                // 人员统计(703)
                 case 703:
-                    // simulationData.put("amount_into", ThreadLocalRandom.current().nextDouble() * 100);
-                    // simulationData.put("amount_out", ThreadLocalRandom.current().nextDouble() * 100);
-                    // simulationData.put("day_into", ThreadLocalRandom.current().nextDouble() * 1000);
-                    // simulationData.put("day_out", ThreadLocalRandom.current().nextDouble() * 1000);
                     simulationData.put("sensorValue", 0);
                     break;
 
-                // 电气火灾(704):对齐数据项代码+精度要求
+                // 电气火灾(704)
                 case 704:
                     // A/B/C相电压:3位整数+2位小数(220.00~230.00V)
-                    double aVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
-                    simulationData.put("aVoltage", formatNumber(aVoltage, FORMAT_3_2));
-                    double bVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
-                    simulationData.put("bVoltage", formatNumber(bVoltage, FORMAT_3_2));
-                    double cVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
-                    simulationData.put("cVoltage", formatNumber(cVoltage, FORMAT_3_2));
+                    simulationData.put("aVoltage", formatNumber(ThreadLocalRandom.current().nextDouble(VOLTAGE_RANGE_MIN, VOLTAGE_RANGE_MAX), FORMAT_3_2));
+                    simulationData.put("bVoltage", formatNumber(ThreadLocalRandom.current().nextDouble(VOLTAGE_RANGE_MIN, VOLTAGE_RANGE_MAX), FORMAT_3_2));
+                    simulationData.put("cVoltage", formatNumber(ThreadLocalRandom.current().nextDouble(VOLTAGE_RANGE_MIN, VOLTAGE_RANGE_MAX), FORMAT_3_2));
 
                     // A/B/C相电流:3位整数+2位小数(0.00~50.00A)
-                    double aElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
-                    simulationData.put("aElectricity", formatNumber(aElectricity, FORMAT_3_2));
-                    double bElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
-                    simulationData.put("bElectricity", formatNumber(bElectricity, FORMAT_3_2));
-                    double cElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
-                    simulationData.put("cElectricity", formatNumber(cElectricity, FORMAT_3_2));
+                    simulationData.put("aElectricity", formatNumber(ThreadLocalRandom.current().nextDouble(CURRENT_RANGE_MIN, CURRENT_RANGE_MAX), FORMAT_3_2));
+                    simulationData.put("bElectricity", formatNumber(ThreadLocalRandom.current().nextDouble(CURRENT_RANGE_MIN, CURRENT_RANGE_MAX), FORMAT_3_2));
+                    simulationData.put("cElectricity", formatNumber(ThreadLocalRandom.current().nextDouble(CURRENT_RANGE_MIN, CURRENT_RANGE_MAX), FORMAT_3_2));
 
-                    // 总功率:4位整数+2位小数(1.00~20.00,仅保留1-20范围)
-                    double totalPower = ThreadLocalRandom.current().nextDouble(1, 20.000001);
-                    simulationData.put("totalPower", formatNumber(totalPower, FORMAT_4_2));
+                    // 总功率:4位整数+2位小数(1.00~20.00)
+                    simulationData.put("totalPower", formatNumber(ThreadLocalRandom.current().nextDouble(POWER_RANGE_MIN, POWER_RANGE_MAX), FORMAT_4_2));
 
                     // 线温1-4:2位整数+2位小数(20.00~50.00℃)
-                    double line1TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
-                    simulationData.put("line1TEMP", formatNumber(line1TEMP, FORMAT_2_2));
-                    double line2TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
-                    simulationData.put("Line2TEMP", formatNumber(line2TEMP, FORMAT_2_2));
-                    double line3TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
-                    simulationData.put("Line3TEMP", formatNumber(line3TEMP, FORMAT_2_2));
-                    double line4TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
-                    simulationData.put("Line4TEMP", formatNumber(line4TEMP, FORMAT_2_2));
-
-                    // 剩余电流:4位整数+2位小数(0.00~100.00mA → 对应0.0000~0.1000A,按4.2格式化)
-                    double leakageCurrent = ThreadLocalRandom.current().nextDouble(0, 100);
-                    simulationData.put("leakageCurrent", formatNumber(leakageCurrent, FORMAT_4_2));
-                    break;
+                    simulationData.put("line1TEMP", formatNumber(ThreadLocalRandom.current().nextDouble(TEMP_LINE_RANGE_MIN, TEMP_LINE_RANGE_MAX), FORMAT_2_2));
+                    simulationData.put("Line2TEMP", formatNumber(ThreadLocalRandom.current().nextDouble(TEMP_LINE_RANGE_MIN, TEMP_LINE_RANGE_MAX), FORMAT_2_2));
+                    simulationData.put("Line3TEMP", formatNumber(ThreadLocalRandom.current().nextDouble(TEMP_LINE_RANGE_MIN, TEMP_LINE_RANGE_MAX), FORMAT_2_2));
+                    simulationData.put("Line4TEMP", formatNumber(ThreadLocalRandom.current().nextDouble(TEMP_LINE_RANGE_MIN, TEMP_LINE_RANGE_MAX), FORMAT_2_2));
 
-                // 电能采集(705):保持原有逻辑(若需精度可补充)
-                case 705:
-                    simulationData.put("electrical_energy", ThreadLocalRandom.current().nextDouble() * 10000);
+                    // 剩余电流:4位整数+2位小数(0.00~100.00mA)
+                    simulationData.put("leakageCurrent", formatNumber(ThreadLocalRandom.current().nextDouble(LEAKAGE_CURRENT_RANGE_MIN, LEAKAGE_CURRENT_RANGE_MAX), FORMAT_4_2));
                     break;
 
                 default:
+                    log.warn("未知设备类型:{},无法生成模拟数据", deviceType);
                     break;
             }
 
             simulationList.add(simulationData);
         }
 
-        System.out.println("生成模拟数据,设备类型:" + deviceType + ",数量:" + simulationList.size());
+        log.info("生成模拟数据完成,设备类型:{},数量:{}", deviceType, simulationList.size());
         return simulationList;
     }
 
@@ -298,9 +320,12 @@ public class DeviceDataQuery {
      * 通用数值格式化方法
      * @param value 原始数值
      * @param format 格式化器
-     * @return 格式化后的字符串(可根据需求转为Double)
+     * @return 格式化后的字符串
      */
     private String formatNumber(double value, DecimalFormat format) {
+        if (format == null) {
+            return String.valueOf(value);
+        }
         return format.format(value);
     }
 

+ 20 - 21
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -1,9 +1,9 @@
 package com.usky.cdi.service.util;
 
-import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.service.impl.IotDataTransferService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -14,35 +14,34 @@ import org.springframework.stereotype.Component;
 @Component
 public class DeviceDataSyncService {
 
-    @Autowired
-    private final DmpDeviceMapper dmpDeviceMapper;
-    @Autowired
-    private IotDataTransferService iotDataTransferService;
+    private final IotDataTransferService iotDataTransferService;
 
-    // 构造器注入(推荐)
-    public DeviceDataSyncService(DmpDeviceMapper dmpDeviceMapper) {
-        this.dmpDeviceMapper = dmpDeviceMapper;
+    @Autowired
+    public DeviceDataSyncService(IotDataTransferService iotDataTransferService) {
+        this.iotDataTransferService = iotDataTransferService;
     }
 
-    // @PostConstruct
-    // public void init() {
-    //     scheduledDeviceDataSync();
-    // }
-
     /**
-     * 定时任务:每小时0分、30分执行设备数据同步上报
-     * cron表达式:0 0,30 * * * ? → 秒 分 时 日 月 周 年
-     * 含义:每分钟的0秒、分=0或30、小时任意、日期/月份/星期任意
+     * 定时任务:定期执行设备数据同步上报
+     * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
+     * initialDelay:初始化后立即执行第一次任务
      */
-    // @Scheduled(cron = "*/30 * * * * ?")
-    // @Scheduled(cron = "0 0,30 * * * ?")
     @Scheduled(fixedDelay = 29 * 60 * 1000, initialDelay = 0)
     public void scheduledDeviceDataSync() {
+        Integer tenantId = 1208;
+        Long engineeringId = 3101130019L;
+        log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+
         try {
-            iotDataTransferService.synchronizeDeviceData();
+            // 参数验证
+            if (engineeringId == null) {
+                log.error("设备数据同步失败:工程ID不能为空");
+                return;
+            }
+
+            iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
         } catch (Exception e) {
-            log.error("定时任务执行设备数据同步失败", e);
-            System.err.println("定时任务执行设备数据同步失败:" + e.getMessage());
+            log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
         }
     }
 }

+ 28 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/IotDataTransferVO.java

@@ -0,0 +1,28 @@
+package com.usky.cdi.service.vo;
+
+import com.usky.cdi.domain.DmpDevice;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * @author fu
+ * @date 2025/12/08 10:02
+ */
+@Data
+public class IotDataTransferVO {
+    /**
+     * 设备UUID列表
+     */
+    private List<DmpDevice> devices;
+
+    /**
+     * 人防工程ID
+     */
+    private Long engineeringId;
+
+    /**
+     * 产品ID
+     */
+    private Integer deviceType;
+}

+ 0 - 0
service-cdi/service-cdi-biz/src/main/resources/mapper.cdi/DmpDeviceMapper.xml → service-cdi/service-cdi-biz/src/main/resources/mapper/cdi/DmpDeviceMapper.xml


+ 32 - 0
service-cdi/service-cdi-biz/src/main/resources/mapper/cdi/DmpProductMapper.xml

@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.usky.cdi.mapper.DmpProductMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.usky.cdi.domain.DmpProduct">
+        <id column="id" property="id" />
+        <result column="product_name" property="productName" />
+        <result column="access_mode" property="accessMode" />
+        <result column="network_type" property="networkType" />
+        <result column="device_type" property="deviceType" />
+        <result column="com_protocol" property="comProtocol" />
+        <result column="auth_mode" property="authMode" />
+        <result column="device_model" property="deviceModel" />
+        <result column="product_describe" property="productDescribe" />
+        <result column="factory_name" property="factoryName" />
+        <result column="factory_person" property="factoryPerson" />
+        <result column="factory_phone" property="factoryPhone" />
+        <result column="certificate_url1" property="certificateUrl1" />
+        <result column="certificate_url2" property="certificateUrl2" />
+        <result column="certificate_url3" property="certificateUrl3" />
+        <result column="agreement_url" property="agreementUrl" />
+        <result column="delete_flag" property="deleteFlag" />
+        <result column="created_by" property="createdBy" />
+        <result column="created_time" property="createdTime" />
+        <result column="updated_by" property="updatedBy" />
+        <result column="updated_time" property="updatedTime" />
+        <result column="tenant_id" property="tenantId" />
+        <result column="product_code" property="productCode" />
+    </resultMap>
+
+</mapper>