|
|
@@ -3,30 +3,31 @@ package com.usky.cdi.service.impl;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.usky.cdi.domain.CdiDefenseProject;
|
|
|
+import com.usky.cdi.domain.CdiDeliveryLog;
|
|
|
import com.usky.cdi.domain.DmpDevice;
|
|
|
import com.usky.cdi.domain.DmpProduct;
|
|
|
+import com.usky.cdi.mapper.CdiDefenseProjectMapper;
|
|
|
+import com.usky.cdi.mapper.CdiDeliveryLogMapper;
|
|
|
import com.usky.cdi.mapper.DmpDeviceMapper;
|
|
|
import com.usky.cdi.mapper.DmpProductMapper;
|
|
|
+import com.usky.cdi.service.CdiDeliveryLogService;
|
|
|
import com.usky.cdi.service.config.mqtt.MqttOutConfig;
|
|
|
-import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
|
|
|
+import com.usky.cdi.service.enums.MqttTopics;
|
|
|
import com.usky.cdi.service.mqtt.MqttConnectionTool;
|
|
|
import com.usky.cdi.service.util.DeviceDataQuery;
|
|
|
import com.usky.cdi.service.util.SnowflakeIdGenerator;
|
|
|
import com.usky.cdi.service.vo.IotDataTransferVO;
|
|
|
-import com.usky.cdi.service.vo.base.*;
|
|
|
+import com.usky.cdi.service.vo.SyncTaskStatisticsVO;
|
|
|
+import com.usky.cdi.service.vo.info.*;
|
|
|
+import com.usky.common.core.exception.BusinessException;
|
|
|
+import com.usky.common.security.utils.SecurityUtils;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
-import org.springframework.beans.factory.support.DefaultListableBeanFactory;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
-import org.springframework.context.ConfigurableApplicationContext;
|
|
|
-import org.springframework.context.support.GenericApplicationContext;
|
|
|
-import org.springframework.integration.dsl.IntegrationFlow;
|
|
|
-import org.springframework.integration.dsl.IntegrationFlows;
|
|
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
|
-import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
-import org.springframework.messaging.MessageChannel;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
@@ -34,6 +35,7 @@ import javax.annotation.PostConstruct;
|
|
|
import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
@@ -87,11 +89,19 @@ public class IotDataTransferService {
|
|
|
@Value("${snowflake.data-center-id:2}")
|
|
|
private long dataCenterId;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private CdiDefenseProjectMapper cdiDefenseProjectMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CdiDeliveryLogMapper cdiDeliveryLogMapper;
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
|
|
|
}
|
|
|
|
|
|
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
/**
|
|
|
* 获取当前时间
|
|
|
*/
|
|
|
@@ -121,6 +131,9 @@ public class IotDataTransferService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
try {
|
|
|
List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
log.warn("获取到的数据:{}", deviceData);
|
|
|
@@ -133,10 +146,16 @@ public class IotDataTransferService {
|
|
|
if (deviceData.isEmpty()) {
|
|
|
log.warn("没有获取到水浸数据!设备类型:{}", deviceType);
|
|
|
result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // ✅ 在空数据时也记录一条日志(可选)
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, 0, totalDevices, 0, 0);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // ✅ 先处理所有设备,不记录日志
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
if (dataEndTime == null) {
|
|
|
@@ -161,7 +180,7 @@ public class IotDataTransferService {
|
|
|
vo.setDataEndTime(dataEndTime);
|
|
|
|
|
|
try {
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息", transferVO.getUsername());
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
|
|
|
@@ -169,13 +188,25 @@ public class IotDataTransferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // ✅ 所有设备处理完毕后,统一记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure, pending, success > 0 ? 1 : 0);
|
|
|
+
|
|
|
log.info("水浸状态数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
- deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ deviceType, success, failure);
|
|
|
|
|
|
return result;
|
|
|
} catch (Exception e) {
|
|
|
log.error("水浸状态数据推送发生异常", e);
|
|
|
result.put("failureCount", transferVO.getDevices().size());
|
|
|
+
|
|
|
+ // ✅ 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, transferVO.getDevices().size(), 0, transferVO.getDevices().size(), 0, 0);
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
@@ -195,44 +226,31 @@ public class IotDataTransferService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ long endTime;
|
|
|
+
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
try {
|
|
|
- Integer deviceType = transferVO.getDeviceType();
|
|
|
List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
- Integer totalDevices = transferVO.getDevices().size();
|
|
|
-
|
|
|
- // 统计各类型设备数据数量
|
|
|
- int tempCount = 0, humidityCount = 0, coCount = 0, o2Count = 0, co2Count = 0;
|
|
|
- for (JSONObject dataItem : deviceData) {
|
|
|
- if (dataItem.containsKey("wd") && dataItem.getFloat("wd") != null) tempCount++;
|
|
|
- if (dataItem.containsKey("sd") && dataItem.getFloat("sd") != null) humidityCount++;
|
|
|
- if (dataItem.containsKey("co") && dataItem.getFloat("co") != null) coCount++;
|
|
|
- if (dataItem.containsKey("o2") && dataItem.getFloat("o2") != null) o2Count++;
|
|
|
- if (dataItem.containsKey("co2") && dataItem.getFloat("co2") != null) co2Count++;
|
|
|
- }
|
|
|
|
|
|
- log.info("开始推送温湿度及气体浓度数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ log.info("开始推送环境数据,设备类型:{},总设备数:{},获取到数据条数:{}",
|
|
|
deviceType, totalDevices, deviceData.size());
|
|
|
- log.info("各类型设备数据数量:温度{}条,湿度{}条,一氧化碳{}条,氧气{}条,二氧化碳{}条",
|
|
|
- tempCount, humidityCount, coCount, o2Count, co2Count);
|
|
|
-
|
|
|
- if (deviceData.isEmpty()) {
|
|
|
- log.warn("没有获取到空气质量数据!设备类型:{}", deviceType);
|
|
|
- result.put("failureCount", totalDevices);
|
|
|
- return result;
|
|
|
- }
|
|
|
|
|
|
Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+
|
|
|
if (dataEndTime == null) {
|
|
|
result.put("failureCount", result.get("failureCount") + 1);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
boolean deviceSuccess = true;
|
|
|
-
|
|
|
- // 根据设备类型发送对应的数据
|
|
|
try {
|
|
|
switch (deviceType) {
|
|
|
case 707:
|
|
|
@@ -241,22 +259,12 @@ public class IotDataTransferService {
|
|
|
case 708:
|
|
|
sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
break;
|
|
|
- case 709:
|
|
|
- sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
- break;
|
|
|
- case 710:
|
|
|
- sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
- break;
|
|
|
- case 711:
|
|
|
- sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
- break;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.warn("设备{}的环境数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ log.warn("设备{}推送失败:{}", deviceId, e.getMessage());
|
|
|
deviceSuccess = false;
|
|
|
}
|
|
|
|
|
|
- // 统计设备推送结果
|
|
|
if (deviceSuccess) {
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} else {
|
|
|
@@ -264,17 +272,80 @@ public class IotDataTransferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- log.info("温湿度及气体浓度数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
- deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int notSynced = totalDevices - success - failure;
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // ✅ 确保这里会被执行,且deviceType被正确记录
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure, notSynced, 1);
|
|
|
+
|
|
|
+ log.info("空气质量推送完成,设备类型:{},成功:{},失败:{},未同步:{}",
|
|
|
+ deviceType, success, failure, notSynced);
|
|
|
|
|
|
return result;
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
- log.error("温湿度及气体浓度数据推送发生异常", e);
|
|
|
- result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ log.error("空气质量推送异常", e);
|
|
|
+ result.put("failureCount", totalDevices - result.get("successCount"));
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+
|
|
|
+ // ✅ 异常情况下也要记录日志
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure,
|
|
|
+ totalDevices - success - failure, 0);
|
|
|
+
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Async("asyncServiceExecutor")
|
|
|
+ public void saveLog(IotDataTransferVO transferVO, LocalDateTime now, long startTime, long endTime,
|
|
|
+ int total, int success, int failure, int notSynced, int pushFlag) {
|
|
|
+
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+
|
|
|
+ SyncTaskStatisticsVO vo = new SyncTaskStatisticsVO();
|
|
|
+ vo.setDataType(4);
|
|
|
+ vo.setTotal(total);
|
|
|
+ vo.setSuccessNumber(success);
|
|
|
+ vo.setFailNumber(failure);
|
|
|
+ vo.setNotSynced(notSynced);
|
|
|
+ vo.setCreateTime(now.format(DATE_TIME_FORMATTER));
|
|
|
+ vo.setCostTime((float) (endTime - startTime) / 1000.0f);
|
|
|
+ vo.setState(1);
|
|
|
+ vo.setTopic(MqttTopics.getIotInfoByTypeCode(deviceType).getTopic());
|
|
|
+ vo.setDataTypeName(MqttTopics.getIotInfoByTypeCode(deviceType).getDesc());
|
|
|
+
|
|
|
+ CdiDeliveryLog log = new CdiDeliveryLog();
|
|
|
+ log.setEngineeringId(transferVO.getEngineeringId());
|
|
|
+ log.setDataType(4);
|
|
|
+ log.setTopic(vo.getTopic());
|
|
|
+ log.setDataTypeName(vo.getDataTypeName());
|
|
|
+ log.setUserName(SecurityUtils.getUsername() == null ? "自动同步" : SecurityUtils.getUsername());
|
|
|
+ log.setNickName(SecurityUtils.getUsername() == null ? "自动同步" : SecurityUtils.getLoginUser().getSysUser().getNickName());
|
|
|
+ log.setCreateTime(now);
|
|
|
+ log.setTenantId(getTenantId(transferVO.getEngineeringId()));
|
|
|
+ log.setPushFlag(pushFlag);
|
|
|
+ log.setInfoContent(JSON.toJSONString(vo));
|
|
|
+
|
|
|
+ cdiDeliveryLogMapper.insert(log);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer getTenantId(Long engineeringId) {
|
|
|
+ LambdaQueryWrapper<CdiDefenseProject> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.eq(CdiDefenseProject::getEngineeringId, engineeringId)
|
|
|
+ .eq(CdiDefenseProject::getIsEnable, 1);
|
|
|
+ CdiDefenseProject cdiDefenseProject = cdiDefenseProjectMapper.selectOne(queryWrapper);
|
|
|
+ if (cdiDefenseProject == null) {
|
|
|
+ throw new BusinessException("未找到工程信息!无法同步");
|
|
|
+ }
|
|
|
+ return cdiDefenseProject.getTenantId();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 发送人员闯入情况(703)
|
|
|
*
|
|
|
@@ -289,6 +360,9 @@ public class IotDataTransferService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
try {
|
|
|
List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
Integer deviceType = transferVO.getDeviceType();
|
|
|
@@ -297,13 +371,21 @@ public class IotDataTransferService {
|
|
|
log.info("开始推送人员闯入情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
deviceType, totalDevices, deviceData.size());
|
|
|
|
|
|
+ // 处理无数据情况
|
|
|
if (deviceData.isEmpty()) {
|
|
|
log.warn("没有获取到人员闯入情况数据!设备类型:{}", deviceType);
|
|
|
result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // 遍历所有设备数据,仅执行推送,不记录日志
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
if (dataEndTime == null) {
|
|
|
@@ -319,11 +401,15 @@ public class IotDataTransferService {
|
|
|
vo.setDataEndTime(dataEndTime);
|
|
|
vo.setPublishTime(getCurrentTime());
|
|
|
vo.setEngineeringID(engineeringId);
|
|
|
- // 传感器值固定为0(可能是默认值或占位符)
|
|
|
- vo.setSensorValue(0);
|
|
|
+ vo.setSensorValue(0); // 固定值(根据业务需求)
|
|
|
|
|
|
try {
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况", transferVO.getUsername());
|
|
|
+ sendMqttMessage(
|
|
|
+ MqttTopics.IotInfo.PERSON_PRESENCE.getTopic(),
|
|
|
+ vo,
|
|
|
+ MqttTopics.IotInfo.PERSON_PRESENCE.getDesc(),
|
|
|
+ transferVO.getUsername()
|
|
|
+ );
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
@@ -331,13 +417,31 @@ public class IotDataTransferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // 所有设备处理完毕,统一记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+ int status = (success > 0) ? 1 : 0; // 1=部分或全部成功,0=全失败
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ success, failure, pending, status);
|
|
|
+
|
|
|
log.info("人员闯入情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
- deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ deviceType, success, failure);
|
|
|
|
|
|
return result;
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
log.error("人员闯入情况数据推送发生异常", e);
|
|
|
- result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ int totalDevices = transferVO.getDevices() != null ? transferVO.getDevices().size() : 0;
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0);
|
|
|
+
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
@@ -356,21 +460,32 @@ public class IotDataTransferService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
try {
|
|
|
List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
Integer deviceType = transferVO.getDeviceType();
|
|
|
- Integer totalDevices = transferVO.getDevices().size();
|
|
|
+ Integer totalDevices = (transferVO.getDevices() != null) ? transferVO.getDevices().size() : 0;
|
|
|
|
|
|
log.info("开始推送人防用电负荷情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
deviceType, totalDevices, deviceData.size());
|
|
|
|
|
|
+ // 处理无数据情况
|
|
|
if (deviceData.isEmpty()) {
|
|
|
log.warn("没有获取到人防用电负荷情况数据!设备类型:{}", deviceType);
|
|
|
result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // 遍历设备数据,仅执行推送,不记录日志
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
if (dataEndTime == null) {
|
|
|
@@ -379,6 +494,7 @@ public class IotDataTransferService {
|
|
|
}
|
|
|
|
|
|
Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+
|
|
|
ElectricityLoadVO vo = new ElectricityLoadVO();
|
|
|
vo.setDataPacketID(generateDataPacketID());
|
|
|
vo.setSensorID(deviceId);
|
|
|
@@ -397,12 +513,18 @@ public class IotDataTransferService {
|
|
|
vo.setLeakageCurrent(deviceDataItem.getFloat("leakageCurrent"));
|
|
|
|
|
|
// 根据模拟模式选择不同的功率字段
|
|
|
- vo.setTotalPower(simulation ?
|
|
|
- deviceDataItem.getFloat("totalPower") :
|
|
|
- deviceDataItem.getFloat("active_power"));
|
|
|
+ Float totalPower = simulation
|
|
|
+ ? deviceDataItem.getFloat("totalPower")
|
|
|
+ : deviceDataItem.getFloat("active_power");
|
|
|
+ vo.setTotalPower(totalPower);
|
|
|
|
|
|
try {
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况", transferVO.getUsername());
|
|
|
+ sendMqttMessage(
|
|
|
+ MqttTopics.IotInfo.ELECTRICITY_LOAD.getTopic(),
|
|
|
+ vo,
|
|
|
+ MqttTopics.IotInfo.ELECTRICITY_LOAD.getDesc(),
|
|
|
+ transferVO.getUsername()
|
|
|
+ );
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
@@ -410,13 +532,31 @@ public class IotDataTransferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // 所有设备处理完毕,统一记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+ int status = (success > 0) ? 1 : 0; // 1=有成功,0=全失败
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ success, failure, pending, status);
|
|
|
+
|
|
|
log.info("人防用电负荷情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
- deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ deviceType, success, failure);
|
|
|
|
|
|
return result;
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
log.error("人防用电负荷情况数据推送发生异常", e);
|
|
|
- result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ int totalDevices = (transferVO.getDevices() != null) ? transferVO.getDevices().size() : 0;
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0);
|
|
|
+
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
@@ -443,7 +583,7 @@ public class IotDataTransferService {
|
|
|
tempVO.setPublishTime(getCurrentTime());
|
|
|
tempVO.setSensorValue(value);
|
|
|
tempVO.setDataEndTime(dataEndTime);
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息", username);
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.TEMP.getTopic(), tempVO, MqttTopics.IotInfo.TEMP.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -468,7 +608,7 @@ public class IotDataTransferService {
|
|
|
humidityVO.setPublishTime(getCurrentTime());
|
|
|
humidityVO.setSensorValue(value);
|
|
|
humidityVO.setDataEndTime(dataEndTime);
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息", username);
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.HUMIDITY.getTopic(), humidityVO, MqttTopics.IotInfo.HUMIDITY.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -493,7 +633,7 @@ public class IotDataTransferService {
|
|
|
oxygenVO.setPublishTime(getCurrentTime());
|
|
|
oxygenVO.setSensorValue(value);
|
|
|
oxygenVO.setDataEndTime(dataEndTime);
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息", username);
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.OXYGEN.getTopic(), oxygenVO, MqttTopics.IotInfo.OXYGEN.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -518,7 +658,7 @@ public class IotDataTransferService {
|
|
|
coVO.setPublishTime(getCurrentTime());
|
|
|
coVO.setSensorValue(value);
|
|
|
coVO.setDataEndTime(dataEndTime);
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息", username);
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.CO.getTopic(), coVO, MqttTopics.IotInfo.CO.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -543,7 +683,7 @@ public class IotDataTransferService {
|
|
|
co2VO.setPublishTime(getCurrentTime());
|
|
|
co2VO.setSensorValue(value);
|
|
|
co2VO.setDataEndTime(dataEndTime);
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息", username);
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.CO2.getTopic(), co2VO, MqttTopics.IotInfo.CO2.getDesc(), username);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -704,10 +844,15 @@ public class IotDataTransferService {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ long endTime;
|
|
|
+
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
try {
|
|
|
- List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
- Integer deviceType = transferVO.getDeviceType();
|
|
|
- Integer totalDevices = transferVO.getDevices().size();
|
|
|
|
|
|
log.info("开始推送位移数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
deviceType, totalDevices, deviceData.size());
|
|
|
@@ -743,7 +888,7 @@ public class IotDataTransferService {
|
|
|
vo.setSensorValue(value == 0 ? 0 : 1);
|
|
|
|
|
|
try {
|
|
|
- sendMqttMessage(EnvMonitorMqttTopic.DEVIATION, vo, "位移信息", transferVO.getUsername());
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.DEVIATION.getTopic(), vo, MqttTopics.IotInfo.DEVIATION.getDesc(), transferVO.getUsername());
|
|
|
result.put("successCount", result.get("successCount") + 1);
|
|
|
} catch (Exception e) {
|
|
|
log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
|
|
|
@@ -754,10 +899,18 @@ public class IotDataTransferService {
|
|
|
log.info("位移数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 1);
|
|
|
return result;
|
|
|
} catch (Exception e) {
|
|
|
log.error("位移数据推送发生异常", e);
|
|
|
result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 0);
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
@@ -878,6 +1031,7 @@ public class IotDataTransferService {
|
|
|
log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
|
|
|
log.info("总共推送设备数量:{}个,成功:{}个,失败:{}个",
|
|
|
totalDevices, totalSuccessCount, totalFailureCount);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -931,7 +1085,7 @@ public class IotDataTransferService {
|
|
|
// 存储到映射中
|
|
|
mqttGatewayMap.put(username, gateway);
|
|
|
log.info("MQTT连接创建/刷新成功,用户名:{}", username);
|
|
|
- } catch (Exception e) {
|
|
|
+ } catch (Exception e) {
|
|
|
log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
|
|
|
throw new RuntimeException("初始化MQTT连接失败", e);
|
|
|
}
|
|
|
@@ -962,7 +1116,7 @@ public class IotDataTransferService {
|
|
|
* @return 解析后的时间,如果解析失败返回null
|
|
|
*/
|
|
|
private LocalDateTime parseDataTime(JSONObject deviceDataItem) {
|
|
|
- log.warn("解析的json{}", deviceDataItem.toString());
|
|
|
+ // log.warn("解析的json{}", deviceDataItem.toString());
|
|
|
Long dataTime = deviceDataItem.getLong("realtime");
|
|
|
if (dataTime == null) {
|
|
|
log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
|
|
|
@@ -973,14 +1127,13 @@ public class IotDataTransferService {
|
|
|
|
|
|
/**
|
|
|
* 发送MQTT消息
|
|
|
- * @param topicEnum 主题枚举
|
|
|
+ * @param topic MQTT topic
|
|
|
* @param vo 消息对象
|
|
|
* @param messageType 消息类型描述
|
|
|
* @param username 用户名
|
|
|
*/
|
|
|
- private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType, String username) {
|
|
|
+ void sendMqttMessage(String topic, Object vo, String messageType, String username) {
|
|
|
String json = JSON.toJSONString(vo);
|
|
|
- String topic = topicEnum.getTopic();
|
|
|
// 不再记录每条数据的详情,只记录发送操作
|
|
|
MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
|
|
|
if (gateway != null) {
|