Преглед на файлове

添加任务分布式锁

fuyuchuan преди 8 часа
родител
ревизия
81cb1e25b3
променени са 1 файла, в които са добавени 61 реда и са изтрити 22 реда
  1. 61 22
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataSyncService.java

+ 61 - 22
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/AlarmDataSyncService.java

@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -48,6 +49,8 @@ public class AlarmDataSyncService {
     private static final String WARTIME = "wartime";
     private static final String MQTT_TOPIC = "alarm/message";
     private static final String ALARM_DELIVERY_KEY_PREFIX = "alarm:delivery:";
+    private static final String ALARM_LOCK_KEY_PREFIX = "alarm:lock:";
+    private static final long LOCK_EXPIRE_TIME = 1 * 60 * 1000; // 锁过期时间1分钟
     private final MqttConnectionTool mqttConnectionTool;
 
     @Autowired
@@ -80,36 +83,69 @@ public class AlarmDataSyncService {
         return idGenerator.nextPacketId();
     }
 
+    /**
+     * 尝试获取分布式锁
+     * 使用Redis的setIfAbsent原子操作,确保线程安全
+     *
+     * @param lockKey 锁的Key
+     * @return 是否获取到锁
+     */
+    private boolean tryLock(String lockKey) {
+        // 使用setIfAbsent原子操作设置锁,同时设置过期时间,避免死锁
+        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);
+        return Boolean.TRUE.equals(result);
+    }
+
+    /**
+     * 释放分布式锁
+     *
+     * @param lockKey 锁的Key
+     */
+    private void releaseLock(String lockKey) {
+        // 直接删除锁Key
+        stringRedisTemplate.delete(lockKey);
+    }
+
     @Async
     public void synchronizeAlarmData(Integer tenantId, Long engineeringId, String username, String password, String status) {
-        log.info("租户:{}的人防告警数据推送定时任务开始执行,平战时状态:{}", tenantId, PEACETIME.equals(status) ? "平时" : "战时");
-
+        // 生成锁key,基于租户ID,确保同一租户同一时间只能执行一个任务
+        String lockKey = ALARM_LOCK_KEY_PREFIX + tenantId;
+        
+        // 尝试获取锁
+        if (!tryLock(lockKey)) {
+            log.info("租户:{}的告警数据推送任务正在执行中,本次请求跳过", tenantId);
+            return;
+        }
+        
         Long startTime = System.currentTimeMillis();
-        log.info("开始时间:{}", getCurrentTime());
+        
+        try {
+            log.info("租户:{}的人防告警数据推送定时任务开始执行,平战时状态:{}", tenantId, PEACETIME.equals(status) ? "平时" : "战时");
 
-        // 1.查询 base_alarm 表中的告警数据,筛选条件为 tenant_id = tenantId
-        LambdaQueryWrapper<BaseAlarm> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(BaseAlarm::getTenantId, tenantId)
-                .eq(BaseAlarm::getAlarmGrade, 1);
-        List<BaseAlarm> alarmList = baseAlarmMapper.selectList(queryWrapper);
+            log.info("开始时间:{}", getCurrentTime());
 
-        if (alarmList.isEmpty()) {
-            log.warn("租户{}没有新的告警数据,任务结束", tenantId);
-            return;
-        }
-        log.info("查询到租户{}的告警数据总数:{}", tenantId, alarmList.size());
+            // 1.查询 base_alarm 表中的告警数据,筛选条件为 tenant_id = tenantId
+            LambdaQueryWrapper<BaseAlarm> queryWrapper = new LambdaQueryWrapper<>();
+            queryWrapper.eq(BaseAlarm::getTenantId, tenantId)
+                    .eq(BaseAlarm::getAlarmGrade, 1);
+            List<BaseAlarm> alarmList = baseAlarmMapper.selectList(queryWrapper);
+
+            if (alarmList.isEmpty()) {
+                log.warn("租户{}没有新的告警数据,任务结束", tenantId);
+                return;
+            }
+            log.info("查询到租户{}的告警数据总数:{}", tenantId, alarmList.size());
 
-        // 批量查询建筑设施数据,构建设备ID到设施的映射
-        Map<String, BaseBuildFacility> facilityMap = buildFacilityMap(alarmList, tenantId);
+            // 批量查询建筑设施数据,构建设备ID到设施的映射
+            Map<String, BaseBuildFacility> facilityMap = buildFacilityMap(alarmList, tenantId);
 
-        // 记录每一条告警数据
-        for (BaseAlarm alarm : alarmList) {
-            log.info("告警数据:ID={}, 设备ID={}, 告警类型={}, 告警时间={}, 处理状态={}, 是否误报={}",
-                    alarm.getId(), alarm.getDeviceId(), alarm.getAlarmType(),
-                    alarm.getAlarmTime(), alarm.getHandleStatus(), alarm.getAlarmFalse());
-        }
+            // 记录每一条告警数据
+            for (BaseAlarm alarm : alarmList) {
+                log.info("告警数据:ID={}, 设备ID={}, 告警类型={}, 告警时间={}, 处理状态={}, 是否误报={}",
+                        alarm.getId(), alarm.getDeviceId(), alarm.getAlarmType(),
+                        alarm.getAlarmTime(), alarm.getHandleStatus(), alarm.getAlarmFalse());
+            }
 
-        try {
             // 2.创建MQTT连接
             mqttConnectionTool.connectOrRefresh(username, password);
 
@@ -194,6 +230,9 @@ public class AlarmDataSyncService {
         } catch (Exception e) {
             log.error("租户{}的告警数据推送定时任务执行失败:{}", tenantId, e.getMessage(), e);
         } finally {
+            // 释放锁
+            releaseLock(lockKey);
+            
             Long endTime = System.currentTimeMillis();
             log.info("结束时间:{}, 耗时:{}ms", getCurrentTime(), endTime - startTime);
         }