Selaa lähdekoodia

Merge branch 'fyc-cdi' of uskycloud/usky-modules into master

fuyuchuan 5 päivää sitten
vanhempi
commit
711c67a640

+ 70 - 7
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -160,6 +160,9 @@ public class IotDataTransferService {
                     continue;
                 }
 
+                // 为每条数据的监测时间添加毫秒级微差
+                dataEndTime = addTimeOffset(dataEndTime);
+
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
                 Integer value = deviceDataItem.getInteger("leach_status");
                 if (value == null) {
@@ -177,7 +180,8 @@ public class IotDataTransferService {
                 vo.setDataEndTime(dataEndTime);
 
                 try {
-                    sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
+                    log.info("【水浸数据】开始推送,设备:{},数据:{}", deviceId, JSON.toJSONString(vo));
+                    // sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
                     log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
@@ -248,6 +252,9 @@ public class IotDataTransferService {
                     continue;
                 }
 
+                // 为每条数据的监测时间添加毫秒级微差
+                dataEndTime = addTimeOffset(dataEndTime);
+
                 // 检查业务字段是否为空,为空则计入失败(修复:原逻辑静默跳过并误计为成功)
                 boolean fieldMissing = false;
                 switch (deviceType) {
@@ -427,17 +434,16 @@ public class IotDataTransferService {
                 headcountVO.setSensorValue(peopleCount);
                 // headcountVO.setSensorID(buildUnit.getId());
 
-                // 为每条数据生成秒级微差的时间
-                int secondsOffset = ThreadLocalRandom.current().nextInt(0, 60);
-                LocalDateTime dataEndTimeWithOffset = now.minusMinutes(ThreadLocalRandom.current().nextInt(1, 3)).plusSeconds(secondsOffset);
-                LocalDateTime publishTimeWithOffset = now.plusSeconds(ThreadLocalRandom.current().nextInt(0, 30));
+                // 为每条数据的监测时间添加毫秒级微差
+                LocalDateTime dataEndTime = addTimeOffset(now);
 
-                headcountVO.setDataEndTime(dataEndTimeWithOffset);
-                headcountVO.setPublishTime(publishTimeWithOffset);
+                headcountVO.setDataEndTime(dataEndTime);
+                headcountVO.setPublishTime(getCurrentTime());
                 headcountVO.setUnitName(buildUnit.getUnitName());
                 headcountVO.setFloor(buildUnit.getFloor());
 
                 try {
+                    log.info("【人员统计】开始推送,单元:{},数据:{}", buildUnit.getUnitName(), JSON.toJSONString(headcountVO));
                     sendMqttMessage(
                             MqttTopics.IotInfo.PERSON.getTopic(),
                             headcountVO,
@@ -516,6 +522,9 @@ public class IotDataTransferService {
                     continue;
                 }
 
+                // 处理数据时间
+                dataEndTime = addTimeOffset(dataEndTime);
+
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
 
                 PersonPresenceVO vo = new PersonPresenceVO();
@@ -527,6 +536,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(0); // 固定值(根据业务需求)
 
                 try {
+                    log.info("【人员闯入】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
                     sendMqttMessage(
                             MqttTopics.IotInfo.PERSON_PRESENCE.getTopic(),
                             vo,
@@ -615,6 +625,7 @@ public class IotDataTransferService {
                     result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
+                dataEndTime = addTimeOffset(dataEndTime);
 
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
 
@@ -642,6 +653,7 @@ public class IotDataTransferService {
                 vo.setTotalPower(totalPower);
 
                 try {
+                    log.info("【人防用电负荷情况】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
                     sendMqttMessage(
                             MqttTopics.IotInfo.ELECTRICITY_LOAD.getTopic(),
                             vo,
@@ -707,6 +719,8 @@ public class IotDataTransferService {
         tempVO.setSensorValue(value);
         tempVO.setDataEndTime(dataEndTime);
         System.out.println("监测时间:" + dataEndTime);
+
+        log.info("【温度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(tempVO));
         sendMqttMessage(MqttTopics.IotInfo.TEMP.getTopic(), tempVO, MqttTopics.IotInfo.TEMP.getDesc(), username);
     }
 
@@ -732,6 +746,8 @@ public class IotDataTransferService {
         humidityVO.setPublishTime(getCurrentTime());
         humidityVO.setSensorValue(value);
         humidityVO.setDataEndTime(dataEndTime);
+
+        log.info("【湿度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(humidityVO));
         sendMqttMessage(MqttTopics.IotInfo.HUMIDITY.getTopic(), humidityVO, MqttTopics.IotInfo.HUMIDITY.getDesc(), username);
     }
 
@@ -757,6 +773,8 @@ public class IotDataTransferService {
         oxygenVO.setPublishTime(getCurrentTime());
         oxygenVO.setSensorValue(value);
         oxygenVO.setDataEndTime(dataEndTime);
+
+        log.info("【氧气浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(oxygenVO));
         sendMqttMessage(MqttTopics.IotInfo.OXYGEN.getTopic(), oxygenVO, MqttTopics.IotInfo.OXYGEN.getDesc(), username);
     }
 
@@ -782,6 +800,8 @@ public class IotDataTransferService {
         coVO.setPublishTime(getCurrentTime());
         coVO.setSensorValue(value);
         coVO.setDataEndTime(dataEndTime);
+
+        log.info("【一氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(coVO));
         sendMqttMessage(MqttTopics.IotInfo.CO.getTopic(), coVO, MqttTopics.IotInfo.CO.getDesc(), username);
     }
 
@@ -807,6 +827,8 @@ public class IotDataTransferService {
         co2VO.setPublishTime(getCurrentTime());
         co2VO.setSensorValue(value);
         co2VO.setDataEndTime(dataEndTime);
+
+        log.info("【二氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(co2VO));
         sendMqttMessage(MqttTopics.IotInfo.CO2.getTopic(), co2VO, MqttTopics.IotInfo.CO2.getDesc(), username);
     }
 
@@ -994,6 +1016,7 @@ public class IotDataTransferService {
                     result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
+                dataEndTime = addTimeOffset(dataEndTime);
 
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
                 Integer value = deviceDataItem.getInteger("wy");
@@ -1012,6 +1035,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(value == 0 ? 0 : 1);
 
                 try {
+                    log.info("【位移数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
                     sendMqttMessage(MqttTopics.IotInfo.DEVIATION.getTopic(), vo, MqttTopics.IotInfo.DEVIATION.getDesc(), transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
@@ -1079,6 +1103,7 @@ public class IotDataTransferService {
                     result.put("failureCount", result.get("failureCount") + 1);
                     continue;
                 }
+                dataEndTime = addTimeOffset(dataEndTime);
 
                 Integer deviceId = deviceDataItem.getIntValue("device_id");
                 Double value = deviceDataItem.getDouble("sensorValue");
@@ -1097,6 +1122,7 @@ public class IotDataTransferService {
                 vo.setSensorValue(value);
 
                 try {
+                    log.info("【水位数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
                     sendMqttMessage(MqttTopics.IotInfo.SEWAGE_LEVEL.getTopic(), vo, MqttTopics.IotInfo.SEWAGE_LEVEL.getDesc(), transferVO.getUsername());
                     result.put("successCount", result.get("successCount") + 1);
                 } catch (Exception e) {
@@ -1429,4 +1455,41 @@ public class IotDataTransferService {
         Integer tenantId = 0;
         synchronizeDeviceData(tenantId, engineeringId, username, password);
     }
+
+    /**
+     * 为监测时间添加随机偏移(用于模拟真实采集场景)
+     * @param originalTime 原始监测时间
+     * @return 偏移后的时间
+     */
+    private LocalDateTime addTimeOffset(LocalDateTime originalTime) {
+        if (originalTime == null) {
+            return null;
+        }
+
+        // 获取当前时间作为上限
+        LocalDateTime now = LocalDateTime.now();
+
+        // 如果原始时间已经是未来时间,先修正为当前时间
+        if (originalTime.isAfter(now)) {
+            log.warn("检测到未来时间的监测数据,已修正为当前时间:{}", originalTime);
+            originalTime = now;
+        }
+
+        // 随机秒偏移:0、1、2 秒(向前偏移,使用减法)
+        int secondsOffset = ThreadLocalRandom.current().nextInt(0, 3);
+        // 随机毫秒偏移:0 ~ 999 毫秒(向前偏移,使用减法)
+        int millisOffset = ThreadLocalRandom.current().nextInt(0, 1000);
+
+        LocalDateTime offsetTime = originalTime.minusSeconds(secondsOffset)
+                .minusNanos(millisOffset * 1_000_000L);
+
+        // 最终校验:确保不会晚于当前时间
+        if (offsetTime.isAfter(now)) {
+            log.warn("偏移后时间仍然晚于当前时间,已修正:原始={}, 偏移后={}, 当前={}",
+                    originalTime, offsetTime, now);
+            return now;
+        }
+
+        return offsetTime;
+    }
 }