Przeglądaj źródła

订阅消息时增加定时约束条件的判断

james 6 dni temu
rodzic
commit
bfe41a0f09

+ 7 - 3
service-rule/service-rule-biz/src/main/java/com/usky/rule/jobs/ConsumptionJob.java

@@ -139,11 +139,15 @@ public class ConsumptionJob implements Job {
 
                 if (spaceTriggerAction) {
                     log.info("consumptionJob spaceTriggerAction true");
+                    boolean cronOk = this.triggerDeviceUtil.meetCronConstraintAction(
+                            this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints()),
+                            ruleEngineDetailLog,
+                            now);
                     List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
-                    boolean constraintAction = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
-                    if (constraintAction) {
+                    boolean deviceOk = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
+                    if (cronOk && deviceOk) {
 
-                        log.info("ConsumptionJob constraintAction engineId: {}, actions: {}", engineId, actions);
+                        log.info("ConsumptionJob constraints satisfied engineId: {}, actions: {}", engineId, actions);
                         this.ruleEngineUtil.performMultipleDevicesControl(engineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), spaceId, actions, ruleEngineDetailLog);
                     }
                 }

+ 5 - 6
service-rule/service-rule-biz/src/main/java/com/usky/rule/listeners/CommonListener.java

@@ -11,10 +11,8 @@ import com.usky.rule.vo.log.RuleEngineDetailLog;
 import com.usky.rule.subscribe.TriggerDeviceUtil;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
