package com.usky.rule.subscribe; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.usky.common.core.bean.ApiResult; import com.usky.common.core.util.JsonUtils; import com.usky.common.security.utils.SecurityUtils; import com.usky.demo.RemoteTsdbProxyService; import com.usky.demo.domain.LastInnerQueryVO; import com.usky.demo.domain.LastInnerResultVO; import com.usky.rule.domain.RuleEngineDevice; import com.usky.rule.mapper.RuleEngineDeviceMapper; import com.usky.rule.util.DateTimeUtil; import com.usky.rule.util.JsonUtil; import com.usky.rule.domain.RuleEngine; import com.usky.rule.enums.ConstraintTypeEnum; import com.usky.rule.enums.TriggerTypeEnum; import com.usky.rule.enums.TriggerValueTypeEnum; import com.usky.rule.vo.RuleEngineDeviceVO; import com.usky.rule.vo.Condition; import com.usky.rule.vo.Expression; import com.usky.rule.vo.RuleEngineDetail; import com.usky.rule.vo.action.RuleEngineAction; import com.usky.rule.util.CronUtil; import com.usky.rule.vo.constraint.CronConstraint; import com.usky.rule.vo.constraint.DeviceConstraint; import com.usky.rule.vo.log.BaseLog; import com.usky.rule.vo.log.DeviceTriggerLog; import com.usky.rule.vo.log.RuleEngineDetailLog; import com.usky.rule.vo.trigger.DeviceTrigger; import com.usky.rule.vo.visualization.SimpleVO; import com.usky.rule.cache.DeviceAcqTriggerCooldownCache; import com.usky.rule.cache.DeviceTriggerIncludeMinuteCache; import com.usky.rule.util.RuleEngineCallBack; import com.usky.rule.util.RuleEngineUtil; import com.usky.rule.vo.ConditionExpression; //import com.leo.service.device.DeviceFunctionService; //import com.leo.service.device.DeviceService; import com.usky.rule.service.RuleEngineDeviceService; import com.usky.rule.service.RuleEngineService; import java.math.BigDecimal; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; import com.usky.rule.vo.log.CronTriggerLog; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.stereotype.Component; @Component @Slf4j public class TriggerDeviceUtil { private RuleEngineService ruleEngineService; private RuleEngineUtil ruleEngineUtil; private DeviceTriggerIncludeMinuteCache historyRecordCache; @Autowired private RuleEngineDeviceMapper ruleEngineDeviceMapper; @Autowired private RemoteTsdbProxyService remoteTsdbProxyService; @Autowired private DeviceAcqTriggerCooldownCache deviceAcqTriggerCooldownCache; /** 规则对同一设备成功执行动作后,在多少秒内忽略再次触发(0 表示不启用冷却)。 */ @Value("${rule.engine.device-acq-cooldown-seconds:300}") private long deviceAcqCooldownSeconds; private static final Integer maxNumberOfReminders = 1; private static final ExpressionParser parser = new SpelExpressionParser(); public void processMessage(String message) { LocalDateTime now = LocalDateTime.now(); Map valueMap = new HashMap<>(); Map map_data = JsonUtils.fromJson(message, Map.class); String deviceUuId = map_data.get("deviceUUId").toString(); Object met = JSONObject.toJSONString(map_data.get("metrics")); JSONObject metrics = JSON.parseObject(met.toString()); for(String entry : metrics.keySet()){ valueMap.put(entry.toLowerCase(),metrics.get(entry).toString()); } final LocalDateTime currDataTime = Optional.ofNullable(DateTimeUtil.parseMessageTimestamp(map_data.get("timestamp"))) .orElse(LocalDateTime.now()); List ruleEngineDeviceVOList = ruleEngineDeviceMapper.selectByDeviceUuid(deviceUuId); if (!ruleEngineDeviceVOList.isEmpty()) { log.info("enter ruleEngine deviceUuid {}",deviceUuId); ruleEngineDeviceVOList.forEach((ruleEngineDeviceVO) -> { Long ruleEngineId = ruleEngineDeviceVO.getRuleEngineId(); Long productId = ruleEngineDeviceVO.getProductId(); String deviceId = ruleEngineDeviceVO.getDeviceId(); RuleEngine ruleEngine = this.ruleEngineService.getById(ruleEngineId); if (ruleEngine != null && ruleEngine.getStatus() != 0) { if (!StringUtils.isBlank(ruleEngine.getDetail())) { RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class); List actions = this.ruleEngineService.getActions(ruleEngineDetail.getActions()); if (!actions.isEmpty()) { List deviceTriggers = this.ruleEngineService.getDeviceTriggers(ruleEngineDetail.getTriggers()); if (deviceTriggers != null) { deviceTriggers = (List)deviceTriggers.stream().filter((deviceTrigger) -> deviceTrigger.getMethod().equals("acq")).filter((deviceTrigger) -> deviceTrigger.getProductId().equals(productId)).filter((deviceTrigger) -> deviceTrigger.getDevices().stream().anyMatch((device) -> StringUtils.equals(device.getId(), deviceId) || StringUtils.isNotBlank(deviceUuId) && StringUtils.equals(device.getDeviceUuid(), deviceUuId))).collect(Collectors.toList()); if (!deviceTriggers.isEmpty()) { log.info("enter deviceTriggers {}",deviceTriggers); RuleEngineDetailLog ruleEngineDetailLog = new RuleEngineDetailLog(); List meetTriggerConditionList = new ArrayList(); Map meetMinuteExpressionMap = new HashMap(); boolean triggerAction = this.meetDeviceAcqTriggerAction(ruleEngineId, deviceId, currDataTime, valueMap, deviceTriggers, meetMinuteExpressionMap, meetTriggerConditionList); if (triggerAction) { if (this.deviceAcqCooldownSeconds > 0 && this.deviceAcqTriggerCooldownCache.isInCooldown(ruleEngineId, deviceId)) { log.debug("ruleEngineId={} deviceId={} skipped: within acq cooldown ({}s)", ruleEngineId, deviceId, this.deviceAcqCooldownSeconds); return; } log.info("triggerAction is true"); this.setTriggerLog(now, ruleEngineDetailLog, deviceId, deviceTriggers.get(0).getDevices().get(0).getName(), TriggerValueTypeEnum.ACQ.getValue(), TriggerTypeEnum.DEVICE, meetTriggerConditionList, valueMap); List cronConstraints = this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints()); List deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints()); boolean cronOk = this.meetCronConstraintAction(cronConstraints, ruleEngineDetailLog, currDataTime); boolean deviceOk = this.meetConstraintAction(deviceConstraints, ruleEngineDetailLog); if (cronOk && deviceOk) { log.info("ruleEngineId={} constraints satisfied, executing actions", ruleEngineId); this.ruleEngineUtil.performMultipleDevicesControl(ruleEngineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), ruleEngine.getSpaceId(), actions, ruleEngineDetailLog); this.clearMeetConditionCache(ruleEngineId, deviceId, meetMinuteExpressionMap); this.deviceAcqTriggerCooldownCache.startCooldown(ruleEngineId, deviceId, this.deviceAcqCooldownSeconds); } else if (!cronOk) { log.debug("ruleEngineId={} skipped: cron constraint not satisfied at {}", ruleEngineId, currDataTime); } } } } } } } }); } } private void clearMeetConditionCache(Long ruleEngineId, String deviceId, Map meetMinuteExpressionMap) { if (meetMinuteExpressionMap != null) { meetMinuteExpressionMap.forEach((identifier, exp) -> this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, exp)); } } public void setTriggerLog(LocalDateTime now, RuleEngineDetailLog ruleEngineDetailLog, String deviceId, String deviceName, String method, TriggerTypeEnum triggerTypeEnum, List meetTriggerConditionList, Map valueMap) { DeviceTriggerLog deviceTriggerLog = new DeviceTriggerLog(); deviceTriggerLog.setId(deviceId); deviceTriggerLog.setName(deviceName); deviceTriggerLog.setMethod(method); deviceTriggerLog.setTime(DateTimeUtil.format(now)); deviceTriggerLog.setConditions(meetTriggerConditionList); for(Condition condition : meetTriggerConditionList) { // condition.setName(this.deviceFunctionService.getName(deviceId, condition.getIdentifier())); condition.setValue((String)valueMap.get(condition.getIdentifier())); } BaseLog baseLog = new BaseLog(); baseLog.setType(triggerTypeEnum.getType()); baseLog.setDetail(deviceTriggerLog); List baseLogs = new ArrayList(); baseLogs.add(baseLog); ruleEngineDetailLog.setTriggers(baseLogs); } private boolean meetDeviceAcqTriggerAction(Long ruleEngineId, String deviceId, LocalDateTime currDataTime, Map valueMap, List deviceTriggers, Map meetMinuteExpressionMap, List meetTriggerConditionList) { StringBuilder booleanExpression = new StringBuilder(); for(int j = 0; j < deviceTriggers.size(); ++j) { List conditionList = ((DeviceTrigger)deviceTriggers.get(j)).getConditions(); if (j != 0) { booleanExpression.append("||"); } booleanExpression.append("("); for(int i = 0; i < conditionList.size(); ++i) { Condition cond = (Condition)conditionList.get(i); String condition = cond.getCondition(); String identifier = cond.getIdentifier(); if (valueMap.get(identifier.toLowerCase()) == null) { booleanExpression.append(false); } else { BigDecimal value = new BigDecimal((String)valueMap.get(identifier.toLowerCase())); Expression expression = cond.getExpression(); BigDecimal x = new BigDecimal(expression.getX()); BigDecimal y = null; if (expression.getY() != null) { y = new BigDecimal(expression.getY()); } Integer m = null; if (expression.getM() != null) { m = Integer.parseInt(expression.getM()); } boolean meetCondition = false; String express = null; switch (condition) { case "above_minute": express = ">" + x; if (value.compareTo(x) > 0) { meetCondition = this.isMeetMinuteCondition(ruleEngineId, deviceId, identifier, express, m, currDataTime); } else { this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, express); } break; case "below_minute": express = "<" + x; if (value.compareTo(x) < 0) { meetCondition = this.isMeetMinuteCondition(ruleEngineId, deviceId, identifier, express, m, currDataTime); } else { this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, express); } break; case "equal_minute": express = "=" + x; if (value.compareTo(x) == 0) { meetCondition = this.isMeetMinuteCondition(ruleEngineId, deviceId, identifier, express, m, currDataTime); } else { this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, express); } break; case "not_equal_minute": express = "!=" + x; if (value.compareTo(x) == 0) { meetCondition = this.isMeetMinuteCondition(ruleEngineId, deviceId, identifier, express, m, currDataTime); } else { this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, express); } break; case "between_minute": express = ">=" + x + "&&<=" + y; if (isBetweenValue(value, x, y)) { meetCondition = this.isMeetMinuteCondition(ruleEngineId, deviceId, identifier, express, m, currDataTime); } else { this.historyRecordCache.removeCondition(ruleEngineId, deviceId, identifier, express); } break; case "above": if (value.compareTo(x) > 0) { meetCondition = true; } break; case "below": if (value.compareTo(x) < 0) { meetCondition = true; } break; case "equal": if (value.compareTo(x) == 0) { meetCondition = true; } break; case "not_equal": if (value.compareTo(x) != 0) { meetCondition = true; } break; case "between": if (isBetweenValue(value, x, y)) { meetCondition = true; } } if (meetCondition) { meetTriggerConditionList.add(cond); meetMinuteExpressionMap.put(identifier, express); } String operator = cond.getOperator(); if (i != 0) { if (checkOperator(operator)) { throw new RuntimeException("operator的值必须是 && or ||"); } booleanExpression.append(operator); } booleanExpression.append(meetCondition); } } booleanExpression.append(")"); } return getBooleanExpressionValue(booleanExpression.toString()); } private void setMinuteCache(Long ruleEngineId, String deviceId, LocalDateTime currDataTime, String identifier, Integer m, String express) { ConditionExpression conditionExpression = getConditionExpression(express, m, currDataTime); this.historyRecordCache.setCondition(ruleEngineId, deviceId, identifier, conditionExpression); } public static RuleEngineCallBack, Void> getCallBackFunc(LocalDateTime currDataTime) { return (conditions) -> { if (!conditions.isEmpty()) { conditions.forEach((conditionExpression) -> conditionExpression.setLastMeetConditionTime(currDataTime)); } return null; }; } /** * 时间类约束(cron):在设备上报触发路径下与触发条件 AND。 * 使用消息时间戳 {@code effectiveTime}(缺省为当前时间)做 {@link CronExpression#isSatisfiedBy(Date)} 判断。 */ public boolean meetCronConstraintAction(List cronConstraints, RuleEngineDetailLog ruleEngineDetailLog, LocalDateTime effectiveTime) { if (cronConstraints == null || cronConstraints.isEmpty()) { return true; } LocalDateTime t = effectiveTime != null ? effectiveTime : LocalDateTime.now(); Date at = DateTimeUtil.localDateTimeToDate(t); for (CronConstraint cc : cronConstraints) { if (cc == null || StringUtils.isBlank(cc.getCron())) { return false; } if (!CronUtil.isCronSatisfiedBy(cc.getCron().trim(), at)) { return false; } } List baseLogs = ruleEngineDetailLog.getConstraints(); if (baseLogs == null) { baseLogs = new ArrayList(); ruleEngineDetailLog.setConstraints(baseLogs); } for (CronConstraint cc : cronConstraints) { CronTriggerLog cronConstraintLog = new CronTriggerLog(); cronConstraintLog.setCronExp(cc.getCron()); cronConstraintLog.setTime(DateTimeUtil.format(t)); BaseLog cronBaseLog = new BaseLog(); cronBaseLog.setDetail(cronConstraintLog); cronBaseLog.setType(ConstraintTypeEnum.CRON.getType()); baseLogs.add(cronBaseLog); } return true; } public boolean meetConstraintAction(List deviceConstraints, RuleEngineDetailLog ruleEngineDetailLog) { List deviceTriggerLogs = new ArrayList(); boolean constraintAction = true; if (deviceConstraints != null) { label49: for(DeviceConstraint deviceConstraint : deviceConstraints) { String deviceId = ((SimpleVO)deviceConstraint.getDevices().get(0)).getId(); String deviceUuid = deviceConstraint.getDevices().get(0).getDeviceUuid(); DeviceTriggerLog deviceTriggerLog = new DeviceTriggerLog(); List meetConstraintConditionList = new ArrayList(); deviceTriggerLog.setId(deviceId); deviceTriggerLog.setMethod("acq"); // deviceTriggerLog.setName(this.deviceService.getName(deviceId)); deviceTriggerLog.setConditions(meetConstraintConditionList); deviceTriggerLogs.add(deviceTriggerLog); List identifierConditions = deviceConstraint.getConditions(); StringBuilder boolConstraintExp = new StringBuilder(); for(int i = 0; i < identifierConditions.size(); ++i) { Condition identifierCondition = (Condition)identifierConditions.get(i); String operator = identifierCondition.getOperator(); if (i != 0) { if (checkOperator(operator)) { throw new RuntimeException("operator的值必须是 && or ||"); } boolConstraintExp.append(operator); if (checkIsFalse(boolConstraintExp.toString())) { constraintAction = false; break label49; } } String deviceIdentifier = identifierCondition.getIdentifier().toLowerCase(); //BigDecimal currValue = this.getCurrDeviceIdentifierValue(deviceId, deviceIdentifier); LastInnerQueryVO lastInnerQueryVO = new LastInnerQueryVO(); lastInnerQueryVO.setDeviceuuid(Collections.singletonList(deviceUuid)); lastInnerQueryVO.setMetrics(Collections.singletonList(deviceIdentifier)); ApiResult> lastApi = remoteTsdbProxyService.queryLastDeviceData(lastInnerQueryVO); List currValueList = lastApi != null && lastApi.getData() != null ? lastApi.getData() : Collections.emptyList(); String targetKey = lastInnerQueryVO.getMetrics().get(0); BigDecimal currValue = currValueList.stream() .filter(Objects::nonNull) .map(LastInnerResultVO::getMetrics) .filter(Objects::nonNull) .map(metrics -> metrics.get(targetKey)) .filter(Objects::nonNull) .map(String::valueOf) .map(val -> { try { return new BigDecimal(val); } catch (Exception e) { return null; } }) .filter(Objects::nonNull) .findFirst() .orElse(null); if (currValue == null) { boolConstraintExp.append(false); } else { String condition = identifierCondition.getCondition(); Expression expression = identifierCondition.getExpression(); boolean meetCondition = isMeetCondition(currValue, condition, expression.getX(), expression.getY()); if (meetCondition) { meetConstraintConditionList.add(identifierCondition); } boolConstraintExp.append(meetCondition); } } constraintAction = getBooleanExpressionValue(boolConstraintExp.toString()); if (!constraintAction) { break; } } } if (constraintAction && !deviceTriggerLogs.isEmpty()) { List existing = ruleEngineDetailLog.getConstraints(); final List baseLogs = existing != null ? existing : new ArrayList(); if (existing == null) { ruleEngineDetailLog.setConstraints(baseLogs); } deviceTriggerLogs.forEach((deviceConstraintLog) -> { BaseLog baseLog = new BaseLog(); baseLog.setType(ConstraintTypeEnum.DEVICE.getType()); baseLog.setDetail(deviceConstraintLog); baseLogs.add(baseLog); }); } return constraintAction; } public static boolean isMeetCondition(BigDecimal currValue, String condition, String X, String Y) { BigDecimal x = new BigDecimal(X); BigDecimal y = null; if (Y != null) { y = new BigDecimal(Y); } boolean meetCondition = false; switch (condition) { case "above": if (currValue.compareTo(x) > 0) { meetCondition = true; } break; case "below": if (currValue.compareTo(x) < 0) { meetCondition = true; } break; case "equal": if (currValue.compareTo(x) == 0) { meetCondition = true; } break; case "not_equal": if (currValue.compareTo(x) != 0) { meetCondition = true; } break; case "between": if (isBetweenValue(currValue, x, y)) { meetCondition = true; } } return meetCondition; } public static boolean isMeetConsumptionCondition(BigDecimal currValue, String condition, String X) { BigDecimal x = new BigDecimal(X); boolean meetCondition = false; switch (condition) { case "above": if (currValue.compareTo(x) > 0) { meetCondition = true; } break; case "above_equal": if (currValue.compareTo(x) >= 0) { meetCondition = true; } break; case "below": if (currValue.compareTo(x) < 0) { meetCondition = true; } break; case "below_equal": if (currValue.compareTo(x) <= 0) { meetCondition = true; } } return meetCondition; } public static boolean checkIsFalse(String boolConstraintExp) { return !getBooleanExpressionValue(boolConstraintExp + "true"); } public BigDecimal getCurrDeviceIdentifierValue(String id, String deviceIdentifier) { // String value = this.deviceFunctionService.getCurrDeviceIdentifierValue(id, deviceIdentifier); // return (BigDecimal)Optional.ofNullable(value).map(BigDecimal::new).orElse((Object)null); return null; } private static boolean isBetweenValue(BigDecimal value, BigDecimal x, BigDecimal y) { return value.compareTo(x) >= 0 && value.compareTo(y) <= 0; } public static boolean checkOperator(String operator) { return !operator.matches("(\\|\\||&&)"); } public static boolean getBooleanExpressionValue(String booleanExpression) { return StringUtils.isBlank(booleanExpression) ? false : Boolean.TRUE.equals(parser.parseExpression(booleanExpression).getValue(Boolean.class)); } public Boolean isMeetMinuteCondition(Long ruleEngineId, String deviceId, String identifier, String expression, Integer overMinutes, LocalDateTime currDataTime) { ConditionExpression condition = this.historyRecordCache.getConditions(ruleEngineId, deviceId, identifier, expression); if (condition == null) { this.setMinuteCache(ruleEngineId, deviceId, currDataTime, identifier, overMinutes, expression); return false; } else { LocalDateTime lastMeetConditionTime = condition.getLastMeetConditionTime(); long actualOverMinutes = Duration.between(lastMeetConditionTime, currDataTime).toMinutes(); Integer actionNumbers = (Integer)Optional.ofNullable(condition.getNumberOfReminders()).orElse(0); return actionNumbers < maxNumberOfReminders && actualOverMinutes > (long)overMinutes; } } private static ConditionExpression getConditionExpression(String expression, Integer overMinutes, LocalDateTime currDataTime) { ConditionExpression conditionExpression = new ConditionExpression(); conditionExpression.setExpression(expression); conditionExpression.setOverMinutes(overMinutes); conditionExpression.setNumberOfReminders(0); conditionExpression.setLastMeetConditionTime(currDataTime); return conditionExpression; } public TriggerDeviceUtil(final RuleEngineService ruleEngineService, final RuleEngineUtil ruleEngineUtil, final DeviceTriggerIncludeMinuteCache historyRecordCache) { this.ruleEngineService = ruleEngineService; this.ruleEngineUtil = ruleEngineUtil; this.historyRecordCache = historyRecordCache; } }