|
|
@@ -2,24 +2,35 @@ package com.usky.cdi.service.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.usky.cdi.domain.DmpDevice;
|
|
|
+import com.usky.cdi.domain.DmpProduct;
|
|
|
+import com.usky.cdi.mapper.DmpDeviceMapper;
|
|
|
+import com.usky.cdi.mapper.DmpProductMapper;
|
|
|
import com.usky.cdi.service.config.mqtt.MqttGateway;
|
|
|
import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
|
|
|
import com.usky.cdi.service.util.DeviceDataQuery;
|
|
|
import com.usky.cdi.service.util.SnowflakeIdGenerator;
|
|
|
+import com.usky.cdi.service.vo.IotDataTransferVO;
|
|
|
import com.usky.cdi.service.vo.base.*;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+
|
|
|
import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
-import java.util.Random;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
- *
|
|
|
* @author fyc
|
|
|
* @email yuchuan.fu@chinausky.com
|
|
|
* @date 2025/11/20
|
|
|
@@ -32,15 +43,30 @@ public class IotDataTransferService {
|
|
|
@Autowired(required = false)
|
|
|
private MqttGateway mqttGateway;
|
|
|
|
|
|
- private final SnowflakeIdGenerator idGenerator;
|
|
|
+ private SnowflakeIdGenerator idGenerator;
|
|
|
|
|
|
@Autowired
|
|
|
private DeviceDataQuery deviceDataQuery;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private DmpDeviceMapper dmpDeviceMapper;
|
|
|
+
|
|
|
+ @Value("${device.data.simulation}")
|
|
|
+ private boolean simulation;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DmpProductMapper dmpProductMapper;
|
|
|
+
|
|
|
+ // 从配置文件读取Snowflake参数,默认值为2
|
|
|
+ @Value("${snowflake.worker-id:2}")
|
|
|
+ private long workerId;
|
|
|
+
|
|
|
+ @Value("${snowflake.data-center-id:2}")
|
|
|
+ private long dataCenterId;
|
|
|
|
|
|
- public IotDataTransferService() {
|
|
|
- // 使用默认的workerId和datacenterId,实际项目中可以从配置读取
|
|
|
- this.idGenerator = new SnowflakeIdGenerator(1L, 1L);
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -61,225 +87,554 @@ public class IotDataTransferService {
|
|
|
* 发送水浸状态(deviceType:702)
|
|
|
* Topic: iotInfo/flooded
|
|
|
*
|
|
|
- * @return 是否发送成功
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
*/
|
|
|
- public boolean sendWaterLeak() {
|
|
|
- if (mqttGateway == null) {
|
|
|
- log.warn("MQTT Gateway未初始化,无法发送消息");
|
|
|
- return false;
|
|
|
+ public Map<String, Integer> sendWaterLeak(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway()) {
|
|
|
+ return result;
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
- List<JSONObject> deviceData = deviceDataQuery.getDeviceData(702);
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ log.info("开始推送水浸状态数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
|
|
|
if (deviceData.isEmpty()) {
|
|
|
- log.warn("没有获取到水浸数据!");
|
|
|
- return false;
|
|
|
+ log.warn("没有获取到水浸数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
- Integer leachStatus = Integer.valueOf(deviceDataItem.getString("leach_status"));
|
|
|
- Long dataTime = deviceDataItem.getLong("time");
|
|
|
- LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ Integer value = deviceDataItem.getInteger("leach_status");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的水浸状态数据为空", deviceId);
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
WaterLeakVO vo = new WaterLeakVO();
|
|
|
vo.setDataPacketID(generateDataPacketID());
|
|
|
- vo.setSensorID(4399L);
|
|
|
- vo.setEngineeringID(9527L);
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
vo.setPublishTime(getCurrentTime());
|
|
|
- vo.setSensorValue(leachStatus);
|
|
|
+ vo.setSensorValue(value);
|
|
|
vo.setDataEndTime(dataEndTime);
|
|
|
|
|
|
- String json = JSON.toJSONString(vo);
|
|
|
- String topic = EnvMonitorMqttTopic.WATER_LEAK.getTopic();
|
|
|
-
|
|
|
- log.info("发送水浸状态信息,Topic: {}, Data: {}", topic, json);
|
|
|
- mqttGateway.sendToMqtt(topic, json);
|
|
|
+ try {
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.WATER_LEAK, vo, "水浸状态信息");
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的水浸状态数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return true;
|
|
|
+ log.info("水浸状态数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ return result;
|
|
|
} catch (Exception e) {
|
|
|
- log.error("发送水浸状态信息失败", e);
|
|
|
- return false;
|
|
|
+ log.error("水浸状态数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ return result;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送温湿度及气体浓度数据(设备类型:701)
|
|
|
+ * 发送温湿度及气体浓度数据(设备类型:701,707-711)
|
|
|
* 包含: wd(温度), sd(湿度), o2(氧气), co(一氧化碳), co2(二氧化碳)
|
|
|
*
|
|
|
- * @return 是否发送成功
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
*/
|
|
|
- public boolean sendEnvData() {
|
|
|
- if (mqttGateway == null) {
|
|
|
- log.warn("MQTT Gateway未初始化,无法发送消息");
|
|
|
- return false;
|
|
|
+ public Map<String, Integer> sendEnvData(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway()) {
|
|
|
+ return result;
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
- List<JSONObject> deviceData = deviceDataQuery.getDeviceData(701);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ log.info("开始推送温湿度及气体浓度数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
|
|
|
if (deviceData.isEmpty()) {
|
|
|
- log.warn("没有获取到空气质量数据!");
|
|
|
- return false;
|
|
|
+ log.warn("没有获取到空气质量数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
- Long dataTime = deviceDataItem.getLong("time");
|
|
|
- LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
|
|
|
-
|
|
|
- // 发送温度
|
|
|
- TempVO tempVO = new TempVO();
|
|
|
- tempVO.setDataPacketID(generateDataPacketID());
|
|
|
- tempVO.setSensorID(4399L);
|
|
|
- tempVO.setEngineeringID(9527L);
|
|
|
- tempVO.setPublishTime(getCurrentTime());
|
|
|
- tempVO.setSensorValue(deviceDataItem.getFloat("wd"));
|
|
|
- tempVO.setDataEndTime(dataEndTime);
|
|
|
- String tempJson = JSON.toJSONString(tempVO);
|
|
|
- mqttGateway.sendToMqtt(EnvMonitorMqttTopic.TEMP.getTopic(), tempJson);
|
|
|
- log.info("发送温度信息,Topic: {}, Data: {}", EnvMonitorMqttTopic.TEMP.getTopic(), tempJson);
|
|
|
-
|
|
|
- // 发送湿度
|
|
|
- HumidityVO humidityVO = new HumidityVO();
|
|
|
- humidityVO.setDataPacketID(generateDataPacketID());
|
|
|
- humidityVO.setSensorID(4399L);
|
|
|
- humidityVO.setEngineeringID(9527L);
|
|
|
- humidityVO.setPublishTime(getCurrentTime());
|
|
|
- humidityVO.setSensorValue(deviceDataItem.getFloat("sd"));
|
|
|
- humidityVO.setDataEndTime(dataEndTime);
|
|
|
- String humidityJson = JSON.toJSONString(humidityVO);
|
|
|
- mqttGateway.sendToMqtt(EnvMonitorMqttTopic.HUMIDITY.getTopic(), humidityJson);
|
|
|
- log.info("发送湿度信息,Topic: {}, Data: {}", EnvMonitorMqttTopic.HUMIDITY.getTopic(), humidityJson);
|
|
|
-
|
|
|
- // 发送氧气浓度
|
|
|
- OxygenVO oxygenVO = new OxygenVO();
|
|
|
- oxygenVO.setDataPacketID(generateDataPacketID());
|
|
|
- oxygenVO.setSensorID(4399L);
|
|
|
- oxygenVO.setEngineeringID(9527L);
|
|
|
- oxygenVO.setPublishTime(getCurrentTime());
|
|
|
- oxygenVO.setSensorValue(deviceDataItem.getFloat("o2"));
|
|
|
- oxygenVO.setDataEndTime(dataEndTime);
|
|
|
- String oxygenJson = JSON.toJSONString(oxygenVO);
|
|
|
- mqttGateway.sendToMqtt(EnvMonitorMqttTopic.OXYGEN.getTopic(), oxygenJson);
|
|
|
- log.info("发送氧气浓度信息,Topic: {}, Data: {}", EnvMonitorMqttTopic.OXYGEN.getTopic(), oxygenJson);
|
|
|
-
|
|
|
- // 发送一氧化碳浓度
|
|
|
- CoVO coVO = new CoVO();
|
|
|
- coVO.setDataPacketID(generateDataPacketID());
|
|
|
- coVO.setSensorID(4399L);
|
|
|
- coVO.setEngineeringID(9527L);
|
|
|
- coVO.setPublishTime(getCurrentTime());
|
|
|
- coVO.setSensorValue(deviceDataItem.getFloat("co"));
|
|
|
- coVO.setDataEndTime(dataEndTime);
|
|
|
- String coJson = JSON.toJSONString(coVO);
|
|
|
- mqttGateway.sendToMqtt(EnvMonitorMqttTopic.CO.getTopic(), coJson);
|
|
|
- log.info("发送一氧化碳浓度信息,Topic: {}, Data: {}", EnvMonitorMqttTopic.CO.getTopic(), coJson);
|
|
|
-
|
|
|
- // 发送二氧化碳浓度
|
|
|
- Co2VO co2VO = new Co2VO();
|
|
|
- co2VO.setDataPacketID(generateDataPacketID());
|
|
|
- co2VO.setSensorID(4399L);
|
|
|
- co2VO.setEngineeringID(9527L);
|
|
|
- co2VO.setPublishTime(getCurrentTime());
|
|
|
- co2VO.setSensorValue(deviceDataItem.getFloat("co2"));
|
|
|
- co2VO.setDataEndTime(dataEndTime);
|
|
|
- String co2Json = JSON.toJSONString(co2VO);
|
|
|
- mqttGateway.sendToMqtt(EnvMonitorMqttTopic.CO2.getTopic(), co2Json);
|
|
|
- log.info("发送二氧化碳浓度信息,Topic: {}, Data: {}", EnvMonitorMqttTopic.CO2.getTopic(), co2Json);
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+ boolean deviceSuccess = true;
|
|
|
+
|
|
|
+ // 根据设备类型发送对应的数据
|
|
|
+ try {
|
|
|
+ switch (deviceType) {
|
|
|
+ case 707:
|
|
|
+ sendTempData(deviceId, dataEndTime, deviceDataItem, engineeringId);
|
|
|
+ break;
|
|
|
+ case 708:
|
|
|
+ sendHumidityData(deviceId, dataEndTime, deviceDataItem, engineeringId);
|
|
|
+ break;
|
|
|
+ case 709:
|
|
|
+ sendOxygenData(deviceId, dataEndTime, deviceDataItem, engineeringId);
|
|
|
+ break;
|
|
|
+ case 710:
|
|
|
+ sendCo2Data(deviceId, dataEndTime, deviceDataItem, engineeringId);
|
|
|
+ break;
|
|
|
+ case 711:
|
|
|
+ sendCoData(deviceId, dataEndTime, deviceDataItem, engineeringId);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的环境数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ deviceSuccess = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 统计设备推送结果
|
|
|
+ if (deviceSuccess) {
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } else {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return true;
|
|
|
+ log.info("温湿度及气体浓度数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ return result;
|
|
|
} catch (Exception e) {
|
|
|
- log.error("发送环境数据失败", e);
|
|
|
- return false;
|
|
|
+ log.error("温湿度及气体浓度数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ return result;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送人员闯入情况
|
|
|
+ * 发送人员闯入情况(703)
|
|
|
*
|
|
|
- * @param vo 人员闯入
|
|
|
- * @return 是否发送成功
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
**/
|
|
|
- public boolean sendPersonPresence(PersonPresenceVO vo) {
|
|
|
- if (mqttGateway == null) {
|
|
|
- log.warn("MQTT Gateway未初始化,无法发送消息");
|
|
|
- return false;
|
|
|
+ public Map<String, Integer> sendPersonPresence(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway()) {
|
|
|
+ return result;
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
- if (vo.getDataPacketID() == null) {
|
|
|
- vo.setDataPacketID(generateDataPacketID());
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
+
|
|
|
+ log.info("开始推送人员闯入情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到人员闯入情况数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
}
|
|
|
- if (vo.getPublishTime() == null) {
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
+ for (JSONObject deviceDataItem : deviceData) {
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
+
|
|
|
+ PersonPresenceVO vo = new PersonPresenceVO();
|
|
|
+ vo.setDataPacketID(generateDataPacketID());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setDataEndTime(dataEndTime);
|
|
|
vo.setPublishTime(getCurrentTime());
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ // 传感器值固定为0(可能是默认值或占位符)
|
|
|
+ vo.setSensorValue(0);
|
|
|
+
|
|
|
+ try {
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.PERSON_PRESENCE, vo, "人员闯入情况");
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的人员闯入情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- String json = JSON.toJSONString(vo);
|
|
|
- String topic = EnvMonitorMqttTopic.PERSON_PRESENCE.getTopic();
|
|
|
- log.info("发送人员闯入情况,Topic: {}, Data: {}", topic, json);
|
|
|
- mqttGateway.sendToMqtt(topic, json);
|
|
|
- return true;
|
|
|
+ log.info("人员闯入情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ return result;
|
|
|
} catch (Exception e) {
|
|
|
- log.error("发送人员闯入情况失败", e);
|
|
|
- return false;
|
|
|
+ log.error("人员闯入情况数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ return result;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送人防用电负荷情况
|
|
|
+ * 发送人防用电负荷情况(704)
|
|
|
*
|
|
|
- * @return 是否发送成功
|
|
|
+ * @return 推送结果,包含成功数和失败数
|
|
|
**/
|
|
|
- public boolean sendElectricityLoad() {
|
|
|
- if (mqttGateway == null) {
|
|
|
- log.warn("MQTT Gateway未初始化,无法发送消息");
|
|
|
- return false;
|
|
|
+ public Map<String, Integer> sendElectricityLoad(IotDataTransferVO transferVO) {
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+ result.put("successCount", 0);
|
|
|
+ result.put("failureCount", 0);
|
|
|
+
|
|
|
+ if (!validateMqttGateway()) {
|
|
|
+ return result;
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
- List<JSONObject> deviceData = deviceDataQuery.getDeviceData(704);
|
|
|
+ List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Integer totalDevices = transferVO.getDevices().size();
|
|
|
|
|
|
+ log.info("开始推送人防用电负荷情况数据,设备类型:{},设备数量:{},获取到的数据条数:{}",
|
|
|
+ deviceType, totalDevices, deviceData.size());
|
|
|
+
|
|
|
+ if (deviceData.isEmpty()) {
|
|
|
+ log.warn("没有获取到人防用电负荷情况数据!设备类型:{}", deviceType);
|
|
|
+ result.put("failureCount", totalDevices);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ Long engineeringId = transferVO.getEngineeringId();
|
|
|
for (JSONObject deviceDataItem : deviceData) {
|
|
|
- Long dataTime = deviceDataItem.getLong("time");
|
|
|
- LocalDateTime dataEndTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
|
|
|
- Integer voltageA = deviceDataItem.getInteger("voltage_a");
|
|
|
- Integer voltageB = deviceDataItem.getInteger("voltage_b");
|
|
|
- Integer voltageC = deviceDataItem.getInteger("voltage_c");
|
|
|
- Integer currentA = deviceDataItem.getInteger("current_a");
|
|
|
- Integer currentB = deviceDataItem.getInteger("current_b");
|
|
|
- Integer currentC = deviceDataItem.getInteger("current_c");
|
|
|
- Integer temperatureA = deviceDataItem.getInteger("temperature_a");
|
|
|
- Integer temperatureB = deviceDataItem.getInteger("temperature_b");
|
|
|
- Integer temperatureC = deviceDataItem.getInteger("temperature_c");
|
|
|
- Integer currentResidual = deviceDataItem.getInteger("current_residual");
|
|
|
+ LocalDateTime dataEndTime = parseDataTime(deviceDataItem);
|
|
|
+ if (dataEndTime == null) {
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
+ Integer deviceId = deviceDataItem.getIntValue("device_id");
|
|
|
ElectricityLoadVO vo = new ElectricityLoadVO();
|
|
|
vo.setDataPacketID(generateDataPacketID());
|
|
|
- vo.setSensorID(new Random().nextLong());
|
|
|
- vo.setEngineeringID(new Random().nextLong());
|
|
|
+ vo.setSensorID(deviceId);
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
vo.setPublishTime(getCurrentTime());
|
|
|
vo.setDataEndTime(dataEndTime);
|
|
|
- vo.setAVoltage(Float.valueOf(voltageA));
|
|
|
- vo.setBVoltage(Float.valueOf(voltageB));
|
|
|
- vo.setCVoltage(Float.valueOf(voltageC));
|
|
|
- vo.setAElectricity(Float.valueOf(currentA));
|
|
|
- vo.setBElectricity(Float.valueOf(currentB));
|
|
|
- vo.setCElectricity(Float.valueOf(currentC));
|
|
|
- vo.setLine1TEMP(Float.valueOf(temperatureA));
|
|
|
- vo.setLine2TEMP(Float.valueOf(temperatureB));
|
|
|
- vo.setLine3TEMP(Float.valueOf(temperatureC));
|
|
|
- vo.setLeakageCurrent(Float.valueOf(currentResidual));
|
|
|
- vo.setTotalPower(new Random().nextFloat() * 1000F);
|
|
|
-
|
|
|
- String json = JSON.toJSONString(vo);
|
|
|
- String topic = EnvMonitorMqttTopic.ELECTRICITY_LOAD.getTopic();
|
|
|
- log.info("发送人防用电负荷情况,Topic: {}, Data: {}", topic, json);
|
|
|
- mqttGateway.sendToMqtt(topic, json);
|
|
|
+ vo.setAVoltage(deviceDataItem.getFloat("aVoltage"));
|
|
|
+ vo.setBVoltage(deviceDataItem.getFloat("bVoltage"));
|
|
|
+ vo.setCVoltage(deviceDataItem.getFloat("cVoltage"));
|
|
|
+ vo.setAElectricity(deviceDataItem.getFloat("aElectricity"));
|
|
|
+ vo.setBElectricity(deviceDataItem.getFloat("bElectricity"));
|
|
|
+ vo.setCElectricity(deviceDataItem.getFloat("cElectricity"));
|
|
|
+ vo.setLine1TEMP(deviceDataItem.getFloat("line1TEMP"));
|
|
|
+ vo.setLine2TEMP(deviceDataItem.getFloat("Line2TEMP"));
|
|
|
+ vo.setLine3TEMP(deviceDataItem.getFloat("Line3TEMP"));
|
|
|
+ vo.setLeakageCurrent(deviceDataItem.getFloat("leakageCurrent"));
|
|
|
+
|
|
|
+ // 根据模拟模式选择不同的功率字段
|
|
|
+ vo.setTotalPower(simulation ?
|
|
|
+ deviceDataItem.getFloat("totalPower") :
|
|
|
+ deviceDataItem.getFloat("active_power"));
|
|
|
+
|
|
|
+ try {
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.ELECTRICITY_LOAD, vo, "人防用电负荷情况");
|
|
|
+ result.put("successCount", result.get("successCount") + 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("设备{}的人防用电负荷情况数据推送失败:{}", deviceId, e.getMessage());
|
|
|
+ result.put("failureCount", result.get("failureCount") + 1);
|
|
|
+ }
|
|
|
}
|
|
|
- return true;
|
|
|
+
|
|
|
+ log.info("人防用电负荷情况数据推送完成,设备类型:{},成功:{},失败:{}",
|
|
|
+ deviceType, result.get("successCount"), result.get("failureCount"));
|
|
|
+
|
|
|
+ return result;
|
|
|
} catch (Exception e) {
|
|
|
- log.error("发送人防用电负荷情况失败", e);
|
|
|
+ log.error("人防用电负荷情况数据推送发生异常", e);
|
|
|
+ result.put("failureCount", transferVO.getDevices().size());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 提取公共发送方法,减少代码冗余
|
|
|
+ private void sendTempData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
|
|
|
+ Float value = deviceDataItem.getFloat("wd");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的温度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TempVO tempVO = new TempVO();
|
|
|
+ tempVO.setDataPacketID(generateDataPacketID());
|
|
|
+ tempVO.setSensorID(deviceId);
|
|
|
+ tempVO.setEngineeringID(engineeringID);
|
|
|
+ tempVO.setPublishTime(getCurrentTime());
|
|
|
+ tempVO.setSensorValue(value);
|
|
|
+ tempVO.setDataEndTime(dataEndTime);
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.TEMP, tempVO, "温度信息");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendHumidityData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
|
|
|
+ Float value = deviceDataItem.getFloat("sd");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的湿度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ HumidityVO humidityVO = new HumidityVO();
|
|
|
+ humidityVO.setDataPacketID(generateDataPacketID());
|
|
|
+ humidityVO.setSensorID(deviceId);
|
|
|
+ humidityVO.setEngineeringID(engineeringID);
|
|
|
+ humidityVO.setPublishTime(getCurrentTime());
|
|
|
+ humidityVO.setSensorValue(value);
|
|
|
+ humidityVO.setDataEndTime(dataEndTime);
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.HUMIDITY, humidityVO, "湿度信息");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendOxygenData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
|
|
|
+ Float value = deviceDataItem.getFloat("o2");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的氧气浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ OxygenVO oxygenVO = new OxygenVO();
|
|
|
+ oxygenVO.setDataPacketID(generateDataPacketID());
|
|
|
+ oxygenVO.setSensorID(deviceId);
|
|
|
+ oxygenVO.setEngineeringID(engineeringID);
|
|
|
+ oxygenVO.setPublishTime(getCurrentTime());
|
|
|
+ oxygenVO.setSensorValue(value);
|
|
|
+ oxygenVO.setDataEndTime(dataEndTime);
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.OXYGEN, oxygenVO, "氧气浓度信息");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendCoData(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
|
|
|
+ Float value = deviceDataItem.getFloat("co");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的一氧化碳浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ CoVO coVO = new CoVO();
|
|
|
+ coVO.setDataPacketID(generateDataPacketID());
|
|
|
+ coVO.setSensorID(deviceId);
|
|
|
+ coVO.setEngineeringID(engineeringID);
|
|
|
+ coVO.setPublishTime(getCurrentTime());
|
|
|
+ coVO.setSensorValue(value);
|
|
|
+ coVO.setDataEndTime(dataEndTime);
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.CO, coVO, "一氧化碳浓度信息");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendCo2Data(Integer deviceId, LocalDateTime dataEndTime, JSONObject deviceDataItem, Long engineeringID) {
|
|
|
+ Float value = deviceDataItem.getFloat("co2");
|
|
|
+ if (value == null) {
|
|
|
+ log.warn("设备{}的二氧化碳浓度数据为空", deviceId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Co2VO co2VO = new Co2VO();
|
|
|
+ co2VO.setDataPacketID(generateDataPacketID());
|
|
|
+ co2VO.setSensorID(deviceId);
|
|
|
+ co2VO.setEngineeringID(engineeringID);
|
|
|
+ co2VO.setPublishTime(getCurrentTime());
|
|
|
+ co2VO.setSensorValue(value);
|
|
|
+ co2VO.setDataEndTime(dataEndTime);
|
|
|
+ sendMqttMessage(EnvMonitorMqttTopic.CO2, co2VO, "二氧化碳浓度信息");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步设备数据
|
|
|
+ * @param tenantId 租户ID
|
|
|
+ * @param engineeringId 工程ID
|
|
|
+ */
|
|
|
+ public void synchronizeDeviceData(Integer tenantId, Long engineeringId) {
|
|
|
+ // 参数校验
|
|
|
+ if (tenantId == null || engineeringId == null) {
|
|
|
+ log.error("租户ID或工程ID不能为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询租户下的所有产品类型
|
|
|
+ List<String> deviceTypeList = getDeviceTypeListByTenant(tenantId);
|
|
|
+ if (deviceTypeList.isEmpty()) {
|
|
|
+ log.warn("租户{}不存在任何产品", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询设备列表
|
|
|
+ List<DmpDevice> deviceList = getDeviceListByType(deviceTypeList, tenantId);
|
|
|
+ if (deviceList.isEmpty()) {
|
|
|
+ log.warn("租户{}不存在任何设备", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 按设备类型分组
|
|
|
+ Map<Integer, List<DmpDevice>> deviceTypeMap = deviceList.stream()
|
|
|
+ .collect(Collectors.groupingBy(DmpDevice::getDeviceType));
|
|
|
+
|
|
|
+ // 构建数据传输对象列表
|
|
|
+ List<IotDataTransferVO> transferList = new ArrayList<>();
|
|
|
+ deviceTypeMap.forEach((deviceType, devices) -> {
|
|
|
+ IotDataTransferVO transferVO = new IotDataTransferVO();
|
|
|
+ transferVO.setDeviceType(deviceType);
|
|
|
+ transferVO.setDevices(devices);
|
|
|
+ transferVO.setEngineeringId(engineeringId);
|
|
|
+ transferList.add(transferVO);
|
|
|
+ });
|
|
|
+
|
|
|
+ // 按产品代码分组,构建ProductCode到uuid列表的映射,过滤掉产品代码为null的设备
|
|
|
+ Map<String, List<String>> codeDeviceUuidsMap = deviceList.stream()
|
|
|
+ .filter(device -> device.getProductCode() != null)
|
|
|
+ .collect(Collectors.groupingBy(DmpDevice::getProductCode,
|
|
|
+ Collectors.mapping(DmpDevice::getDeviceUuid, Collectors.toList())));
|
|
|
+
|
|
|
+ // 任务开始日志
|
|
|
+ Integer totalDevices = deviceList.size();
|
|
|
+ Integer totalProductTypes = codeDeviceUuidsMap.size();
|
|
|
+ log.info("设备数据同步任务开始,租户ID:{},工程ID:{}", tenantId, engineeringId);
|
|
|
+ log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
|
|
|
+ log.info("总共需要推送设备数量:{}个,涉及设备类型数:{}个,设备类型为:{}",
|
|
|
+ totalDevices, deviceTypeMap.size(), deviceTypeMap.keySet());
|
|
|
+
|
|
|
+ // 记录每种设备类型的设备数量
|
|
|
+ deviceTypeMap.forEach((deviceType, devices) -> {
|
|
|
+ log.info("设备类型:{},设备数量:{}", deviceType, devices.size());
|
|
|
+ });
|
|
|
+
|
|
|
+ // 按设备类型处理数据同步
|
|
|
+ int totalSuccessCount = 0;
|
|
|
+ int totalFailureCount = 0;
|
|
|
+
|
|
|
+ for (IotDataTransferVO transferVO : transferList) {
|
|
|
+ Integer deviceType = transferVO.getDeviceType();
|
|
|
+ Map<String, Integer> result = new HashMap<>();
|
|
|
+
|
|
|
+ switch (deviceType) {
|
|
|
+ case 707:
|
|
|
+ case 708:
|
|
|
+ case 709:
|
|
|
+ case 710:
|
|
|
+ case 711:
|
|
|
+ result = sendEnvData(transferVO);
|
|
|
+ break;
|
|
|
+ case 702:
|
|
|
+ result = sendWaterLeak(transferVO);
|
|
|
+ break;
|
|
|
+ case 703:
|
|
|
+ result = sendPersonPresence(transferVO);
|
|
|
+ break;
|
|
|
+ case 704:
|
|
|
+ result = sendElectricityLoad(transferVO);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.debug("不支持的设备类型:{}", deviceType);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 累加成功数和失败数
|
|
|
+ totalSuccessCount += result.get("successCount");
|
|
|
+ totalFailureCount += result.get("failureCount");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 任务完成总结
|
|
|
+ log.info("设备数据同步任务完成,租户ID:{},工程ID:{}", tenantId, engineeringId);
|
|
|
+ log.info("总共涉及产品类型数:{}个,产品代码为:{}", totalProductTypes, codeDeviceUuidsMap.keySet());
|
|
|
+ log.info("总共推送设备数量:{}个,成功:{}个,失败:{}个",
|
|
|
+ totalDevices, totalSuccessCount, totalFailureCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询租户下的设备类型列表
|
|
|
+ * @param tenantId 租户ID
|
|
|
+ * @return 设备类型列表
|
|
|
+ */
|
|
|
+ private List<String> getDeviceTypeListByTenant(Integer tenantId) {
|
|
|
+ LambdaQueryWrapper<DmpProduct> productQueryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ productQueryWrapper.select(DmpProduct::getProductCode)
|
|
|
+ .eq(DmpProduct::getTenantId, tenantId)
|
|
|
+ .eq(DmpProduct::getDeleteFlag, 0);
|
|
|
+ List<DmpProduct> productList = dmpProductMapper.selectList(productQueryWrapper);
|
|
|
+ return productList.stream()
|
|
|
+ .map(DmpProduct::getProductCode)
|
|
|
+ .distinct() // 去重
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据设备类型列表查询设备
|
|
|
+ * @param productCodeList 设备类型列表
|
|
|
+ * @return 设备列表
|
|
|
+ */
|
|
|
+ private List<DmpDevice> getDeviceListByType(List<String> productCodeList, Integer tenantId) {
|
|
|
+ LambdaQueryWrapper<DmpDevice> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.select(DmpDevice::getDeviceUuid, DmpDevice::getDeviceType, DmpDevice::getDeviceId, DmpDevice::getProductCode)
|
|
|
+ .in(DmpDevice::getProductCode, productCodeList)
|
|
|
+ .eq(DmpDevice::getDeleteFlag, 0)
|
|
|
+ .notIn(DmpDevice::getServiceStatus, 3)
|
|
|
+ .orderByAsc(DmpDevice::getProductCode);
|
|
|
+ return dmpDeviceMapper.selectList(queryWrapper);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 验证MQTT网关是否初始化
|
|
|
+ * @return 是否初始化
|
|
|
+ */
|
|
|
+ private boolean validateMqttGateway() {
|
|
|
+ if (mqttGateway == null) {
|
|
|
+ log.warn("MQTT Gateway未初始化,无法发送消息");
|
|
|
return false;
|
|
|
}
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 解析数据时间
|
|
|
+ * @param deviceDataItem 设备数据
|
|
|
+ * @return 解析后的时间,如果解析失败返回null
|
|
|
+ */
|
|
|
+ private LocalDateTime parseDataTime(JSONObject deviceDataItem) {
|
|
|
+ Long dataTime = deviceDataItem.getLong("time");
|
|
|
+ if (dataTime == null) {
|
|
|
+ log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime), ZoneId.systemDefault());
|
|
|
+ }
|
|
|
|
|
|
-}
|
|
|
+ /**
|
|
|
+ * 发送MQTT消息
|
|
|
+ * @param topicEnum 主题枚举
|
|
|
+ * @param vo 消息对象
|
|
|
+ * @param messageType 消息类型描述
|
|
|
+ */
|
|
|
+ private void sendMqttMessage(EnvMonitorMqttTopic topicEnum, Object vo, String messageType) {
|
|
|
+ String json = JSON.toJSONString(vo);
|
|
|
+ String topic = topicEnum.getTopic();
|
|
|
+ // 不再记录每条数据的详情,只记录发送操作
|
|
|
+ mqttGateway.sendToMqtt(topic, json);
|
|
|
+ }
|
|
|
+}
|