-
-import com.usky.rule.vo.log.CronTriggerLog;
-import com.usky.rule.vo.log.RuleEngineDetailLog;
 import org.quartz.CronTrigger;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -47,8 +45,9 @@ public class CommonListener implements JobListener {
         ruleEngineDetailLog.setConstraints(constraintLogs);
         LOGGER.info("Job {} is about to be executed", context.getJobDetail().getKey());
         if (this.cronConstraintList != null) {
-            for(CronConstraint cronConstraint : this.cronConstraintList) {
-                if (CronUtil.isCronMatched(cronConstraint.getCron())) {
+            Date evalAt = DateTimeUtil.localDateTimeToDate(now);
+            for (CronConstraint cronConstraint : this.cronConstraintList) {
+                if (!CronUtil.isCronSatisfiedBy(cronConstraint.getCron(), evalAt)) {
                     throw new RuntimeException(" cron condition not met, aborting job execution.");
                 }
 
@@ -62,7 +61,7 @@ public class CommonListener implements JobListener {
             }
         }
 
-        if (this.deviceConstraintList != null && this.triggerDeviceUtil.meetConstraintAction(this.deviceConstraintList, ruleEngineDetailLog)) {
+        if (this.deviceConstraintList != null && !this.triggerDeviceUtil.meetConstraintAction(this.deviceConstraintList, ruleEngineDetailLog)) {
             throw new RuntimeException("device constraint condition not met, aborting job execution.");
         } else {
             context.put("detail", ruleEngineDetailLog);

+ 2 - 2
service-rule/service-rule-biz/src/main/java/com/usky/rule/service/impl/RuleEngineServiceImpl.java

@@ -582,8 +582,8 @@ public class RuleEngineServiceImpl extends AbstractCrudService<RuleEngineMapper,
             List<CronConstraint> cronConstraintList = new ArrayList();
             Class<?> clazz = (Class)constraintMap.get(ConstraintTypeEnum.CRON.getType());
             commonVOList.forEach((commonVO) -> {
-                if (TriggerTypeEnum.CRON.getType().equals(commonVO.getType())) {
-                    cronConstraintList.add((CronConstraint)JsonUtil.toObject(commonVO.getDetail(), clazz));
+                if (ConstraintTypeEnum.CRON.getType().equals(commonVO.getType())) {
+                    cronConstraintList.add((CronConstraint)JsonUtil.toObject(JsonUtil.IGNORE_UNKNOWN_PROPERTIES_JSON_MAPPER, commonVO.getDetail(), clazz));
                 }
 
             });

+ 50 - 6
service-rule/service-rule-biz/src/main/java/com/usky/rule/subscribe/TriggerDeviceUtil.java

@@ -20,6 +20,8 @@ import com.usky.rule.vo.Condition;
 import com.usky.rule.vo.Expression;
 import com.usky.rule.vo.RuleEngineDetail;
 import com.usky.rule.vo.action.RuleEngineAction;
+import com.usky.rule.util.CronUtil;
+import com.usky.rule.vo.constraint.CronConstraint;
 import com.usky.rule.vo.constraint.DeviceConstraint;
 import com.usky.rule.vo.log.BaseLog;
 import com.usky.rule.vo.log.DeviceTriggerLog;
@@ -40,7 +42,7 @@ import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import com.usky.rule.vo.log.BaseLog;
+import com.usky.rule.vo.log.CronTriggerLog;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -102,12 +104,16 @@ public class TriggerDeviceUtil {
                                     if (triggerAction) {
                                         log.info("triggerAction is true");
                                         this.setTriggerLog(now, ruleEngineDetailLog, deviceId, null, TriggerValueTypeEnum.ACQ.getValue(), TriggerTypeEnum.DEVICE, meetTriggerConditionList, valueMap);
+                                        List<CronConstraint> cronConstraints = this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints());
                                         List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
-                                        boolean constraintAction = this.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
-                                        if (constraintAction) {
-                                            log.info("constraintAction is true");
+                                        boolean cronOk = this.meetCronConstraintAction(cronConstraints, ruleEngineDetailLog, currDataTime);
+                                        boolean deviceOk = this.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
+                                        if (cronOk && deviceOk) {
+                                            log.info("ruleEngineId={} constraints satisfied, executing actions", ruleEngineId);
                                             this.ruleEngineUtil.performMultipleDevicesControl(ruleEngineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), ruleEngine.getSpaceId(), actions, ruleEngineDetailLog);
                                             this.clearMeetConditionCache(ruleEngineId, deviceId, meetMinuteExpressionMap);
+                                        } else if (!cronOk) {
+                                            log.debug("ruleEngineId={} skipped: cron constraint not satisfied at {}", ruleEngineId, currDataTime);
                                         }
                                     }
 
@@ -287,6 +293,41 @@ public class TriggerDeviceUtil {
         };
     }
 
+    /**
+     * 时间类约束(cron):在设备上报触发路径下与触发条件 AND。
+     * 使用消息时间戳 {@code effectiveTime}(缺省为当前时间)做 {@link CronExpression#isSatisfiedBy(Date)} 判断。
+     */
+    public boolean meetCronConstraintAction(List<CronConstraint> cronConstraints, RuleEngineDetailLog ruleEngineDetailLog, LocalDateTime effectiveTime) {
+        if (cronConstraints == null || cronConstraints.isEmpty()) {
+            return true;
+        }
+        LocalDateTime t = effectiveTime != null ? effectiveTime : LocalDateTime.now();
+        Date at = DateTimeUtil.localDateTimeToDate(t);
+        for (CronConstraint cc : cronConstraints) {
+            if (cc == null || StringUtils.isBlank(cc.getCron())) {
+                return false;
+            }
+            if (!CronUtil.isCronSatisfiedBy(cc.getCron().trim(), at)) {
+                return false;
+            }
+        }
+        List<BaseLog> baseLogs = ruleEngineDetailLog.getConstraints();
+        if (baseLogs == null) {
+            baseLogs = new ArrayList();
+            ruleEngineDetailLog.setConstraints(baseLogs);
+        }
+        for (CronConstraint cc : cronConstraints) {
+            CronTriggerLog cronConstraintLog = new CronTriggerLog();
+            cronConstraintLog.setCronExp(cc.getCron());
+            cronConstraintLog.setTime(DateTimeUtil.format(t));
+            BaseLog cronBaseLog = new BaseLog();
+            cronBaseLog.setDetail(cronConstraintLog);
+            cronBaseLog.setType(ConstraintTypeEnum.CRON.getType());
+            baseLogs.add(cronBaseLog);
+        }
+        return true;
+    }
+
     public boolean meetConstraintAction(List<DeviceConstraint> deviceConstraints, RuleEngineDetailLog ruleEngineDetailLog) {
         List<DeviceTriggerLog> deviceTriggerLogs = new ArrayList();
         boolean constraintAction = true;
@@ -358,14 +399,17 @@ public class TriggerDeviceUtil {
         }
 
         if (constraintAction && !deviceTriggerLogs.isEmpty()) {
-            List<BaseLog> baseLogs = new ArrayList();
+            List<BaseLog> existing = ruleEngineDetailLog.getConstraints();
+            final List<BaseLog> baseLogs = existing != null ? existing : new ArrayList();
+            if (existing == null) {
+                ruleEngineDetailLog.setConstraints(baseLogs);
+            }
             deviceTriggerLogs.forEach((deviceConstraintLog) -> {
                 BaseLog baseLog = new BaseLog();
                 baseLog.setType(ConstraintTypeEnum.DEVICE.getType());
                 baseLog.setDetail(deviceConstraintLog);
                 baseLogs.add(baseLog);
             });
-            ruleEngineDetailLog.setConstraints(baseLogs);
         }
 
         return constraintAction;

+ 44 - 0
service-rule/service-rule-biz/src/main/java/com/usky/rule/util/CronUtil.java

@@ -2,13 +2,57 @@ package com.usky.rule.util;
 
 import com.usky.rule.exception.BizException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.stream.Collectors;
 import org.quartz.CronExpression;
 
 public class CronUtil {
     public CronUtil() {
     }
 
+    /**
+     * 将前端 / 配置中的 cron 规范为 Quartz {@link CronExpression} 可解析形式:
+     * <ul>
+     *   <li>5 段(分 时 日 月 周)→ 前补秒字段 {@code 0}</li>
+     *   <li>7 段且末段为全年通配({@code *} 或 {@code ?})→ 去掉年字段,按 6 段解析</li>
+     * </ul>
+     */
+    public static String normalizeCronExpression(String cron) {
+        if (cron == null || cron.trim().isEmpty()) {
+            return "0 0 * * * ?";
+        }
+        String s = cron.trim();
+        String[] parts = s.split("\\s+");
+        if (parts.length == 5) {
+            return "0 " + s;
+        }
+        if (parts.length == 7) {
+            String last = parts[6];
+            if ("*".equals(last) || "?".equals(last)) {
+                return Arrays.stream(parts, 0, 6).collect(Collectors.joining(" "));
+            }
+        }
+        return s;
+    }
+
+    /**
+     * 判断给定时刻是否落在 cron 调度点上(用于规则「时间范围 / 时间表达式」约束)。
+     * 与 {@link #isCronMatched(String)} 不同:后者仅根据是否存在「下一次触发时间」判断,无法表达「当前是否满足」。
+     */
+    public static boolean isCronSatisfiedBy(String cronExpression, Date at) {
+        if (cronExpression == null || cronExpression.trim().isEmpty() || at == null) {
+            return false;
+        }
+        CronExpression cron;
+        try {
+            cron = new CronExpression(normalizeCronExpression(cronExpression.trim()));
+        } catch (ParseException e) {
+            throw new BizException(cronExpression + "不是一个正确的cron表达式");
+        }
+        return cron.isSatisfiedBy(at);
+    }
+
     public static boolean isCronExpired(String cronExpression) {
         CronExpression cron;
         try {

+ 1 - 1
service-rule/service-rule-biz/src/main/java/com/usky/rule/util/RuleEngineUtil.java

@@ -93,7 +93,7 @@ public class RuleEngineUtil {
             ruleEngineLog.setCreatedBy("admin");
             ruleEngineLog.setUpdatedBy("admin");
             ruleEngineLog.setTenantId(1);
-            if(SecurityUtils.getUsername() != null) {
+            if(!SecurityUtils.getUsername().isEmpty()) {
                 ruleEngineLog.setCreatedBy(SecurityUtils.getUsername());
                 ruleEngineLog.setUpdatedBy(SecurityUtils.getUsername());
                 ruleEngineLog.setTenantId(SecurityUtils.getTenantId());