|
|
@@ -160,6 +160,9 @@ public class IotDataTransferService {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
Integer value = deviceDataItem.getInteger("leach_status");
|
|
|
if (value == null) {
|
|
|
@@ -177,7 +180,8 @@ public class IotDataTransferService {
|
|
|
vo.setDataEndTime(dataEndTime);
|
|
|
|
|
|
try {
|
|
|
- sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
|
|
|
+ log.info("【水浸数据】开始推送,设备:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ // sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
|
|
|
@@ -248,6 +252,9 @@ public class IotDataTransferService {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
// 检查业务字段是否为空,为空则计入失败(修复:原逻辑静默跳过并误计为成功)
|
|
|
boolean fieldMissing = false;
|
|
|
switch (deviceType) {
|
|
|
@@ -427,17 +434,16 @@ public class IotDataTransferService {
|
|
|
headcountVO.setSensorValue(peopleCount);
|
|
|
// headcountVO.setSensorID(buildUnit.getId());
|
|
|
|
|
|
- // 为每条数据生成秒级微差的时间
|
|
|
- int secondsOffset = ThreadLocalRandom.current().nextInt(0, 60);
|
|
|
- LocalDateTime dataEndTimeWithOffset = now.minusMinutes(ThreadLocalRandom.current().nextInt(1, 3)).plusSeconds(secondsOffset);
|
|
|
- LocalDateTime publishTimeWithOffset = now.plusSeconds(ThreadLocalRandom.current().nextInt(0, 30));
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ LocalDateTime dataEndTime = addTimeOffset(now);
|
|
|
|
|
|
- headcountVO.setDataEndTime(dataEndTimeWithOffset);
|
|
|
- headcountVO.setPublishTime(publishTimeWithOffset);
|
|
|
+ headcountVO.setDataEndTime(dataEndTime);
|
|
|
+ headcountVO.setPublishTime(getCurrentTime());
|
|
|
headcountVO.setUnitName(buildUnit.getUnitName());
|
|
|
headcountVO.setFloor(buildUnit.getFloor());
|
|
|
|
|
|
try {
|
|
|
+ log.info("【人员统计】开始推送,单元:{},数据:{}", buildUnit.getUnitName(), JSON.toJSONString(headcountVO));
|
|
|
sendMqttMessage(
|
|
|
MqttTopics.IotInfo.PERSON.getTopic(),
|
|
|
headcountVO,
|
|
|
@@ -516,6 +522,9 @@ public class IotDataTransferService {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
+ // 处理数据时间
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
|
|
|
PersonPresenceVO vo = new PersonPresenceVO();
|
|
|
@@ -527,6 +536,7 @@ public class IotDataTransferService {
|
|
|
vo.setSensorValue(0); // 固定值(根据业务需求)
|
|
|
|
|
|
try {
|
|
|
+ log.info("【人员闯入】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
sendMqttMessage(
|
|
|
MqttTopics.IotInfo.PERSON_PRESENCE.getTopic(),
|
|
|
vo,
|
|
|
@@ -615,6 +625,7 @@ public class IotDataTransferService {
|
|
|
result.put("failureCount", result.get("failureCount") + 1);
|
|
|
continue;
|
|
|
}
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
|
|
|
@@ -642,6 +653,7 @@ public class IotDataTransferService {
|
|
|
vo.setTotalPower(totalPower);
|
|
|
|
|
|
try {
|
|
|
+ log.info("【人防用电负荷情况】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
sendMqttMessage(
|
|
|
MqttTopics.IotInfo.ELECTRICITY_LOAD.getTopic(),
|
|
|
vo,
|
|
|
@@ -707,6 +719,8 @@ public class IotDataTransferService {
|
|
|
tempVO.setSensorValue(value);
|
|
|
tempVO.setDataEndTime(dataEndTime);
|
|
|
System.out.println("监测时间:" + dataEndTime);
|
|
|
+
|
|
|
+ log.info("【温度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(tempVO));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.TEMP.getTopic(), tempVO, MqttTopics.IotInfo.TEMP.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
@@ -732,6 +746,8 @@ public class IotDataTransferService {
|
|
|
humidityVO.setPublishTime(getCurrentTime());
|
|
|
humidityVO.setSensorValue(value);
|
|
|
humidityVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【湿度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(humidityVO));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.HUMIDITY.getTopic(), humidityVO, MqttTopics.IotInfo.HUMIDITY.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
@@ -757,6 +773,8 @@ public class IotDataTransferService {
|
|
|
oxygenVO.setPublishTime(getCurrentTime());
|
|
|
oxygenVO.setSensorValue(value);
|
|
|
oxygenVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【氧气浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(oxygenVO));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.OXYGEN.getTopic(), oxygenVO, MqttTopics.IotInfo.OXYGEN.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
@@ -782,6 +800,8 @@ public class IotDataTransferService {
|
|
|
coVO.setPublishTime(getCurrentTime());
|
|
|
coVO.setSensorValue(value);
|
|
|
coVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【一氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(coVO));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.CO.getTopic(), coVO, MqttTopics.IotInfo.CO.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
@@ -807,6 +827,8 @@ public class IotDataTransferService {
|
|
|
co2VO.setPublishTime(getCurrentTime());
|
|
|
co2VO.setSensorValue(value);
|
|
|
co2VO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【二氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(co2VO));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.CO2.getTopic(), co2VO, MqttTopics.IotInfo.CO2.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
@@ -994,6 +1016,7 @@ public class IotDataTransferService {
|
|
|
result.put("failureCount", result.get("failureCount") + 1);
|
|
|
continue;
|
|
|
}
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
Integer value = deviceDataItem.getInteger("wy");
|
|
|
@@ -1012,6 +1035,7 @@ public class IotDataTransferService {
|
|
|
vo.setSensorValue(value == 0 ? 0 : 1);
|
|
|
|
|
|
try {
|
|
|
+ log.info("【位移数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.DEVIATION.getTopic(), vo, MqttTopics.IotInfo.DEVIATION.getDesc(), transferVO.getUsername());
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
@@ -1079,6 +1103,7 @@ public class IotDataTransferService {
|
|
|
result.put("failureCount", result.get("failureCount") + 1);
|
|
|
continue;
|
|
|
}
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
Double value = deviceDataItem.getDouble("sensorValue");
|
|
|
@@ -1097,6 +1122,7 @@ public class IotDataTransferService {
|
|
|
vo.setSensorValue(value);
|
|
|
|
|
|
try {
|
|
|
+ log.info("【水位数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
sendMqttMessage(MqttTopics.IotInfo.SEWAGE_LEVEL.getTopic(), vo, MqttTopics.IotInfo.SEWAGE_LEVEL.getDesc(), transferVO.getUsername());
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
@@ -1429,4 +1455,41 @@ public class IotDataTransferService {
|
|
|
Integer tenantId = 0;
|
|
|
synchronizeDeviceData(tenantId, engineeringId, username, password);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 为监测时间添加随机偏移(用于模拟真实采集场景)
|
|
|
+ * @param originalTime 原始监测时间
|
|
|
+ * @return 偏移后的时间
|
|
|
+ */
|
|
|
+ private LocalDateTime addTimeOffset(LocalDateTime originalTime) {
|
|
|
+ if (originalTime == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取当前时间作为上限
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+
|
|
|
+ // 如果原始时间已经是未来时间,先修正为当前时间
|
|
|
+ if (originalTime.isAfter(now)) {
|
|
|
+ log.warn("检测到未来时间的监测数据,已修正为当前时间:{}", originalTime);
|
|
|
+ originalTime = now;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 随机秒偏移:0、1、2 秒(向前偏移,使用减法)
|
|
|
+ int secondsOffset = ThreadLocalRandom.current().nextInt(0, 3);
|
|
|
+ // 随机毫秒偏移:0 ~ 999 毫秒(向前偏移,使用减法)
|
|
|
+ int millisOffset = ThreadLocalRandom.current().nextInt(0, 1000);
|
|
|
+
|
|
|
+ LocalDateTime offsetTime = originalTime.minusSeconds(secondsOffset)
|
|
|
+ .minusNanos(millisOffset * 1_000_000L);
|
|
|
+
|
|
|
+ // 最终校验:确保不会晚于当前时间
|
|
|
+ if (offsetTime.isAfter(now)) {
|
|
|
+ log.warn("偏移后时间仍然晚于当前时间,已修正:原始={}, 偏移后={}, 当前={}",
|
|
|
+ originalTime, offsetTime, now);
|
|
|
+ return now;
|
|
|
+ }
|
|
|
+
|
|
|
+ return offsetTime;
|
|
|
+ }
|
|
|
}
|