|
|
@@ -4,14 +4,19 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.usky.cdi.domain.BaseAlarm;
|
|
|
import com.usky.cdi.domain.BaseBuildFacility;
|
|
|
+import com.usky.cdi.domain.CdiDefenseProject;
|
|
|
+import com.usky.cdi.domain.DmpDevice;
|
|
|
import com.usky.cdi.mapper.BaseAlarmMapper;
|
|
|
import com.usky.cdi.mapper.BaseBuildFacilityMapper;
|
|
|
+import com.usky.cdi.mapper.CdiDefenseProjectMapper;
|
|
|
+import com.usky.cdi.mapper.DmpDeviceMapper;
|
|
|
import com.usky.cdi.service.CdiDeliveryLogService;
|
|
|
import com.usky.cdi.service.enums.MqttTopics;
|
|
|
import com.usky.cdi.service.mqtt.MqttConnectionTool;
|
|
|
import com.usky.cdi.service.util.SnowflakeIdGenerator;
|
|
|
import com.usky.cdi.service.vo.alarm.AlarmMessageVO;
|
|
|
import com.usky.cdi.service.enums.AlarmType;
|
|
|
+import com.usky.common.core.exception.BusinessException;
|
|
|
import com.usky.common.security.utils.SecurityUtils;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
@@ -25,6 +30,8 @@ import javax.annotation.PostConstruct;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
@@ -47,6 +54,13 @@ public class AlarmDataSyncService {
|
|
|
private static final String PEACETIME = "peacetime";
|
|
|
private static final String WARTIME = "wartime";
|
|
|
private static final String ALARM_DELIVERY_KEY_PREFIX = "alarm:delivery:";
|
|
|
+ private static final String ALARM_TOPIC = "alarm/message";
|
|
|
+ private static final long AUTO_PUSH_INTERVAL_MS = 70_000L;
|
|
|
+ private static final int[] AUTO_ALARM_STATUSES = {1, 2, 3, 4, 5};
|
|
|
+ /** deviceType 即产品编码,如 702=水浸、707=温度 */
|
|
|
+ private static final List<Integer> AUTO_DEVICE_TYPE_ORDER = Arrays.asList(
|
|
|
+ 702, 716, 707, 708, 709, 710, 711, 703, 704, 705, 712, 713, 714
|
|
|
+ );
|
|
|
private final MqttConnectionTool mqttConnectionTool;
|
|
|
private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
|
|
|
|
|
@@ -58,6 +72,10 @@ public class AlarmDataSyncService {
|
|
|
private BaseBuildFacilityMapper baseBuildFacilityMapper;
|
|
|
@Autowired
|
|
|
private CdiDeliveryLogService cdiDeliveryLogService;
|
|
|
+ @Autowired
|
|
|
+ private CdiDefenseProjectMapper cdiDefenseProjectMapper;
|
|
|
+ @Autowired
|
|
|
+ private DmpDeviceMapper dmpDeviceMapper;
|
|
|
|
|
|
private SnowflakeIdGenerator idGenerator;
|
|
|
|
|
|
@@ -566,4 +584,284 @@ public class AlarmDataSyncService {
|
|
|
private boolean isValidLineNo(int lineNo) {
|
|
|
return lineNo >= 1 && lineNo <= 20; // 限制在1-20范围内,符合实际情况
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动自动告警推送任务(异步执行,每种告警按状态 1-5 各推送一次,间隔 70s)
|
|
|
+ */
|
|
|
+ public boolean publishAlarmAuto(String alarmPassword) {
|
|
|
+ try {
|
|
|
+ Integer tenantId = SecurityUtils.getTenantId();
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ doPublishAlarmAuto(tenantId, alarmPassword);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("自动告警推送任务执行失败, tenantId={}", tenantId, e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("启动自动告警推送任务失败", e);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doPublishAlarmAuto(Integer tenantId, String alarmPassword) {
|
|
|
+ log.info("租户{}自动告警推送任务开始", tenantId);
|
|
|
+
|
|
|
+ CdiDefenseProject project = cdiDefenseProjectMapper.selectOne(
|
|
|
+ new LambdaQueryWrapper<CdiDefenseProject>().eq(CdiDefenseProject::getTenantId, tenantId));
|
|
|
+ if (project == null) {
|
|
|
+ throw new BusinessException("未找到人防工程项目配置");
|
|
|
+ }
|
|
|
+ if (!project.getPasswd().equals(alarmPassword)) {
|
|
|
+ throw new BusinessException("密码错误!");
|
|
|
+ }
|
|
|
+ Long engineeringId = project.getEngineeringId();
|
|
|
+ String mqttUserName = project.getMqttUserName();
|
|
|
+ String mqttPassword = project.getMqttPassword();
|
|
|
+
|
|
|
+ List<DmpDevice> allDevices = dmpDeviceMapper.selectList(
|
|
|
+ new LambdaQueryWrapper<DmpDevice>()
|
|
|
+ .eq(DmpDevice::getTenantId, tenantId)
|
|
|
+ .eq(DmpDevice::getDeleteFlag, 0));
|
|
|
+ if (allDevices.isEmpty()) {
|
|
|
+ log.warn("租户{}没有可用设备,自动告警推送任务结束", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<Integer, DmpDevice> deviceByDeviceType = allDevices.stream()
|
|
|
+ .filter(d -> d.getDeviceType() != null)
|
|
|
+ .collect(Collectors.toMap(DmpDevice::getDeviceType, d -> d, (a, b) -> a, LinkedHashMap::new));
|
|
|
+ log.info("设备类型-设备映射: {}", deviceByDeviceType.keySet());
|
|
|
+ deviceByDeviceType.forEach((type, device) -> log.info(" deviceType={}, deviceId={}", type, device.getDeviceId()));
|
|
|
+
|
|
|
+ if (deviceByDeviceType.isEmpty()) {
|
|
|
+ log.info("租户{}没有可用设备,自动告警推送任务结束", tenantId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> deviceIds = deviceByDeviceType.values().stream()
|
|
|
+ .map(DmpDevice::getDeviceId)
|
|
|
+ .filter(id -> id != null && !id.trim().isEmpty())
|
|
|
+ .distinct()
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ Map<String, BaseBuildFacility> facilityMap = buildFacilityMapByDeviceIds(deviceIds, tenantId);
|
|
|
+ log.info("设备-空间映射: {}", facilityMap.keySet());
|
|
|
+
|
|
|
+ List<AlarmMessageVO<Integer>> alarmTemplates = buildAutoAlarmTemplates(deviceByDeviceType, facilityMap, engineeringId);
|
|
|
+ if (alarmTemplates.isEmpty()) {
|
|
|
+ log.info("租户{}当前无支持告警推送的设备类型(deviceType: {}),任务结束",
|
|
|
+ tenantId, deviceByDeviceType.keySet());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("租户{}匹配到{}种告警类型,开始按70s间隔推送",
|
|
|
+ tenantId, alarmTemplates.size());
|
|
|
+ MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(mqttUserName, mqttPassword);
|
|
|
+
|
|
|
+ int totalPushes = alarmTemplates.size() * AUTO_ALARM_STATUSES.length;
|
|
|
+ int pushCount = 0;
|
|
|
+ for (AlarmMessageVO<Integer> template : alarmTemplates) {
|
|
|
+ for (int alarmStatus : AUTO_ALARM_STATUSES) {
|
|
|
+ pushCount++;
|
|
|
+ Date publishDate = new Date();
|
|
|
+ template.setDataPacketID(generateDataPacketID());
|
|
|
+ template.setAlarmStatus(alarmStatus);
|
|
|
+ template.setPublishTime(timeFormat.format(publishDate));
|
|
|
+ template.setAlarmUpdateTime(calcAutoAlarmUpdateTime(publishDate));
|
|
|
+
|
|
|
+ String json = JSON.toJSONString(template);
|
|
|
+ gateway.sendToMqtt(ALARM_TOPIC, json);
|
|
|
+ log.info("推送数据:{}", json);
|
|
|
+ log.info("自动告警推送成功({}/{}): alarmType={}, alarmStatus={}, sensorID={}, 数据={}",
|
|
|
+ pushCount, totalPushes, template.getAlarmType(), alarmStatus, template.getSensorID(), json);
|
|
|
+
|
|
|
+ if (pushCount < totalPushes) {
|
|
|
+ sleepForAutoPush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("租户{}自动告警推送任务完成,共推送{}条", tenantId, totalPushes);
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<AlarmMessageVO<Integer>> buildAutoAlarmTemplates(Map<Integer, DmpDevice> deviceByDeviceType,
|
|
|
+ Map<String, BaseBuildFacility> facilityMap,
|
|
|
+ Long engineeringId) {
|
|
|
+ List<AlarmMessageVO<Integer>> templates = new ArrayList<>();
|
|
|
+ Set<AlarmType> addedTypes = new HashSet<>();
|
|
|
+ boolean electricalAdded = false;
|
|
|
+ int alarmIdSeq = 1;
|
|
|
+
|
|
|
+ for (Integer deviceType : AUTO_DEVICE_TYPE_ORDER) {
|
|
|
+ DmpDevice device = deviceByDeviceType.get(deviceType);
|
|
|
+ if (device == null) {
|
|
|
+ continue; // 该租户无此类型设备,跳过
|
|
|
+ }
|
|
|
+
|
|
|
+ if (deviceType == 704 || deviceType == 705) {
|
|
|
+ if (electricalAdded) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ DmpDevice electricalDevice = deviceByDeviceType.getOrDefault(704, device);
|
|
|
+ for (AlarmType alarmType : AlarmType.getAlarmTypesByProductCode("704")) {
|
|
|
+ if (addedTypes.add(alarmType)) {
|
|
|
+ templates.add(buildAutoAlarmTemplate(alarmType, electricalDevice,
|
|
|
+ facilityMap.get(electricalDevice.getDeviceId()), engineeringId, alarmIdSeq++));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ electricalAdded = true;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (AlarmType alarmType : AlarmType.getAlarmTypesByProductCode(String.valueOf(deviceType))) {
|
|
|
+ if (addedTypes.add(alarmType)) {
|
|
|
+ templates.add(buildAutoAlarmTemplate(alarmType, device,
|
|
|
+ facilityMap.get(device.getDeviceId()), engineeringId, alarmIdSeq++));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return templates;
|
|
|
+ }
|
|
|
+
|
|
|
+ private AlarmMessageVO<Integer> buildAutoAlarmTemplate(AlarmType alarmType, DmpDevice device,
|
|
|
+ BaseBuildFacility facility, Long engineeringId,
|
|
|
+ int alarmId) {
|
|
|
+ AlarmMessageVO<Integer> vo = new AlarmMessageVO<>();
|
|
|
+ vo.setEngineeringID(engineeringId);
|
|
|
+ vo.setAlarmID(alarmId);
|
|
|
+ vo.setAlarmSource(1);
|
|
|
+ vo.setSensorID(parseSensorId(device.getDeviceId()));
|
|
|
+ vo.setAlarmType(alarmType.getMappedCode());
|
|
|
+ vo.setAlarmDesc(alarmType.getDescription());
|
|
|
+
|
|
|
+ if (facility != null) {
|
|
|
+ vo.setMonitorObjNo(facility.getFacilityNum());
|
|
|
+ if (alarmType == AlarmType.TILT || alarmType == AlarmType.CRACK || alarmType == AlarmType.DISPLACEMENT) {
|
|
|
+ vo.setUnitName(facility.getFacilityDesc() != null ? facility.getFacilityDesc()
|
|
|
+ : alarmType.getDefaultUnitName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ populateAutoSensorAndThreshold(vo, alarmType);
|
|
|
+ return vo;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void populateAutoSensorAndThreshold(AlarmMessageVO<Integer> vo, AlarmType alarmType) {
|
|
|
+ switch (alarmType) {
|
|
|
+ case WATER:
|
|
|
+ case PERSON_INTRUSION:
|
|
|
+ case TILT:
|
|
|
+ case CRACK:
|
|
|
+ case DISPLACEMENT:
|
|
|
+ vo.setSensorValue("1");
|
|
|
+ break;
|
|
|
+ case SEWAGE_LEVEL:
|
|
|
+ vo.setThresholding(1.15f);
|
|
|
+ vo.setSensorValue(formatSensorValue(1.15f + randomOffset(0.01f, 0.05f)));
|
|
|
+ break;
|
|
|
+ case AIR_TEMP:
|
|
|
+ vo.setThresholding(30f);
|
|
|
+ vo.setSensorValue(formatSensorValue(30f + randomOffset(0.5f, 2f)));
|
|
|
+ break;
|
|
|
+ case AIR_HUMIDITY:
|
|
|
+ vo.setThresholding(80f);
|
|
|
+ vo.setSensorValue(formatSensorValue(80f + randomOffset(2f, 5f)));
|
|
|
+ break;
|
|
|
+ case OXYGEN_LOW:
|
|
|
+ vo.setThresholding(19.5f);
|
|
|
+ vo.setSensorValue(formatSensorValue(19.5f - randomOffset(0.1f, 0.5f)));
|
|
|
+ break;
|
|
|
+ case CO2_HIGH:
|
|
|
+ // 上海市人防地下室:正常 CO2 约 0.03%~0.04%,告警阈值设为 0.05%
|
|
|
+ vo.setThresholding(0.05f);
|
|
|
+ vo.setSensorValue(formatSensorValue(0.05f + randomOffset(0.002f, 0.008f)));
|
|
|
+ break;
|
|
|
+ case CO_HIGH:
|
|
|
+ // 正常 CO 约 0.3~0.4ppm,地下室告警阈值设为 10ppm
|
|
|
+ vo.setThresholding(10.0f);
|
|
|
+ vo.setSensorValue(formatSensorValue(10.0f + randomOffset(0.05f, 0.25f)));
|
|
|
+ break;
|
|
|
+ case RESIDUAL_CURRENT:
|
|
|
+ vo.setThresholding(100f);
|
|
|
+ vo.setSensorValue(formatSensorValue(100f + randomOffset(5f, 15f)));
|
|
|
+ vo.setLineNo(randomLineNo());
|
|
|
+ break;
|
|
|
+ case CABLE_TEMP:
|
|
|
+ vo.setThresholding(60f);
|
|
|
+ vo.setSensorValue(formatSensorValue(60f + randomOffset(0.5f, 3f)));
|
|
|
+ vo.setLineNo(randomLineNo());
|
|
|
+ break;
|
|
|
+ case CURRENT_HIGH:
|
|
|
+ vo.setThresholding(100f);
|
|
|
+ vo.setSensorValue(formatSensorValue(100f + randomOffset(5f, 15f)));
|
|
|
+ vo.setLineNo(randomLineNo());
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ vo.setSensorValue("1");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, BaseBuildFacility> buildFacilityMapByDeviceIds(List<String> deviceIds, Integer tenantId) {
|
|
|
+ Map<String, BaseBuildFacility> facilityMap = new HashMap<>();
|
|
|
+ if (deviceIds.isEmpty()) {
|
|
|
+ return facilityMap;
|
|
|
+ }
|
|
|
+ LambdaQueryWrapper<BaseBuildFacility> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
+ queryWrapper.in(BaseBuildFacility::getDeviceId, deviceIds)
|
|
|
+ .eq(BaseBuildFacility::getTenantId, tenantId)
|
|
|
+ .eq(BaseBuildFacility::getDeleteFlag, 0);
|
|
|
+ List<BaseBuildFacility> facilities = baseBuildFacilityMapper.selectList(queryWrapper);
|
|
|
+ for (BaseBuildFacility facility : facilities) {
|
|
|
+ if (facility.getDeviceId() != null) {
|
|
|
+ facilityMap.put(facility.getDeviceId(), facility);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return facilityMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer parseSensorId(String deviceId) {
|
|
|
+ if (deviceId == null || deviceId.trim().isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ return Integer.parseInt(deviceId.trim());
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.warn("无法解析设备ID为sensorID: {}", deviceId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String calcAutoAlarmUpdateTime(Date publishDate) {
|
|
|
+ Calendar cal = Calendar.getInstance();
|
|
|
+ cal.setTime(publishDate);
|
|
|
+ cal.add(Calendar.MINUTE, -1);
|
|
|
+ cal.add(Calendar.SECOND, -ThreadLocalRandom.current().nextInt(0, 60));
|
|
|
+ return timeFormat.format(cal.getTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ private float randomOffset(float min, float max) {
|
|
|
+ return (float) ThreadLocalRandom.current().nextDouble(min, max);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int randomLineNo() {
|
|
|
+ return ThreadLocalRandom.current().nextInt(1, 4);
|
|
|
+ }
|
|
|
+
|
|
|
+ private String formatSensorValue(float value) {
|
|
|
+ if (value == (int) value) {
|
|
|
+ return String.valueOf((int) value);
|
|
|
+ }
|
|
|
+ return String.format(Locale.US, "%.2f", value);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sleepForAutoPush() {
|
|
|
+ try {
|
|
|
+ Thread.sleep(AUTO_PUSH_INTERVAL_MS);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new BusinessException("自动告警推送被中断");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|