Przeglądaj źródła

人防代码优化,新增定时任务

fuyuchuan 1 dzień temu
rodzic
commit
c13ae4c5d9

+ 7 - 0
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/RemotecdiTaskService.java

@@ -1,4 +1,11 @@
 package com.usky.cdi;
 
+import com.usky.cdi.factory.RemotecdiTaskFactory;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.GetMapping;
+
+@FeignClient(contextId = "RemotecdiTaskService", value = "service-cdi", fallbackFactory = RemotecdiTaskFactory.class)
 public interface RemotecdiTaskService {
+    @GetMapping("/synchronizeDeviceData")
+    void synchronizeDeviceData();
 }

+ 22 - 1
service-cdi/service-cdi-api/src/main/java/com/usky/cdi/factory/RemotecdiTaskFactory.java

@@ -1,4 +1,25 @@
 package com.usky.cdi.factory;
 
-public class RemotecdiTaskFactory {
+import com.usky.cdi.RemotecdiTaskService;
+import com.usky.common.core.exception.FeignBadRequestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.openfeign.FallbackFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class RemotecdiTaskFactory implements FallbackFactory<RemotecdiTaskService> {
+    private static final Logger log = LoggerFactory.getLogger(RemotecdiTaskFactory.class);
+
+    @Override
+    public RemotecdiTaskService create(Throwable throwable) {
+        log.error("用户服务调用失败:{}", throwable.getMessage());
+        return new RemotecdiTaskService() {
+            @Override
+            public void synchronizeDeviceData() {
+                throw new FeignBadRequestException(500, "人防设备数据定时推送异常" + throwable.getMessage());
+            }
+
+        };
+    }
 }

+ 2 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java

@@ -11,6 +11,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.core.env.Environment;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -22,6 +23,7 @@ import java.net.UnknownHostException;
  */
 
 //@EnableSwagger2
+@EnableScheduling
 @EnableFeignClients(basePackages = "com.usky")
 @MapperScan(value = "com.usky.cdi.mapper")
 @ComponentScan("com.usky")

+ 6 - 6
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/IotDataController.java

@@ -30,8 +30,8 @@ public class IotDataController {
      * 上报水浸状态
      */
     @PostMapping("/flooded")
-    public String sendWaterLeak(@RequestBody WaterLeakVO vo) {
-        boolean success = iotDataTransferService.sendWaterLeak(vo);
+    public String sendWaterLeak(@RequestBody JSONObject jsonObject) {
+        boolean success = iotDataTransferService.sendWaterLeak(jsonObject);
         return success ? "上报成功" : "上报失败";
     }
 
@@ -48,8 +48,8 @@ public class IotDataController {
      * 上报人员闯入
      */
     @PostMapping("/personPresence")
-    public String sendPerson(@RequestBody PersonPresenceVO vo) {
-        boolean success = iotDataTransferService.sendPersonPresence(vo);
+    public String sendPerson(@RequestBody JSONObject jsonObject) {
+        boolean success = iotDataTransferService.sendPersonPresence(jsonObject);
         return success ? "上报成功" : "上报失败";
     }
 
@@ -57,8 +57,8 @@ public class IotDataController {
      * 上报用电负荷
      */
     @PostMapping("/electricityLoad")
-    public String sendElectricityLoad() {
-        boolean success = iotDataTransferService.sendElectricityLoad();
+    public String sendElectricityLoad(@RequestBody JSONObject jsonObject) {
+        boolean success = iotDataTransferService.sendElectricityLoad(jsonObject);
         return success ? "上报成功" : "上报失败";
     }
 

+ 27 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/api/RemotecdiTaskApi.java

@@ -0,0 +1,27 @@
+package com.usky.cdi.controller.api;
+
+import com.usky.cdi.RemotecdiTaskService;
+import com.usky.cdi.service.impl.IotDataTransferService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.time.LocalDateTime;
+
+/**
+ * <p>
+ *  数据统一查询 前端控制器
+ * </p>
+ *
+ * @author f
+ */
+@RestController
+public class RemotecdiTaskApi implements RemotecdiTaskService {
+
+    @Autowired
+    private IotDataTransferService iotDataTransferService;
+
+    @Override
+    public void synchronizeDeviceData() {
+        iotDataTransferService.synchronizeDeviceData();
+    }
+}

+ 73 - 60
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -2,6 +2,9 @@ package com.usky.cdi.service.impl;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.usky.cdi.domain.DmpDevice;
+import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.service.config.mqtt.MqttGateway;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
 import com.usky.cdi.service.util.DeviceDataQuery;
@@ -13,12 +16,12 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 /**
  * @author fyc
@@ -41,6 +44,12 @@ public class IotDataTransferService {
     @Value("${config.engineeringID}")
     private Long engineeringID;
 
+    @Autowired
+    private DmpDeviceMapper dmpDeviceMapper;
+
+    @Value("${device.data.simulation}")
+    private boolean simulation;
+
 
     public IotDataTransferService() {
         // 使用默认的workerId和datacenterId,实际项目中可以从配置读取
@@ -67,35 +76,20 @@ public class IotDataTransferService {
      *
      * @return 是否发送成功
      */
-    public boolean sendWaterLeak(WaterLeakVO waterLeakVO) {
+    public boolean sendWaterLeak(JSONObject jsonObject) {
         if (mqttGateway == null) {
             log.warn("MQTT Gateway未初始化,无法发送消息");
             return false;
         }
         try {
-            String deviceUuid = waterLeakVO.getDeviceUuid() == null ? "" : waterLeakVO.getDeviceUuid();
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(702, deviceUuid);
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
 
             if (deviceData.isEmpty()) {
-                log.warn("没有获取到水浸数据!deviceUuid:{}", deviceUuid);
+                log.warn("没有获取到水浸数据!deviceUuids:{}", jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
                 return false;
             }
 
             for (JSONObject deviceDataItem : deviceData) {
-                // 增加空值判断
-                String leachStatusStr = deviceDataItem.getString("leach_status");
-                if (leachStatusStr == null || leachStatusStr.isEmpty()) {
-                    log.warn("设备{}的leach_status为空", deviceDataItem.getString("device_id"));
-                    continue;
-                }
-                Integer leachStatus;
-                try {
-                    leachStatus = Integer.valueOf(leachStatusStr);
-                } catch (NumberFormatException e) {
-                    log.error("leach_status转换失败:{}", leachStatusStr, e);
-                    continue;
-                }
-
                 Long dataTime = deviceDataItem.getLong("time");
                 if (dataTime == null) {
                     log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
@@ -108,7 +102,7 @@ public class IotDataTransferService {
                 vo.setSensorID(deviceDataItem.getIntValue("device_id"));
                 vo.setEngineeringID(engineeringID);
                 vo.setPublishTime(getCurrentTime());
-                vo.setSensorValue(leachStatus);
+                vo.setSensorValue(deviceDataItem.getInteger("leach_status"));
                 vo.setDataEndTime(dataEndTime);
 
                 String json = JSON.toJSONString(vo);
@@ -138,17 +132,11 @@ public class IotDataTransferService {
         }
         try {
             Integer deviceType = jsonObject.getInteger("deviceType");
-            String deviceUuid = jsonObject.getString("deviceUuid");
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(deviceType, jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
 
-            if (deviceType == null) {
-                log.error("deviceType不能为空");
-                return false;
-            }
-
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(deviceType, deviceUuid);
 
             if (deviceData.isEmpty()) {
-                log.warn("没有获取到空气质量数据!deviceType:{}, deviceUuid:{}", deviceType, deviceUuid);
+                log.warn("没有获取到空气质量数据!deviceType:{}, deviceUuids:{}", jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
                 return false;
             }
 
@@ -185,8 +173,6 @@ public class IotDataTransferService {
                     case 711:
                         sendCoData(deviceId, dataEndTime, deviceDataItem);
                         break;
-                    default:
-                        log.warn("不支持的设备类型:{}", deviceType);
                 }
             }
             return true;
@@ -199,31 +185,39 @@ public class IotDataTransferService {
     /**
      * 发送人员闯入情况
      *
-     * @param vo 人员闯入
      * @return 是否发送成功
      **/
-    public boolean sendPersonPresence(PersonPresenceVO vo) {
+    public boolean sendPersonPresence(JSONObject jsonObject) {
         if (mqttGateway == null) {
             log.warn("MQTT Gateway未初始化,无法发送消息");
             return false;
         }
-        if (vo == null) {
-            log.error("PersonPresenceVO不能为空");
-            return false;
-        }
+
         try {
-            if (vo.getDataPacketID() == null) {
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
+            for (JSONObject deviceDataItem : deviceData) {
+
+                Long dataTime = deviceDataItem.getLong("time");
+                if (dataTime == null) {
+                    log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
+                    continue;
+                }
+                LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
+                Integer deviceId = deviceDataItem.getIntValue("device_id");
+
+                PersonPresenceVO vo = new PersonPresenceVO();
                 vo.setDataPacketID(generateDataPacketID());
-            }
-            if (vo.getPublishTime() == null) {
+                vo.setSensorID(deviceId);
+                vo.setDataEndTime(dataEndTime);
                 vo.setPublishTime(getCurrentTime());
+                vo.setEngineeringID(engineeringID);
+                // vo.setSensorValue(deviceDataItem.getIntValue("sensorValue"));
+                vo.setSensorValue(0);
+                String json = JSON.toJSONString(vo);
+                String topic = EnvMonitorMqttTopic.PERSON_PRESENCE.getTopic();
+                log.info("发送人员闯入情况,Topic: {}, Data: {}", topic, json);
+                mqttGateway.sendToMqtt(topic, json);
             }
-            vo.setEngineeringID(engineeringID); // 确保工程ID被设置
-
-            String json = JSON.toJSONString(vo);
-            String topic = EnvMonitorMqttTopic.PERSON_PRESENCE.getTopic();
-            log.info("发送人员闯入情况,Topic: {}, Data: {}", topic, json);
-            mqttGateway.sendToMqtt(topic, json);
             return true;
         } catch (Exception e) {
             log.error("发送人员闯入情况失败", e);
@@ -236,13 +230,13 @@ public class IotDataTransferService {
      *
      * @return 是否发送成功
      **/
-    public boolean sendElectricityLoad() {
+    public boolean sendElectricityLoad(JSONObject jsonObject) {
         if (mqttGateway == null) {
             log.warn("MQTT Gateway未初始化,无法发送消息");
             return false;
         }
         try {
-            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(704, "");
+            List<JSONObject> deviceData = deviceDataQuery.getDeviceData(jsonObject.getInteger("deviceType"), jsonObject.getJSONArray("deviceUuids") == null ? new ArrayList<String>() : jsonObject.getJSONArray("deviceUuids").toJavaList(String.class));
 
             for (JSONObject deviceDataItem : deviceData) {
                 Long dataTime = deviceDataItem.getLong("time");
@@ -258,18 +252,21 @@ public class IotDataTransferService {
                 vo.setEngineeringID(engineeringID);
                 vo.setPublishTime(getCurrentTime());
                 vo.setDataEndTime(dataEndTime);
-                vo.setAVoltage(deviceDataItem.getFloat("voltage_a"));
-                vo.setBVoltage(deviceDataItem.getFloat("voltage_b"));
-                vo.setCVoltage(deviceDataItem.getFloat("voltage_c"));
-                vo.setAElectricity(deviceDataItem.getFloat("current_a"));
-                vo.setBElectricity(deviceDataItem.getFloat("current_b"));
-                vo.setCElectricity(deviceDataItem.getFloat("current_c"));
-                vo.setLine1TEMP(deviceDataItem.getFloat("temperature_a"));
-                vo.setLine2TEMP(deviceDataItem.getFloat("temperature_b"));
-                vo.setLine3TEMP(deviceDataItem.getFloat("temperature_c"));
-                vo.setLeakageCurrent(deviceDataItem.getFloat("current_residual"));
-                // 使用线程安全的随机数生成器
-                vo.setTotalPower(ThreadLocalRandom.current().nextFloat() * 10f);
+                vo.setAVoltage(deviceDataItem.getFloat("aVoltage"));
+                vo.setBVoltage(deviceDataItem.getFloat("bVoltage"));
+                vo.setCVoltage(deviceDataItem.getFloat("cVoltage"));
+                vo.setAElectricity(deviceDataItem.getFloat("aElectricity"));
+                vo.setBElectricity(deviceDataItem.getFloat("bElectricity"));
+                vo.setCElectricity(deviceDataItem.getFloat("cElectricity"));
+                vo.setLine1TEMP(deviceDataItem.getFloat("line1TEMP"));
+                vo.setLine2TEMP(deviceDataItem.getFloat("Line2TEMP"));
+                vo.setLine3TEMP(deviceDataItem.getFloat("Line3TEMP"));
+                vo.setLeakageCurrent(deviceDataItem.getFloat("leakageCurrent"));
+                if (simulation) {
+                    vo.setTotalPower(deviceDataItem.getFloat("totalPower"));
+                } else {
+                    vo.setTotalPower(deviceDataItem.getFloat("active_power"));
+                }
 
                 String json = JSON.toJSONString(vo);
                 String topic = EnvMonitorMqttTopic.ELECTRICITY_LOAD.getTopic();
@@ -378,4 +375,20 @@ public class IotDataTransferService {
         mqttGateway.sendToMqtt(topic, json);
         log.info("发送二氧化碳浓度信息,Topic: {}, Data: {}", topic, json);
     }
+
+    public void synchronizeDeviceData() {
+        LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.eq(DmpDevice::getTenantId, 1208);
+        List<DmpDevice> deviceList = dmpDeviceMapper.selectList(queryWrapper);
+        List<Integer> deviceTypeList = deviceList.stream().map(DmpDevice::getDeviceType).distinct().collect(Collectors.toList());
+
+        for (Integer deviceType : deviceTypeList) {
+            JSONObject json = new JSONObject();
+            json.put("deviceType", deviceType);
+            this.sendWaterLeak(json);
+            this.sendEnvData(json);
+            this.sendPersonPresence(json);
+            this.sendElectricityLoad(json);
+        }
+    }
 }

+ 123 - 48
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java

@@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.usky.cdi.domain.DmpDevice;
 import com.usky.cdi.mapper.DmpDeviceMapper;
 import lombok.Data;
@@ -14,6 +13,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
+import java.text.DecimalFormat;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -37,19 +37,31 @@ public class DeviceDataQuery {
     private boolean simulation;
     private Map<String, String> deviceFieldMapping;
 
+    // 定义各参数的格式化器(整数位,小数位)
+    private static final DecimalFormat FORMAT_2_2 = new DecimalFormat("00.00"); // 2位整数+2位小数
+    private static final DecimalFormat FORMAT_0_3 = new DecimalFormat("0.000"); // 0位整数+3位小数
+    private static final DecimalFormat FORMAT_3_2 = new DecimalFormat("000.00"); // 3位整数+2位小数
+    private static final DecimalFormat FORMAT_4_2 = new DecimalFormat("0000.00"); // 4位整数+2位小数
+
     /**
      * 获取指定设备类型的设备数据
      */
-    public List<JSONObject> getDeviceData(Integer deviceType, String deviceUuid) {
+    public List<JSONObject> getDeviceData(Integer deviceType, List<String> deviceUuid) {
+
         List<DmpDevice> devices = getDeviceUuids(deviceType, deviceUuid);
         if (devices.isEmpty()) {
             log.warn("该租户下没有注册设备!");
             return Collections.emptyList();
         }
-        List<String> deviceUuids = devices.stream().map(DmpDevice::getDeviceUuid).collect(Collectors.toList());
 
         JSONObject requestBody = new JSONObject();
-        requestBody.put("deviceUuids", deviceUuids);
+
+        if (deviceUuid != null && !deviceUuid.isEmpty()) {
+            requestBody.put("deviceUuids", deviceUuid);
+        } else {
+            List<String> deviceUuids = devices.stream().map(DmpDevice::getDeviceUuid).collect(Collectors.toList());
+            requestBody.put("deviceUuids", deviceUuids);
+        }
 
         String response = HttpClientUtils.doPostJson(baseUrl, String.valueOf(requestBody));
 
@@ -66,11 +78,11 @@ public class DeviceDataQuery {
     /**
      * 获取指定设备类型的设备UUID列表
      */
-    private List<DmpDevice> getDeviceUuids(Integer deviceType, String deviceUuid) {
+    private List<DmpDevice> getDeviceUuids(Integer deviceType, List<String> deviceUuid) {
         LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(DmpDevice::getTenantId, 1208)
                 .eq(deviceType != null, DmpDevice::getDeviceType, deviceType)
-                .eq(StringUtils.isNotBlank(deviceUuid), DmpDevice::getDeviceUuid, deviceUuid);
+                .in(deviceUuid != null && !deviceUuid.isEmpty(), DmpDevice::getDeviceUuid, deviceUuid);
         return dmpDeviceMapper.selectList(queryWrapper);
     }
 
@@ -148,7 +160,10 @@ public class DeviceDataQuery {
     }
 
     /**
-     * 生成模拟数据
+     * 生成模拟数据(按指定精度格式化)
+     * @param deviceType 设备类型
+     * @param devices 设备列表
+     * @return 模拟数据列表
      */
     private List<JSONObject> generateSimulationData(Integer deviceType, List<DmpDevice> devices) {
         List<JSONObject> simulationList = new ArrayList<>();
@@ -156,66 +171,116 @@ public class DeviceDataQuery {
 
         for (DmpDevice device : devices) {
             JSONObject simulationData = new JSONObject();
-            // simulationData.put("deviceuuid", deviceUuid);
             simulationData.put("time", currentTime);
-            simulationData.put("device_id", device.getId());
+            simulationData.put("device_id", device.getDeviceId());
 
             switch (deviceType) {
-                // 空气质量
+                // 空气质量(701):温度/湿度/氧气/二氧化碳/一氧化碳
                 case 701:
-                    simulationData.put("wd", ThreadLocalRandom.current().nextInt(50) - 10); // 温度:-10~40℃
-                    simulationData.put("sd", ThreadLocalRandom.current().nextInt(101)); // 湿度:0~100%
-                    simulationData.put("o2", ThreadLocalRandom.current().nextDouble() * 21); // 氧气:0~21%
-                    simulationData.put("co", ThreadLocalRandom.current().nextDouble() * 100); // 一氧化碳:0~100ppm
-                    simulationData.put("co2", ThreadLocalRandom.current().nextDouble() * 2000); // 二氧化碳:0~2000ppm
+                    // 温度:-10~40℃ → 2位整数+2位小数(格式化时自动补零)
+                    double temperature = ThreadLocalRandom.current().nextDouble(10, 20);
+                    simulationData.put("wd", formatNumber(temperature, FORMAT_2_2));
+                    // 湿度:0~100% → 2位整数+2位小数
+                    double humidity = ThreadLocalRandom.current().nextDouble(40, 85);
+                    simulationData.put("sd", formatNumber(humidity, FORMAT_2_2));
+                    // 氧气浓度:0~21% → 2位整数+2位小数
+                    double o2 = ThreadLocalRandom.current().nextDouble(20, 21);
+                    simulationData.put("o2", formatNumber(o2, FORMAT_2_2));
+                    // 一氧化碳浓度:0~100ppm → 2位整数+2位小数
+                    double co = ThreadLocalRandom.current().nextDouble(0);
+                    simulationData.put("co", formatNumber(co, FORMAT_2_2));
+                    // 二氧化碳浓度:0~2000ppm → 0位整数+3位小数(实际范围0~2.000,对应0~2000ppm)
+                    double co2 = ThreadLocalRandom.current().nextDouble(750, 760);
+                    simulationData.put("co2", formatNumber(co2, FORMAT_0_3));
                     break;
-                case 707: // 温度:wd
-                    simulationData.put("wd", ThreadLocalRandom.current().nextInt(50) - 10);
+
+                // 单一温度传感器(707)
+                case 707:
+                    double temp707 = ThreadLocalRandom.current().nextDouble(10, 20);
+                    simulationData.put("wd", formatNumber(temp707, FORMAT_2_2));
                     break;
-                case 708: // 湿度:sd
-                    simulationData.put("sd", ThreadLocalRandom.current().nextInt(101));
+
+                // 单一湿度传感器(708)
+                case 708:
+                    double hum708 = ThreadLocalRandom.current().nextDouble(40, 85);
+                    simulationData.put("sd", formatNumber(hum708, FORMAT_2_2));
                     break;
-                case 709: // 氧气:o2
-                    simulationData.put("o2", ThreadLocalRandom.current().nextDouble() * 21);
+
+                // 单一氧气传感器(709)
+                case 709:
+                    double o2709 = ThreadLocalRandom.current().nextDouble(20, 21);
+                    simulationData.put("o2", formatNumber(o2709, FORMAT_2_2));
                     break;
-                case 710: // 二氧化碳:co2
-                    simulationData.put("co2", ThreadLocalRandom.current().nextDouble() * 2000);
+
+                // 单一二氧化碳传感器(710)
+                case 710:
+                    double co2710 = ThreadLocalRandom.current().nextDouble(750, 760);
+                    simulationData.put("co2", formatNumber(co2710, FORMAT_0_3));
                     break;
-                case 711: // 一氧化碳:co
-                    simulationData.put("co", ThreadLocalRandom.current().nextDouble() * 100);
+
+                // 单一一氧化碳传感器(711)
+                case 711:
+                    // double co711 = ThreadLocalRandom.current().nextDouble(0, 100);
+                    simulationData.put("co", 0);
                     break;
-                // 水浸
+
+                // 水浸(702):保持原有逻辑
                 case 702:
-                    // 渗漏状态:0-正常,1-渗漏
-                    simulationData.put("leach_status", ThreadLocalRandom.current().nextInt(2));
+                    // simulationData.put("leach_status", ThreadLocalRandom.current().nextInt(2));
+                    simulationData.put("leach_status", 0);
                     break;
-                // 人员统计
+
+                // 人员统计(703):保持原有逻辑(若需精度可补充格式化)
                 case 703:
-                    // 流量数据:模拟正数
-                    simulationData.put("amount_into", ThreadLocalRandom.current().nextDouble() * 100);
-                    simulationData.put("amount_out", ThreadLocalRandom.current().nextDouble() * 100);
-                    simulationData.put("day_into", ThreadLocalRandom.current().nextDouble() * 1000);
-                    simulationData.put("day_out", ThreadLocalRandom.current().nextDouble() * 1000);
+                    // simulationData.put("amount_into", ThreadLocalRandom.current().nextDouble() * 100);
+                    // simulationData.put("amount_out", ThreadLocalRandom.current().nextDouble() * 100);
+                    // simulationData.put("day_into", ThreadLocalRandom.current().nextDouble() * 1000);
+                    // simulationData.put("day_out", ThreadLocalRandom.current().nextDouble() * 1000);
+                    simulationData.put("sensorValue", 0);
                     break;
-                // 电气火灾
+
+                // 电气火灾(704):对齐数据项代码+精度要求
                 case 704:
-                    // 电气参数:模拟合理范围
-                    simulationData.put("voltage_a", 220 + ThreadLocalRandom.current().nextDouble() * 10); // 电压A:220~230V
-                    simulationData.put("voltage_b", 220 + ThreadLocalRandom.current().nextDouble() * 10);
-                    simulationData.put("voltage_c", 220 + ThreadLocalRandom.current().nextDouble() * 10);
-                    simulationData.put("current_a", ThreadLocalRandom.current().nextDouble() * 50); // 电流A:0~50A
-                    simulationData.put("current_b", ThreadLocalRandom.current().nextDouble() * 50);
-                    simulationData.put("current_c", ThreadLocalRandom.current().nextDouble() * 50);
-                    simulationData.put("temperature_a", 20 + ThreadLocalRandom.current().nextDouble() * 30); // 线温A:20~50℃
-                    simulationData.put("temperature_b", 20 + ThreadLocalRandom.current().nextDouble() * 30);
-                    simulationData.put("temperature_c", 20 + ThreadLocalRandom.current().nextDouble() * 30);
-                    simulationData.put("current_residual", ThreadLocalRandom.current().nextDouble() * 100); // 剩余电流:0~1A
+                    // A/B/C相电压:3位整数+2位小数(220.00~230.00V)
+                    double aVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
+                    simulationData.put("aVoltage", formatNumber(aVoltage, FORMAT_3_2));
+                    double bVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
+                    simulationData.put("bVoltage", formatNumber(bVoltage, FORMAT_3_2));
+                    double cVoltage = ThreadLocalRandom.current().nextDouble(220, 230);
+                    simulationData.put("cVoltage", formatNumber(cVoltage, FORMAT_3_2));
+
+                    // A/B/C相电流:3位整数+2位小数(0.00~50.00A)
+                    double aElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
+                    simulationData.put("aElectricity", formatNumber(aElectricity, FORMAT_3_2));
+                    double bElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
+                    simulationData.put("bElectricity", formatNumber(bElectricity, FORMAT_3_2));
+                    double cElectricity = ThreadLocalRandom.current().nextDouble(0, 50);
+                    simulationData.put("cElectricity", formatNumber(cElectricity, FORMAT_3_2));
+
+                    // 总功率:4位整数+2位小数(1.00~20.00,仅保留1-20范围)
+                    double totalPower = ThreadLocalRandom.current().nextDouble(1, 20.000001);
+                    simulationData.put("totalPower", formatNumber(totalPower, FORMAT_4_2));
+
+                    // 线温1-4:2位整数+2位小数(20.00~50.00℃)
+                    double line1TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
+                    simulationData.put("line1TEMP", formatNumber(line1TEMP, FORMAT_2_2));
+                    double line2TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
+                    simulationData.put("Line2TEMP", formatNumber(line2TEMP, FORMAT_2_2));
+                    double line3TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
+                    simulationData.put("Line3TEMP", formatNumber(line3TEMP, FORMAT_2_2));
+                    double line4TEMP = ThreadLocalRandom.current().nextDouble(20, 50);
+                    simulationData.put("Line4TEMP", formatNumber(line4TEMP, FORMAT_2_2));
+
+                    // 剩余电流:4位整数+2位小数(0.00~100.00mA → 对应0.0000~0.1000A,按4.2格式化)
+                    double leakageCurrent = ThreadLocalRandom.current().nextDouble(0, 100);
+                    simulationData.put("leakageCurrent", formatNumber(leakageCurrent, FORMAT_4_2));
                     break;
-                // 电能采集
+
+                // 电能采集(705):保持原有逻辑(若需精度可补充)
                 case 705:
-                    // 电能:模拟正数
                     simulationData.put("electrical_energy", ThreadLocalRandom.current().nextDouble() * 10000);
                     break;
+
                 default:
                     break;
             }
@@ -227,4 +292,14 @@ public class DeviceDataQuery {
         return simulationList;
     }
 
+    /**
+     * 通用数值格式化方法
+     * @param value 原始数值
+     * @param format 格式化器
+     * @return 格式化后的字符串(可根据需求转为Double)
+     */
+    private String formatNumber(double value, DecimalFormat format) {
+        return format.format(value);
+    }
+
 }

+ 76 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -0,0 +1,76 @@
+package com.usky.cdi.service.util;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.usky.cdi.domain.DmpDevice;
+import com.usky.cdi.mapper.DmpDeviceMapper;
+import com.usky.cdi.service.config.mqtt.MqttGateway;
+import com.usky.cdi.service.impl.IotDataTransferService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 设备数据同步组件(包含定时任务)
+ */
+@Slf4j
+@Component
+public class DeviceDataSyncService {
+
+    @Autowired
+    private final DmpDeviceMapper dmpDeviceMapper;
+    @Autowired
+    private IotDataTransferService iotDataTransferService;
+
+    // 构造器注入(推荐)
+    public DeviceDataSyncService(DmpDeviceMapper dmpDeviceMapper) {
+        this.dmpDeviceMapper = dmpDeviceMapper;
+    }
+
+    // @PostConstruct
+    // public void init() {
+    //     scheduledDeviceDataSync();
+    // }
+
+    /**
+     * 定时任务:每小时0分、30分执行设备数据同步上报
+     * cron表达式:0 0,30 * * * ? → 秒 分 时 日 月 周 年
+     * 含义:每分钟的0秒、分=0或30、小时任意、日期/月份/星期任意
+     */
+    @Scheduled(cron = "0 0,30 * * * ?")
+    // @Scheduled(cron = "*/30 * * * * ?")
+    public void scheduledDeviceDataSync() {
+        try {
+            this.synchronizeDeviceData();
+        } catch (Exception e) {
+            log.error("定时任务执行设备数据同步失败", e);
+            System.err.println("定时任务执行设备数据同步失败:" + e.getMessage());
+        }
+    }
+
+    /**
+     * 核心:设备数据同步逻辑(原有方法)
+     */
+    public void synchronizeDeviceData() {
+        LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.eq(DmpDevice::getTenantId, 1208);
+        List<DmpDevice> deviceList = dmpDeviceMapper.selectList(queryWrapper);
+        List<Integer> deviceTypeList = deviceList.stream()
+                .map(DmpDevice::getDeviceType)
+                .distinct()
+                .collect(Collectors.toList());
+
+        for (Integer deviceType : deviceTypeList) {
+            com.alibaba.fastjson.JSONObject json = new com.alibaba.fastjson.JSONObject();
+            json.put("deviceType", deviceType);
+            iotDataTransferService.synchronizeDeviceData();
+        }
+        System.out.println("设备数据同步完成,涉及设备类型数:" + deviceTypeList.size());
+    }
+
+}

+ 0 - 7
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/vo/base/BaseEnvMonitorPushVO.java

@@ -37,13 +37,6 @@ public abstract class BaseEnvMonitorPushVO implements Serializable {
      */
     private Integer sensorID;
 
-    /**
-     * 设备UUID
-     * 类型:String,长度16
-     * 说明:设备唯一标识
-     */
-    private String deviceUuid;
-
     /**
      * 监测时间(必填)
      * 类型:Time(yyyy-MM-dd HH:mm:ss.SSS)

+ 6 - 1
service-job/pom.xml

@@ -69,7 +69,12 @@
             <version>0.0.1</version>
             <scope>compile</scope>
         </dependency>
-
+        <dependency>
+            <groupId>com.usky</groupId>
+            <artifactId>service-cdi-api</artifactId>
+            <version>0.0.1</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 9 - 0
service-job/src/main/java/com/ruoyi/job/task/RyTask.java

@@ -1,5 +1,6 @@
 package com.ruoyi.job.task;
 
+import com.usky.cdi.RemotecdiTaskService;
 import com.usky.common.core.utils.StringUtils;
 import com.usky.demo.RemoteMeetingService;
 import com.usky.fire.RemoteFireService;
@@ -27,6 +28,9 @@ public class RyTask {
     @Autowired
     private RemoteMeetingService remoteMeetingService;
 
+    @Autowired
+    private RemotecdiTaskService remoteCdiTaskService;
+
     public void ryMultipleParams(String s, Boolean b, Long l, Double d, Integer i) {
         System.out.println(StringUtils.format("执行多参方法: 字符串类型{},布尔类型{},长整型{},浮点型{},整形{}", s, b, l, d, i));
     }
@@ -69,4 +73,9 @@ public class RyTask {
         remoteMeetingService.meetingInfoStatus();
     }
 
+    private void synchronizeDeviceData() {
+        System.out.println("synchronizeDeviceData start......");
+        remoteCdiTaskService.synchronizeDeviceData();
+    }
+
 }