|
|
@@ -0,0 +1,1499 @@
|
|
|
+package com.usky.cdi.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
+import com.usky.cdi.domain.*;
|
|
|
+import com.usky.cdi.mapper.*;
|
|
|
+import com.usky.cdi.service.enums.MqttTopics;
|
|
|
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
|
|
|
+import com.usky.cdi.service.util.AirDefenseSimulator;
|
|
|
+import com.usky.cdi.service.util.DeviceDataQuery;
|
|
|
+import com.usky.cdi.service.util.SnowflakeIdGenerator;
|
|
|
+import com.usky.cdi.service.vo.IotDataTransferVO;
|
|
|
+import com.usky.cdi.service.vo.SyncTaskStatisticsVO;
|
|
|
+import com.usky.cdi.service.vo.info.*;
|
|
|
+import com.usky.common.core.exception.BusinessException;
|
|
|
+import com.usky.common.security.utils.SecurityUtils;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneId;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author fyc
|
|
|
+ * @email yuchuan.fu@chinausky.com
|
|
|
+ * @date 2025/11/20
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class IotDataTransferService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MqttConnectionTool mqttConnectionTool;
|
|
|
+
|
|
|
+ // 注入ApplicationContext,确保总是能获取到
|
|
|
+ @Autowired
|
|
|
+ private ApplicationContext context;
|
|
|
+
|
|
|
+ // MQTT连接相关配置
|
|
|
+ private static final String MQTT_URL = "ssl://114.80.201.143:8883";
|
|
|
+ private static final String MQTT_TOPIC = "iotInfo/+";
|
|
|
+ private static final int KEEP_ALIVE_INTERVAL = 60;
|
|
|
+ private static final int COMPLETION_TIMEOUT = 5000;
|
|
|
+
|
|
|
+ // 存储每个任务的MQTT客户端工厂和网关
|
|
|
+ private final Map<String, MqttConnectionTool.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
|
|
|
+ private final Map<String, DefaultMqttPahoClientFactory> mqttClientFactoryMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private SnowflakeIdGenerator idGenerator;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceDataQuery deviceDataQuery;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DmpDeviceMapper dmpDeviceMapper;
|
|
|
+
|
|
|
+ @Value("${device.data.simulation}")
|
|
|
+ private boolean simulation;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DmpProductMapper dmpProductMapper;
|
|
|
+
|
|
|
+ // 从配置文件读取Snowflake参数,默认值为2
|
|
|
+ @Value("${snowflake.worker-id:2}")
|
|
|
+ private long workerId;
|
|
|
+
|
|
|
+ @Value("${snowflake.data-center-id:2}")
|
|
|
+ private long dataCenterId;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CdiDefenseProjectMapper cdiDefenseProjectMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CdiDeliveryLogMapper cdiDeliveryLogMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private BaseBuildUnitMapper baseBuildUnitMapper;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取当前时间
|
|
|
+ */
|
|
|
+ private LocalDateTime getCurrentTime() {
|
|
|
+ return LocalDateTime.now();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成数据包ID
|
|
|
+ */
|
|
|
+ private Long generateDataPacketID() {
|
|
|
+ return idGenerator.nextPacketId();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送水浸状态(deviceType:702)
|
|
|
+ * Topic: iotInfo/flooded
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ */
|
|
|
+ public Map<String, Integer> sendWaterLeak(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ log.warn("获取到的数据:{}", deviceData);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ log.info("开始推送水浸状态数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到水浸数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // ✅ 在空数据时也记录一条日志(可选)
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, 0, totalDevices, 0, 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // ✅ 先处理所有设备,不记录日志
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ Integer value = deviceDataItem.getInteger("leach_status");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的水浸状态数据为空", deviceId);
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ WaterLeakVO vo = new WaterLeakVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setSensorValue(value);
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【水浸数据】开始推送,设备:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.WATER_LEAK.getTopic(), vo, MqttTopics.IotInfo.WATER_LEAK.getDesc(), transferVO.getUsername());
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ✅ 所有设备处理完毕后,统一记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure, pending, success > 0 ? 1 : 0, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ log.info("水浸状态数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, success, failure);
|
|
|
+
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("水浸状态数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+
|
|
|
+ // ✅ 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, transferVO.getDevices().size(), 0, transferVO.getDevices().size(), 0, 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送温湿度及气体浓度数据(设备类型:701,707-711)
|
|
|
+ * 包含: wd(温度), sd(湿度), o2(氧气), co(一氧化碳), co2(二氧化碳)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ */
|
|
|
+ public Map<String, Integer> sendEnvData(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ long endTime;
|
|
|
+
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+
|
|
|
+ log.info("开始推送环境数据,设备类型:{},总设备数:{},获取到数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ int skippedNullField = 0;
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ // 检查业务字段是否为空,为空则计入失败(修复:原逻辑静默跳过并误计为成功)
|
|
|
+ boolean fieldMissing = false;
|
|
|
+ switch (deviceType) {
|
|
|
+ case 707:
|
|
|
+ if (deviceDataItem.getFloat("wd") == null) fieldMissing = true;
|
|
|
+ break;
|
|
|
+ case 708:
|
|
|
+ if (deviceDataItem.getFloat("sd") == null) fieldMissing = true;
|
|
|
+ break;
|
|
|
+ case 709:
|
|
|
+ if (deviceDataItem.getFloat("o2") == null) fieldMissing = true;
|
|
|
+ break;
|
|
|
+ case 710:
|
|
|
+ if (deviceDataItem.getFloat("co2") == null) fieldMissing = true;
|
|
|
+ break;
|
|
|
+ case 711:
|
|
|
+ if (deviceDataItem.getFloat("co") == null) fieldMissing = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (fieldMissing) {
|
|
|
+ log.warn("设备{}[类型{}]的业务字段数据为空,跳过推送,计入失败", deviceId, deviceType);
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ skippedNullField++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean deviceSuccess = true;
|
|
|
+ try {
|
|
|
+ switch (deviceType) {
|
|
|
+ case 707:
|
|
|
+ sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
+ break;
|
|
|
+ case 708:
|
|
|
+ sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
+ break;
|
|
|
+ case 709:
|
|
|
+ sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
+ break;
|
|
|
+ case 710:
|
|
|
+ sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
+ break;
|
|
|
+ case 711:
|
|
|
+ sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId, transferVO.getUsername());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}[类型{}]推送异常:{}", deviceId, deviceType, e.getMessage(), e);
|
|
|
+ deviceSuccess = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (deviceSuccess) {
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } else {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (skippedNullField > 0) {
|
|
|
+ log.warn("[类型{}] 因业务字段为空而跳过的设备数量:{}", deviceType, skippedNullField);
|
|
|
+ }
|
|
|
+
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int notSynced = totalDevices - success - failure;
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // ✅ 确保这里会被执行,且deviceType被正确记录
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure, notSynced, 1, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ log.info("空气质量推送完成,设备类型:{},成功:{},失败:{},未同步:{}",
|
|
|
+ deviceType, success, failure, notSynced);
|
|
|
+
|
|
|
+ return result;
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("空气质量推送异常", e);
|
|
|
+ result.put("failureCount", totalDevices - result.get("successCount"));
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+
|
|
|
+ // ✅ 异常情况下也要记录日志
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices, success, failure,
|
|
|
+ totalDevices - success - failure, 0, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Async("asyncServiceExecutor")
|
|
|
+ public void saveLog(IotDataTransferVO transferVO, LocalDateTime now, long startTime, long endTime,
|
|
|
+ int total, int success, int failure, int notSynced, int pushFlag, String userName) {
|
|
|
+
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+
|
|
|
+ if (StringUtils.isBlank(userName)) {
|
|
|
+ userName = "自动同步";
|
|
|
+ }
|
|
|
+
|
|
|
+ SyncTaskStatisticsVO vo = new SyncTaskStatisticsVO();
|
|
|
+ vo.setDataType(4);
|
|
|
+ vo.setTotal(total);
|
|
|
+ vo.setSuccessNumber(success);
|
|
|
+ vo.setFailNumber(failure);
|
|
|
+ vo.setNotSynced(notSynced);
|
|
|
+ vo.setCreateTime(now.format(DATE_TIME_FORMATTER));
|
|
|
+ vo.setCostTime((float) (endTime - startTime) / 1000.0f);
|
|
|
+ vo.setState(1);
|
|
|
+ vo.setTopic(MqttTopics.getIotInfoByTypeCode(deviceType).getTopic());
|
|
|
+ vo.setDataTypeName(MqttTopics.getIotInfoByTypeCode(deviceType).getDesc());
|
|
|
+
|
|
|
+ CdiDeliveryLog log = new CdiDeliveryLog();
|
|
|
+ log.setEngineeringId(transferVO.getEngineeringId());
|
|
|
+ log.setDataType(4);
|
|
|
+ log.setTopic(vo.getTopic());
|
|
|
+ log.setDataTypeName(vo.getDataTypeName());
|
|
|
+ log.setUserName(userName);
|
|
|
+ log.setNickName(userName);
|
|
|
+ log.setCreateTime(now);
|
|
|
+ log.setTenantId(getTenantId(transferVO.getEngineeringId()));
|
|
|
+ log.setPushFlag(pushFlag);
|
|
|
+ log.setInfoContent(JSON.toJSONString(vo));
|
|
|
+
|
|
|
+ cdiDeliveryLogMapper.insert(log);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer getTenantId(Long engineeringId) {
|
|
|
+ LambdaQueryWrapper<CdiDefenseProject> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.eq(CdiDefenseProject::getEngineeringId, engineeringId)
|
|
|
+ .eq(CdiDefenseProject::getIsEnable, 1);
|
|
|
+ CdiDefenseProject cdiDefenseProject = cdiDefenseProjectMapper.selectOne(queryWrapper);
|
|
|
+ if (cdiDefenseProject == null) {
|
|
|
+ throw new BusinessException("未找到工程信息!无法同步");
|
|
|
+ }
|
|
|
+ return cdiDefenseProject.getTenantId();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送人员统计情况(703)-掩蔽人数
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ public Map<String, Integer> sendPersonCount(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 模拟人员统计
|
|
|
+ log.info("开始执行【人员统计】数据推送,工程ID:{}", transferVO.getEngineeringId());
|
|
|
+ try {
|
|
|
+ LambdaQueryWrapper<BaseBuildUnit> buildUnitQuery = new LambdaQueryWrapper<>();
|
|
|
+ buildUnitQuery.eq(BaseBuildUnit::getTenantId, transferVO.getTenantId());
|
|
|
+ List<BaseBuildUnit> buildUnitList = baseBuildUnitMapper.selectList(buildUnitQuery);
|
|
|
+
|
|
|
+ // 修复3:单元为空,打印日志+标记失败,不静默返回
|
|
|
+ if (buildUnitList.isEmpty()) {
|
|
|
+ log.warn("【人员统计】未查询到建筑单元,租户ID:{},推送终止", transferVO.getTenantId());
|
|
|
+ result.put("failureCount", 1);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("【人员统计】获取到建筑单元数量:{}", buildUnitList.size());
|
|
|
+ for (BaseBuildUnit buildUnit : buildUnitList) {
|
|
|
+ int peopleCount = AirDefenseSimulator.calculatePeopleCount(now);
|
|
|
+ HeadcountVO headcountVO = new HeadcountVO();
|
|
|
+ headcountVO.setDataPacketID(generateDataPacketID());
|
|
|
+ headcountVO.setEngineeringID(transferVO.getEngineeringId());
|
|
|
+ headcountVO.setSensorValue(peopleCount);
|
|
|
+ // headcountVO.setSensorID(buildUnit.getId());
|
|
|
+
|
|
|
+ // 为每条数据的监测时间添加毫秒级微差
|
|
|
+ LocalDateTime dataEndTime = addTimeOffset(now);
|
|
|
+
|
|
|
+ headcountVO.setDataEndTime(dataEndTime);
|
|
|
+ headcountVO.setPublishTime(getCurrentTime());
|
|
|
+ headcountVO.setUnitName(buildUnit.getUnitName());
|
|
|
+ headcountVO.setFloor(buildUnit.getFloor());
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【人员统计】开始推送,单元:{},数据:{}", buildUnit.getUnitName(), JSON.toJSONString(headcountVO));
|
|
|
+ sendMqttMessage(
|
|
|
+ MqttTopics.IotInfo.PERSON.getTopic(),
|
|
|
+ headcountVO,
|
|
|
+ MqttTopics.IotInfo.PERSON.getDesc(),
|
|
|
+ transferVO.getUsername()
|
|
|
+ );
|
|
|
+ // 添加2S延时
|
|
|
+ Thread.sleep(2000);
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ log.info("【人员统计】推送成功,单元:{},人数:{}", buildUnit.getUnitName(), peopleCount);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("【人员统计】数据推送失败,单元:{},异常:{}", buildUnit.getUnitName(), e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 修复4:统计推送完成,打印汇总日志
|
|
|
+ log.info("【人员统计】推送完成,成功:{},失败:{}", result.get("successCount"), result.get("failureCount"));
|
|
|
+ // 可选:和闯入一样保存汇总日志(推荐加上)
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, buildUnitList.size(),
|
|
|
+ result.get("successCount"), result.get("failureCount"), 0,
|
|
|
+ result.get("successCount") > 0 ? 1 : 0, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 修复5:全局异常兜底,绝不丢失
|
|
|
+ log.error("【人员统计】推送发生全局异常", e);
|
|
|
+ result.put("failureCount", result.getOrDefault("failureCount", 0) + 1);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送人员闯入情况(703)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ public Map<String, Integer> sendPersonPresence(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ try {
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ log.info("开始推送人员闯入情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ // 处理无数据情况
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到人员闯入情况数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // 遍历所有设备数据,仅执行推送,不记录日志
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理数据时间
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+
|
|
|
+ PersonPresenceVO vo = new PersonPresenceVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
+ vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setSensorValue(0); // 固定值(根据业务需求)
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【人员闯入】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ sendMqttMessage(
|
|
|
+ MqttTopics.IotInfo.PERSON_PRESENCE.getTopic(),
|
|
|
+ vo,
|
|
|
+ MqttTopics.IotInfo.PERSON_PRESENCE.getDesc(),
|
|
|
+ transferVO.getUsername()
|
|
|
+ );
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 所有设备处理完毕,统一记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+ int status = (success > 0) ? 1 : 0; // 1=部分或全部成功,0=全失败
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ success, failure, pending, status, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ log.info("人员闯入情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, success, failure);
|
|
|
+
|
|
|
+ return result;
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("人员闯入情况数据推送发生异常", e);
|
|
|
+ int totalDevices = transferVO.getDevices() != null ? transferVO.getDevices().size() : 0;
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送人防用电负荷情况(704)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ public Map<String, Integer> sendElectricityLoad(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = (transferVO.getDevices() != null) ? transferVO.getDevices().size() : 0;
|
|
|
+
|
|
|
+ log.info("开始推送人防用电负荷情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ // 处理无数据情况
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到人防用电负荷情况数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+
|
|
|
+ // 遍历设备数据,仅执行推送,不记录日志
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+
|
|
|
+ ElectricityLoadVO vo = new ElectricityLoadVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
+ vo.setAVoltage(deviceDataItem.getFloat("aVoltage"));
|
|
|
+ vo.setBVoltage(deviceDataItem.getFloat("bVoltage"));
|
|
|
+ vo.setCVoltage(deviceDataItem.getFloat("cVoltage"));
|
|
|
+ vo.setAElectricity(deviceDataItem.getFloat("aElectricity"));
|
|
|
+ vo.setBElectricity(deviceDataItem.getFloat("bElectricity"));
|
|
|
+ vo.setCElectricity(deviceDataItem.getFloat("cElectricity"));
|
|
|
+ vo.setLine1TEMP(deviceDataItem.getFloat("line1TEMP"));
|
|
|
+ vo.setLine2TEMP(deviceDataItem.getFloat("Line2TEMP"));
|
|
|
+ vo.setLine3TEMP(deviceDataItem.getFloat("Line3TEMP"));
|
|
|
+ vo.setLeakageCurrent(deviceDataItem.getFloat("leakageCurrent"));
|
|
|
+
|
|
|
+ // 根据模拟模式选择不同的功率字段
|
|
|
+ Float totalPower = simulation
|
|
|
+ ? deviceDataItem.getFloat("totalPower")
|
|
|
+ : deviceDataItem.getFloat("active_power");
|
|
|
+ vo.setTotalPower(totalPower);
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【人防用电负荷情况】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ sendMqttMessage(
|
|
|
+ MqttTopics.IotInfo.ELECTRICITY_LOAD.getTopic(),
|
|
|
+ vo,
|
|
|
+ MqttTopics.IotInfo.ELECTRICITY_LOAD.getDesc(),
|
|
|
+ transferVO.getUsername()
|
|
|
+ );
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 所有设备处理完毕,统一记录一条汇总日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ int success = result.get("successCount");
|
|
|
+ int failure = result.get("failureCount");
|
|
|
+ int pending = totalDevices - success - failure;
|
|
|
+ int status = (success > 0) ? 1 : 0; // 1=有成功,0=全失败
|
|
|
+
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ success, failure, pending, status, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ log.info("人防用电负荷情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, success, failure);
|
|
|
+
|
|
|
+ return result;
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("人防用电负荷情况数据推送发生异常", e);
|
|
|
+ int totalDevices = (transferVO.getDevices() != null) ? transferVO.getDevices().size() : 0;
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+
|
|
|
+ // 异常时也记录一条日志
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ 0, totalDevices, 0, 0, SecurityUtils.getUsername());
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送温度信息(707)
|
|
|
+ *
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param dataEndTime 数据结束时间
|
|
|
+ * @param engineeringID 工程ID
|
|
|
+ * @param username 用户名
|
|
|
+ **/
|
|
|
+ private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
|
|
|
+ Float value = deviceDataItem.getFloat("wd");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的温度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TempVO tempVO = new TempVO();
|
|
|
+ tempVO.setDataPacketID(generateDataPacketID());
|
|
|
+ tempVO.setSensorID(deviceId);
|
|
|
+ tempVO.setEngineeringID(engineeringID);
|
|
|
+ tempVO.setPublishTime(getCurrentTime());
|
|
|
+ tempVO.setSensorValue(value);
|
|
|
+ tempVO.setDataEndTime(dataEndTime);
|
|
|
+ System.out.println("监测时间:" + dataEndTime);
|
|
|
+
|
|
|
+ log.info("【温度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(tempVO));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.TEMP.getTopic(), tempVO, MqttTopics.IotInfo.TEMP.getDesc(), username);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送湿度信息(708)
|
|
|
+ *
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param dataEndTime 数据结束时间
|
|
|
+ * @param engineeringID 工程ID
|
|
|
+ * @param username 用户名
|
|
|
+ **/
|
|
|
+ private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
|
|
|
+ Float value = deviceDataItem.getFloat("sd");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的湿度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ HumidityVO humidityVO = new HumidityVO();
|
|
|
+ humidityVO.setDataPacketID(generateDataPacketID());
|
|
|
+ humidityVO.setSensorID(deviceId);
|
|
|
+ humidityVO.setEngineeringID(engineeringID);
|
|
|
+ humidityVO.setPublishTime(getCurrentTime());
|
|
|
+ humidityVO.setSensorValue(value);
|
|
|
+ humidityVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【湿度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(humidityVO));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.HUMIDITY.getTopic(), humidityVO, MqttTopics.IotInfo.HUMIDITY.getDesc(), username);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送氧气浓度信息(709)
|
|
|
+ *
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param dataEndTime 数据结束时间
|
|
|
+ * @param engineeringID 工程ID
|
|
|
+ * @param username 用户名
|
|
|
+ **/
|
|
|
+ private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
|
|
|
+ Float value = deviceDataItem.getFloat("o2");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的氧气浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ OxygenVO oxygenVO = new OxygenVO();
|
|
|
+ oxygenVO.setDataPacketID(generateDataPacketID());
|
|
|
+ oxygenVO.setSensorID(deviceId);
|
|
|
+ oxygenVO.setEngineeringID(engineeringID);
|
|
|
+ oxygenVO.setPublishTime(getCurrentTime());
|
|
|
+ oxygenVO.setSensorValue(value);
|
|
|
+ oxygenVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【氧气浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(oxygenVO));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.OXYGEN.getTopic(), oxygenVO, MqttTopics.IotInfo.OXYGEN.getDesc(), username);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送一氧化碳浓度信息(711)
|
|
|
+ *
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param dataEndTime 数据结束时间
|
|
|
+ * @param engineeringID 工程ID
|
|
|
+ * @param username 用户名
|
|
|
+ **/
|
|
|
+ private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
|
|
|
+ Float value = deviceDataItem.getFloat("co");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ CoVO coVO = new CoVO();
|
|
|
+ coVO.setDataPacketID(generateDataPacketID());
|
|
|
+ coVO.setSensorID(deviceId);
|
|
|
+ coVO.setEngineeringID(engineeringID);
|
|
|
+ coVO.setPublishTime(getCurrentTime());
|
|
|
+ coVO.setSensorValue(value);
|
|
|
+ coVO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【一氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(coVO));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.CO.getTopic(), coVO, MqttTopics.IotInfo.CO.getDesc(), username);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送二氧化碳浓度信息(710)
|
|
|
+ *
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param dataEndTime 数据结束时间
|
|
|
+ * @param engineeringID 工程ID
|
|
|
+ * @param username 用户名
|
|
|
+ **/
|
|
|
+ private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID, String username) {
|
|
|
+ Float value = deviceDataItem.getFloat("co2");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Co2VO co2VO = new Co2VO();
|
|
|
+ co2VO.setDataPacketID(generateDataPacketID());
|
|
|
+ co2VO.setSensorID(deviceId);
|
|
|
+ co2VO.setEngineeringID(engineeringID);
|
|
|
+ co2VO.setPublishTime(getCurrentTime());
|
|
|
+ // 将value除以10000并保留三位小数
|
|
|
+ Float processedValue = new java.math.BigDecimal(value / 10000f)
|
|
|
+ .setScale(3, java.math.RoundingMode.HALF_UP)
|
|
|
+ .floatValue();
|
|
|
+ co2VO.setSensorValue(processedValue);
|
|
|
+ co2VO.setDataEndTime(dataEndTime);
|
|
|
+
|
|
|
+ log.info("【二氧化碳浓度】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(co2VO));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.CO2.getTopic(), co2VO, MqttTopics.IotInfo.CO2.getDesc(), username);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送倾斜数据(712)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ // public Map<String, Integer> sendTiltData(IotDataTransferVO transferVO) {
|
|
|
+ // Map<String, Integer> result = new HashMap<>();
|
|
|
+ // result.put("successCount", 0);
|
|
|
+ // result.put("failureCount", 0);
|
|
|
+ //
|
|
|
+ // if (!validateMqttGateway()) {
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // try {
|
|
|
+ // List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ // Integer deviceType = transferVO.getDeviceType();
|
|
|
+ // Integer totalDevices = transferVO.getDevices().size();
|
|
|
+ //
|
|
|
+ // log.info("开始推送倾斜数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ // deviceType, totalDevices, deviceData.size());
|
|
|
+ //
|
|
|
+ // if (deviceData.isEmpty()) {
|
|
|
+ // log.warn("没有获取到倾斜数据!设备类型:{}", deviceType);
|
|
|
+ // result.put("failureCount", totalDevices);
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // Long engineeringId = transferVO.getEngineeringId();
|
|
|
+ // for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ // LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ // if (dataEndTime == null) {
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ // Integer value = deviceDataItem.getInteger("qx");
|
|
|
+ // if (value == null) {
|
|
|
+ // log.warn("设备{}的倾斜数据为空", deviceId);
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // TiltVO vo = new TiltVO();
|
|
|
+ // vo.setDataPacketID(generateDataPacketID());
|
|
|
+ // vo.setSensorID(deviceId);
|
|
|
+ // vo.setEngineeringID(engineeringId);
|
|
|
+ // vo.setPublishTime(getCurrentTime());
|
|
|
+ // vo.setDataEndTime(dataEndTime);
|
|
|
+ // vo.setSensorValue(value == 0 ? 0 : 1);
|
|
|
+ //
|
|
|
+ // try {
|
|
|
+ // sendMqttMessage(EnvMonitorMqttTopic.TILT, vo, "倾斜信息");
|
|
|
+ // result.put("successCount", result.get("successCount") + 1);
|
|
|
+ // } catch (Exception e) {
|
|
|
+ // log.warn("设备{}的倾斜数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // log.info("倾斜数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ // deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ //
|
|
|
+ // return result;
|
|
|
+ // } catch (Exception e) {
|
|
|
+ // log.error("倾斜数据推送发生异常", e);
|
|
|
+ // result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送裂缝数据(713)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ // public Map<String, Integer> sendCrackData(IotDataTransferVO transferVO) {
|
|
|
+ // Map<String, Integer> result = new HashMap<>();
|
|
|
+ // result.put("successCount", 0);
|
|
|
+ // result.put("failureCount", 0);
|
|
|
+ //
|
|
|
+ // if (!validateMqttGateway()) {
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // try {
|
|
|
+ // List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ // Integer deviceType = transferVO.getDeviceType();
|
|
|
+ // Integer totalDevices = transferVO.getDevices().size();
|
|
|
+ //
|
|
|
+ // log.info("开始推送裂缝数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ // deviceType, totalDevices, deviceData.size());
|
|
|
+ //
|
|
|
+ // if (deviceData.isEmpty()) {
|
|
|
+ // log.warn("没有获取到裂缝数据!设备类型:{}", deviceType);
|
|
|
+ // result.put("failureCount", totalDevices);
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // Long engineeringId = transferVO.getEngineeringId();
|
|
|
+ // for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ // LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ // if (dataEndTime == null) {
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ // Integer value = deviceDataItem.getInteger("cd");
|
|
|
+ // if (value == null) {
|
|
|
+ // log.warn("设备{}的裂缝数据为空", deviceId);
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // CrackVO vo = new CrackVO();
|
|
|
+ // vo.setDataPacketID(generateDataPacketID());
|
|
|
+ // vo.setSensorID(deviceId);
|
|
|
+ // vo.setEngineeringID(engineeringId);
|
|
|
+ // vo.setPublishTime(getCurrentTime());
|
|
|
+ // vo.setDataEndTime(dataEndTime);
|
|
|
+ // vo.setSensorValue(value == 0 ? 0 : 1);
|
|
|
+ //
|
|
|
+ // try {
|
|
|
+ // sendMqttMessage(EnvMonitorMqttTopic.CRACK, vo, "裂缝信息");
|
|
|
+ // result.put("successCount", result.get("successCount") + 1);
|
|
|
+ // } catch (Exception e) {
|
|
|
+ // log.warn("设备{}的裂缝数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ // result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // log.info("裂缝数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ // deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+ //
|
|
|
+ // return result;
|
|
|
+ // } catch (Exception e) {
|
|
|
+ // log.error("裂缝数据推送发生异常", e);
|
|
|
+ // result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ // return result;
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送位移数据(714)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ public Map<String, Integer> sendDeviationData(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ long endTime;
|
|
|
+
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ log.info("开始推送位移数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到位移数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ Integer value = deviceDataItem.getInteger("wy");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的位移数据为空", deviceId);
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ DeviationVO vo = new DeviationVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
+ vo.setSensorValue(value == 0 ? 0 : 1);
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【位移数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.DEVIATION.getTopic(), vo, MqttTopics.IotInfo.DEVIATION.getDesc(), transferVO.getUsername());
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的位移数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("位移数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 1, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("位移数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送液位数据(716)
|
|
|
+ *
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
+ **/
|
|
|
+ public Map<String, Integer> sendWaterLevel(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway(transferVO.getUsername())) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ long endTime;
|
|
|
+
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ log.info("开始推送集水井水位数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到位移数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ dataEndTime = addTimeOffset(dataEndTime);
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ Double value = deviceDataItem.getDouble("sensorValue");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的水位数据为空", deviceId);
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ WaterLevelVO vo = new WaterLevelVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
+ vo.setSensorValue(value);
|
|
|
+
|
|
|
+ try {
|
|
|
+ log.info("【水位数据】开始推送,设备ID:{},数据:{}", deviceId, JSON.toJSONString(vo));
|
|
|
+ sendMqttMessage(MqttTopics.IotInfo.SEWAGE_LEVEL.getTopic(), vo, MqttTopics.IotInfo.SEWAGE_LEVEL.getDesc(), transferVO.getUsername());
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的水位数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("水位数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 1, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("水位数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ endTime = System.currentTimeMillis();
|
|
|
+ saveLog(transferVO, now, startTime, endTime, totalDevices,
|
|
|
+ result.get("successCount"), result.get("failureCount"),
|
|
|
+ totalDevices - result.get("successCount") - result.get("failureCount"), 0, SecurityUtils.getUsername());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步设备数据
|
|
|
+ * @param tenantId 租户ID
|
|
|
+ * @param engineeringId 工程ID
|
|
|
+ * @param username MQTT用户名
|
|
|
+ * @param password MQTT密码
|
|
|
+ */
|
|
|
+ public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
|
|
|
+ log.info("用户名:{},密码:{}", username, password);
|
|
|
+ // 参数校验
|
|
|
+ if (engineeringId == null || username == null || password == null) {
|
|
|
+ log.error("工程ID、MQTT用户名或密码不能为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询租户下的所有产品类型
|
|
|
+ List<String> deviceTypeList = getDeviceTypeListByTenant(tenantId);
|
|
|
+ if (deviceTypeList.isEmpty()) {
|
|
|
+ log.warn("租户{}不存在任何产品", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询设备列表
|
|
|
+ List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList);
|
|
|
+ if (deviceList.isEmpty()) {
|
|
|
+ log.warn("租户{}不存在任何设备", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 按设备类型分组
|
|
|
+ Map<Integer, List<DmpDevice>> deviceTypeMap = deviceList.stream()
|
|
|
+ .collect(Collectors.groupingBy(DmpDevice::getDeviceType));
|
|
|
+
|
|
|
+ // 构建数据传输对象列表
|
|
|
+ List<IotDataTransferVO> transferList = new ArrayList<>();
|
|
|
+ deviceTypeMap.forEach((deviceType, devices) -> {
|
|
|
+ IotDataTransferVO transferVO = new IotDataTransferVO();
|
|
|
+ transferVO.setDeviceType(deviceType);
|
|
|
+ transferVO.setDevices(devices);
|
|
|
+ transferVO.setEngineeringId(engineeringId);
|
|
|
+ transferVO.setUsername(username); // 保存当前任务的用户名
|
|
|
+ transferVO.setTenantId(tenantId);
|
|
|
+ transferList.add(transferVO);
|
|
|
+ });
|
|
|
+
|
|
|
+ // 按产品代码分组,构建ProductCode到uuid列表的映射,过滤掉产品代码为null的设备
|
|
|
+ Map<String, List<String>> codeDeviceUuidsMap = deviceList.stream()
|
|
|
+ .filter(device -> device.getProductCode() != null)
|
|
|
+ .collect(Collectors.groupingBy(DmpDevice::getProductCode,
|
|
|
+ Collectors.mapping(DmpDevice::getDeviceUuid, Collectors.toList())));
|
|
|
+
|
|
|
+ // 创建MQTT连接
|
|
|
+ createMqttConnection(username, password);
|
|
|
+
|
|
|
+ // 任务开始日志
|
|
|
+ Integer totalDevices = deviceList.size();
|
|
|
+ Integer totalProductTypes = codeDeviceUuidsMap.size();
|
|
|
+ log.info("设备数据同步任务开始,租户ID:{},工程ID:{}", tenantId, engineeringId);
|
|
|
+ log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
|
|
|
+ log.info("总共需要推送设备数量:{}个,涉及设备类型数:{}个,设备类型为:{}",
|
|
|
+ totalDevices, deviceTypeMap.size(), deviceTypeMap.keySet());
|
|
|
+
|
|
|
+ // 记录每种设备类型的设备数量
|
|
|
+ deviceTypeMap.forEach((deviceType, devices) -> {
|
|
|
+ log.info("设备类型:{},设备数量:{}", deviceType, devices.size());
|
|
|
+ });
|
|
|
+
|
|
|
+ // 按设备类型处理数据同步
|
|
|
+ int totalSuccessCount = 0;
|
|
|
+ int totalFailureCount = 0;
|
|
|
+
|
|
|
+ for (IotDataTransferVO transferVO : transferList) {
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+
|
|
|
+ switch (deviceType) {
|
|
|
+ case 707:
|
|
|
+ case 708:
|
|
|
+ case 709:
|
|
|
+ case 710:
|
|
|
+ case 711:
|
|
|
+ result = sendEnvData(transferVO);
|
|
|
+ break;
|
|
|
+ case 702:
|
|
|
+ result = sendWaterLeak(transferVO);
|
|
|
+ break;
|
|
|
+ case 703:
|
|
|
+ if (transferVO.getEngineeringId() == 3101100024L) {
|
|
|
+ result = sendPersonCount(transferVO);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ result = sendPersonPresence(transferVO);
|
|
|
+ break;
|
|
|
+ case 704:
|
|
|
+ if (tenantId == 1205) {
|
|
|
+ // 设置默认值,避免空指针
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ result = sendElectricityLoad(transferVO);
|
|
|
+ break;
|
|
|
+ // case 712:
|
|
|
+ // result = sendTiltData(transferVO);
|
|
|
+ // break;
|
|
|
+ // case 713:
|
|
|
+ // result = sendCrackData(transferVO);
|
|
|
+ // break;
|
|
|
+ case 714:
|
|
|
+ result = sendDeviationData(transferVO);
|
|
|
+ break;
|
|
|
+ case 716:
|
|
|
+ result = sendWaterLevel(transferVO);
|
|
|
+ break;
|
|
|
+ case 719:
|
|
|
+ result = sendPersonCount(transferVO);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.debug("不支持的设备类型:{}", deviceType);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 累加成功数和失败数
|
|
|
+ int typeSuccess = result.getOrDefault("successCount", 0);
|
|
|
+ int typeFailure = result.getOrDefault("failureCount", 0);
|
|
|
+ totalSuccessCount += typeSuccess;
|
|
|
+ totalFailureCount += typeFailure;
|
|
|
+
|
|
|
+ // 分类型诊断日志:精确定位每种类型的成功/失败/未同步
|
|
|
+ int typeTotal = transferVO.getDevices() != null ? transferVO.getDevices().size() : 0;
|
|
|
+ int typeNotSynced = typeTotal - typeSuccess - typeFailure;
|
|
|
+ log.info("[类型分诊] deviceType={} | 总设备={} | 成功={} | 失败={} | 未同步={}",
|
|
|
+ deviceType, typeTotal, typeSuccess, typeFailure, typeNotSynced);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 任务完成总结
|
|
|
+ log.info("设备数据同步任务完成,租户ID:{},工程ID:{}", tenantId, engineeringId);
|
|
|
+ log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
|
|
|
+ log.info("总共推送设备数量:{}个,成功:{}个,失败:{}个",
|
|
|
+ totalDevices, totalSuccessCount, totalFailureCount);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询租户下的设备类型列表
|
|
|
+ * @param tenantId 租户ID
|
|
|
+ * @return 设备类型列表
|
|
|
+ */
|
|
|
+ private List<String> getDeviceTypeListByTenant(Integer tenantId) {
|
|
|
+ LambdaQueryWrapper<DmpProduct> productQueryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ productQueryWrapper.select(DmpProduct::getProductCode)
|
|
|
+ .eq(tenantId != null && tenantId > 0, DmpProduct::getTenantId, tenantId)
|
|
|
+ .eq(DmpProduct::getDeleteFlag, 0);
|
|
|
+ List<DmpProduct> productList = dmpProductMapper.selectList(productQueryWrapper);
|
|
|
+ return productList.stream()
|
|
|
+ .map(DmpProduct::getProductCode)
|
|
|
+ .distinct() // 去重
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据设备类型列表查询设备
|
|
|
+ * @param productCodeList 设备类型列表
|
|
|
+ * @return 设备列表
|
|
|
+ */
|
|
|
+ private List<DmpDevice> getDeviceListByType(List<String> productCodeList) {
|
|
|
+ LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.select(DmpDevice::getDeviceUuid, DmpDevice::getDeviceType, DmpDevice::getDeviceId, DmpDevice::getProductCode)
|
|
|
+ .in(DmpDevice::getProductCode, productCodeList)
|
|
|
+ .eq(DmpDevice::getDeleteFlag, 0)
|
|
|
+ .notIn(DmpDevice::getServiceStatus, 3)
|
|
|
+ .orderByAsc(DmpDevice::getProductCode);
|
|
|
+ return dmpDeviceMapper.selectList(queryWrapper);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 手动创建/刷新 MQTT 连接(含动态 clientId)
|
|
|
+ * @param username MQTT用户名
|
|
|
+ * @param password MQTT密码
|
|
|
+ */
|
|
|
+ public synchronized void createMqttConnection(String username, String password) {
|
|
|
+ log.info("手动创建/刷新 MQTT 连接(含动态 clientId),用户名:{},密码:{}", username, password);
|
|
|
+ try {
|
|
|
+ // 检查MqttConnectionTool是否已注入
|
|
|
+ if (this.mqttConnectionTool == null) {
|
|
|
+ throw new IllegalStateException("MqttConnectionTool未注入,无法获取MQTT Gateway");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 使用MqttConnectionTool创建或刷新MQTT连接
|
|
|
+ MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(username, password);
|
|
|
+
|
|
|
+ // 存储到映射中
|
|
|
+ mqttGatewayMap.put(username, gateway);
|
|
|
+ log.info("MQTT连接创建/刷新成功,用户名:{}", username);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
|
|
|
+ throw new RuntimeException("初始化MQTT连接失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 验证MQTT网关是否初始化
|
|
|
+ * @param username 用户名
|
|
|
+ * @return 是否初始化
|
|
|
+ */
|
|
|
+ private boolean validateMqttGateway(String username) {
|
|
|
+ if (username == null) {
|
|
|
+ log.warn("MQTT Gateway未初始化,无法发送消息,用户名:null");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ // 一次性获取网关实例,避免竞态条件
|
|
|
+ MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
|
|
|
+ if (gateway == null) {
|
|
|
+ log.warn("MQTT Gateway未初始化,无法发送消息,用户名:{}", username);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析数据时间
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @return 解析后的时间,如果解析失败返回null
|
|
|
+ */
|
|
|
+ private LocalDateTime parseDataTime(JSONObject deviceDataItem) {
|
|
|
+ Object raw = deviceDataItem.get("realtime");
|
|
|
+ Long dataTime = null;
|
|
|
+ if (raw instanceof Long) {
|
|
|
+ dataTime = (Long) raw;
|
|
|
+ } else if (raw instanceof Integer) {
|
|
|
+ dataTime = ((Integer) raw).longValue();
|
|
|
+ } else if (raw instanceof Number) {
|
|
|
+ dataTime = ((Number) raw).longValue();
|
|
|
+ } else if (raw instanceof String) {
|
|
|
+ try {
|
|
|
+ dataTime = Long.parseLong((String) raw);
|
|
|
+ } catch (Exception ignored) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (dataTime == null) return null;
|
|
|
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送MQTT消息
|
|
|
+ * @param topic MQTT topic
|
|
|
+ * @param vo 消息对象
|
|
|
+ * @param messageType 消息类型描述
|
|
|
+ * @param username 用户名
|
|
|
+ */
|
|
|
+ void sendMqttMessage(String topic, Object vo, String messageType, String username) {
|
|
|
+ String json;
|
|
|
+
|
|
|
+ // 针对楼层平面图特殊处理:将 byte[] 转为 Base64 字符串
|
|
|
+ if (vo instanceof com.usky.cdi.service.vo.info.FloorPlaneVO) {
|
|
|
+ json = serializeFloorPlaneVO((com.usky.cdi.service.vo.info.FloorPlaneVO) vo);
|
|
|
+ } else {
|
|
|
+ json = JSON.toJSONString(vo);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("发送MQTT消息,Topic: {}, 消息类型: {}, JSON长度: {}", topic, messageType, json.length());
|
|
|
+
|
|
|
+ MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
|
|
|
+ if (gateway != null) {
|
|
|
+ gateway.sendToMqtt(topic, json);
|
|
|
+ } else {
|
|
|
+ log.warn("MQTT Gateway未找到,无法发送消息,用户名:{}", username);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 序列化楼层平面图VO(将 floorFile byte[] 转为 Base64 字符串)
|
|
|
+ */
|
|
|
+ private String serializeFloorPlaneVO(com.usky.cdi.service.vo.info.FloorPlaneVO vo) {
|
|
|
+ com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
|
|
|
+
|
|
|
+ jsonObject.put("dataPacketID", vo.getDataPacketID());
|
|
|
+ jsonObject.put("engineeringID", vo.getEngineeringID());
|
|
|
+ jsonObject.put("floor", vo.getFloor());
|
|
|
+ jsonObject.put("floorFileID", vo.getFloorFileID());
|
|
|
+ jsonObject.put("floorFileName", vo.getFloorFileName());
|
|
|
+ jsonObject.put("floorFileSuffix", vo.getFloorFileSuffix());
|
|
|
+ jsonObject.put("filePixWidth", vo.getFilePixWidth());
|
|
|
+ jsonObject.put("filePixHeight", vo.getFilePixHeight());
|
|
|
+ jsonObject.put("publishTime", vo.getPublishTime());
|
|
|
+
|
|
|
+ // 关键:将 byte[] 转为 Base64 字符串
|
|
|
+ if (vo.getFloorFile() != null) {
|
|
|
+ String base64File = java.util.Base64.getEncoder().encodeToString(vo.getFloorFile());
|
|
|
+ jsonObject.put("floorFile", base64File);
|
|
|
+ log.info("平面图文件转换Base64成功,FileID: {}, 原始大小: {} bytes, Base64长度: {}",
|
|
|
+ vo.getFloorFileID(), vo.getFloorFile().length, base64File.length());
|
|
|
+ } else {
|
|
|
+ jsonObject.put("floorFile", "");
|
|
|
+ log.warn("平面图文件为空,FileID: {}", vo.getFloorFileID());
|
|
|
+ }
|
|
|
+
|
|
|
+ Gson gson = new Gson();
|
|
|
+ return gson.toJson(jsonObject);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void allData(Long engineeringId, String username, String password) {
|
|
|
+ Integer tenantId = 0;
|
|
|
+ synchronizeDeviceData(tenantId, engineeringId, username, password);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 为监测时间添加随机偏移(用于模拟真实采集场景)
|
|
|
+ * @param originalTime 原始监测时间
|
|
|
+ * @return 偏移后的时间
|
|
|
+ */
|
|
|
+ private LocalDateTime addTimeOffset(LocalDateTime originalTime) {
|
|
|
+ if (originalTime == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取当前时间作为上限
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+
|
|
|
+ // 如果原始时间已经是未来时间,先修正为当前时间
|
|
|
+ if (originalTime.isAfter(now)) {
|
|
|
+ log.warn("检测到未来时间的监测数据,已修正为当前时间:{}", originalTime);
|
|
|
+ originalTime = now;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 随机秒偏移:0、1、2 秒(向前偏移,使用减法)
|
|
|
+ int secondsOffset = ThreadLocalRandom.current().nextInt(0, 3);
|
|
|
+ // 随机毫秒偏移:0 ~ 999 毫秒(向前偏移,使用减法)
|
|
|
+ int millisOffset = ThreadLocalRandom.current().nextInt(0, 1000);
|
|
|
+
|
|
|
+ LocalDateTime offsetTime = originalTime.minusSeconds(secondsOffset)
|
|
|
+ .minusNanos(millisOffset * 1_000_000L);
|
|
|
+
|
|
|
+ // 最终校验:确保不会晚于当前时间
|
|
|
+ if (offsetTime.isAfter(now)) {
|
|
|
+ log.warn("偏移后时间仍然晚于当前时间,已修正:原始={}, 偏移后={}, 当前={}",
|
|
|
+ originalTime, offsetTime, now);
|
|
|
+ return now;
|
|
|
+ }
|
|
|
+
|
|
|
+ return offsetTime;
|
|
|
+ }
|
|
|
+}
|