package com.usky.rule.jobs;

import com.usky.common.core.bean.ApiResult;
import com.usky.common.security.utils.SecurityUtils;
import com.usky.demo.RemoteTsdbProxyService;
import com.usky.demo.domain.HistorysInnerRequestVO;
import com.usky.demo.domain.HistorysInnerResultVO;
import com.usky.demo.domain.MetricVO;
import com.usky.rule.domain.RuleEngine;
import com.usky.rule.enums.TimeTypeEnum;
import com.usky.rule.enums.TriggerTypeEnum;
import com.usky.rule.enums.TriggerValueTypeEnum;
import com.usky.rule.util.JsonUtil;
import com.usky.rule.vo.DataPointVO;
import com.usky.rule.vo.Condition;
import com.usky.rule.vo.Expression;
import com.usky.rule.vo.RuleEngineDetail;
import com.usky.rule.vo.TimeRange;
import com.usky.rule.vo.action.RuleEngineAction;
import com.usky.rule.vo.constraint.DeviceConstraint;
import com.usky.rule.vo.log.RuleEngineDetailLog;
import com.usky.rule.vo.trigger.DeviceTrigger;
import com.usky.rule.vo.visualization.SimpleVO;
import com.usky.rule.subscribe.TriggerDeviceUtil;
import com.usky.rule.util.RuleEngineUtil;
//import com.usky.rule.DeviceService;
import com.usky.rule.service.RuleEngineService;
import java.math.BigDecimal;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.cache2k.Cache;
import org.cache2k.CacheEntry;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * Quartz job that evaluates "consumption" device triggers.
 *
 * <p>For every entry of {@code consumptionTriggerCache} the job loads the rule engine by id and
 * skips it unless its status equals {@code 1}. For each device of each trigger it then queries the
 * history of the configured metric over a time window. The consumption is the last returned value
 * minus the first returned value. The first device whose combined condition expression evaluates
 * to {@code true} is written to the trigger log; if the cron and device constraints of the rule
 * are also satisfied, the rule's actions are performed.
 *
 * <p>NOTE(review): the generic type arguments in this class were reconstructed from how the values
 * are used (the source had lost them) - confirm them against the collaborating classes.
 */
@DisallowConcurrentExecution
@Slf4j
public class ConsumptionJob implements Job {

    /** Format of the start/end timestamps sent with the history query. */
    private static final DateTimeFormatter CONSUMPTION_TIME_DISPLAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Keys are cast to Long (rule engine id) and values are iterated as DeviceTrigger.
    // NOTE(review): value type assumed to be List<DeviceTrigger> - confirm against the cache bean.
    @Resource
    private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;

    @Resource
    private TriggerDeviceUtil triggerDeviceUtil;

    @Resource
    private RuleEngineUtil ruleEngineUtil;

    @Autowired
    private RemoteTsdbProxyService remoteTsdbProxyService;

    @Resource
    private RuleEngineService ruleEngineService;

    public ConsumptionJob() {
    }

    /**
     * Evaluates every cached consumption trigger once, using a single "now" for the whole run.
     *
     * <p>Each rule engine is evaluated in isolation: a runtime failure in one rule (bad
     * configuration, remote query failure, ...) is logged and the remaining rules are still
     * evaluated.
     *
     * @param context the Quartz execution context (not used)
     * @throws JobExecutionException declared by the {@link Job} contract; never thrown here
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        LocalDateTime now = LocalDateTime.now();
        for (CacheEntry<Long, List<DeviceTrigger>> entry : this.consumptionTriggerCache.entries()) {
            Long engineId = entry.getKey();
            log.info("consumptionTriggerCache: {}, now: {}", engineId, now);
            try {
                evaluateRule(engineId, entry.getValue(), now);
            } catch (RuntimeException e) {
                // Keep going: one broken rule must not prevent the other rules from being checked.
                log.error("ConsumptionJob failed to evaluate engineId: {}", engineId, e);
            }
        }
    }

    /**
     * Evaluates one rule engine: checks that it is active and has actions, looks for the first
     * device that meets its trigger conditions, and performs the actions when the rule's cron and
     * device constraints are satisfied as well.
     */
    private void evaluateRule(Long engineId, List<DeviceTrigger> triggers, LocalDateTime now) {
        RuleEngine ruleEngine = (RuleEngine) this.ruleEngineService.getById(engineId);
        log.info("ruleEngine: {}", ruleEngine);
        // Only rules whose status equals 1 are evaluated.
        if (ruleEngine == null || ruleEngine.getStatus() == null || ruleEngine.getStatus() != 1) {
            return;
        }

        RuleEngineDetail ruleEngineDetail =
                (RuleEngineDetail) JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
        if (ruleEngineDetail == null) {
            log.warn("ConsumptionJob rule detail could not be parsed, engineId: {}", engineId);
            return;
        }

        List<RuleEngineAction> actions = this.ruleEngineService.getActions(ruleEngineDetail.getActions());
        if (CollectionUtils.isEmpty(actions) || CollectionUtils.isEmpty(triggers)) {
            return;
        }

        RuleEngineDetailLog ruleEngineDetailLog = new RuleEngineDetailLog();
        if (!logFirstTriggeredDevice(triggers, now, ruleEngineDetailLog)) {
            return;
        }

        log.info("consumptionJob spaceTriggerAction true");
        // Both constraint checks are always invoked (each receives the detail log), as before.
        boolean cronOk = this.triggerDeviceUtil.meetCronConstraintAction(
                this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints()),
                ruleEngineDetailLog,
                now);
        List<DeviceConstraint> deviceConstraints =
                this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
        boolean deviceOk = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
        if (cronOk && deviceOk) {
            log.info("ConsumptionJob constraints satisfied engineId: {}, actions: {}", engineId, actions);
            this.ruleEngineUtil.performMultipleDevicesControl(
                    engineId,
                    true,
                    TriggerTypeEnum.DEVICE.getType(),
                    ruleEngine.getProjectId(),
                    ruleEngine.getSpaceId(),
                    actions,
                    ruleEngineDetailLog);
        }
    }

    /**
     * Searches the triggers, in order, for the first device whose conditions are met and records
     * it in the trigger log.
     *
     * @return {@code true} if a device met its trigger conditions (and was logged)
     */
    private boolean logFirstTriggeredDevice(
            List<DeviceTrigger> triggers, LocalDateTime now, RuleEngineDetailLog ruleEngineDetailLog) {
        for (DeviceTrigger trigger : triggers) {
            // A trigger without devices or without conditions can never fire.
            if (trigger == null
                    || CollectionUtils.isEmpty(trigger.getDevices())
                    || CollectionUtils.isEmpty(trigger.getConditions())) {
                continue;
            }
            List<Condition> conditions = trigger.getConditions();
            for (SimpleVO device : trigger.getDevices()) {
                List<Condition> metConditions = new ArrayList<>();
                // NOTE(review): value type assumed to be String (values are stored via toString())
                // - confirm against TriggerDeviceUtil.setTriggerLog.
                Map<String, String> valueMap = new HashMap<>();
                if (deviceMeetsConditions(device, conditions, now, metConditions, valueMap)) {
                    this.triggerDeviceUtil.setTriggerLog(
                            now,
                            ruleEngineDetailLog,
                            device.getId(),
                            device.getName(),
                            TriggerValueTypeEnum.CONSUMPTION.getValue(),
                            TriggerTypeEnum.DEVICE,
                            metConditions,
                            valueMap);
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Builds the boolean expression {@code (c0 op1 c1 op2 c2 ...)} for one device and evaluates it.
     *
     * <p>Before each condition after the first, {@link #preAssertFalse} is consulted; when it
     * reports that the expression is already false the device is abandoned without querying the
     * remaining metrics.
     *
     * @param metConditions receives every condition that was individually satisfied
     * @param valueMap receives, per satisfied condition, the identifier and the measured consumption
     * @return the value of the complete expression, or {@code false} when evaluation was cut short
     */
    private boolean deviceMeetsConditions(
            SimpleVO device,
            List<Condition> conditions,
            LocalDateTime now,
            List<Condition> metConditions,
            Map<String, String> valueMap) {
        StringBuilder boolConstraintExp = new StringBuilder("(");
        for (int i = 0; i < conditions.size(); i++) {
            Condition condition = conditions.get(i);
            if (i != 0 && preAssertFalse(boolConstraintExp, condition.getOperator())) {
                return false;
            }

            BigDecimal consumption = queryConsumption(device, condition, now);
            boolean meetCondition = false;
            if (consumption != null) {
                Expression expression = condition.getExpression();
                meetCondition = TriggerDeviceUtil.isMeetConsumptionCondition(
                        consumption, condition.getCondition(), expression.getX());
            }
            if (meetCondition) {
                metConditions.add(condition);
                valueMap.put(condition.getIdentifier(), consumption.toString());
            }
            boolConstraintExp.append(meetCondition);
        }
        return TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + ")");
    }

    /**
     * Queries the history of the condition's metric for one device over the condition's time
     * window and returns the consumption, i.e. last value minus first value.
     *
     * <p>NOTE(review): this relies on the history service returning the points in chronological
     * order - confirm.
     *
     * @return the consumption, or {@code null} when no usable data points were returned
     */
    private BigDecimal queryConsumption(SimpleVO device, Condition condition, LocalDateTime now) {
        // Locale.ROOT keeps the metric name stable regardless of the JVM default locale.
        String metric = condition.getIdentifier().toLowerCase(Locale.ROOT);
        String[] times = new String[2];
        initStartTimeAndEndTime(now, times, condition.getTimeRange());

        HistorysInnerRequestVO requestVO = new HistorysInnerRequestVO();
        requestVO.setDeviceuuid(Collections.singletonList(device.getDeviceUuid()));
        requestVO.setMetrics(Collections.singletonList(metric));
        requestVO.setStartTime(times[0]);
        requestVO.setEndTime(times[1]);

        ApiResult<List<HistorysInnerResultVO>> historyApi =
                this.remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
        List<HistorysInnerResultVO> result = null;
        if (historyApi != null) {
            result = historyApi.getData();
        }

        List<DataPointVO> dataPoints = findDataPointsForMetric(result, device.getDeviceUuid(), metric);
        if (dataPoints.isEmpty()) {
            return null;
        }
        DataPointVO first = dataPoints.get(0);
        DataPointVO last = dataPoints.get(dataPoints.size() - 1);
        if (first.getValue() == null || last.getValue() == null) {
            return null;
        }
        return last.getValue().subtract(first.getValue());
    }

    /**
     * Computes the query window for a time range and writes it, formatted as
     * {@code yyyy-MM-dd HH:mm:ss}, into {@code times[0]} (start) and {@code times[1]} (end).
     *
     * <p>{@code start} and {@code end} are positions on the same scale, so the window always lasts
     * {@code end - start} units:
     * <ul>
     *   <li>HOUR - hours of the day; {@code end} may be 24 or more to run past midnight
     *       (e.g. 22 to 26).</li>
     *   <li>DAY - days of the month; {@code end} may exceed the month length to run into the
     *       following month (e.g. 5 to 33). A start day the month does not have (29-31) falls back
     *       to the month's last day.</li>
     *   <li>WEEK - ISO days of the week (1 = Monday); {@code end} may exceed 7 to run into the
     *       following week(s).</li>
     * </ul>
     * If the window of the current day/month/week has not started yet at {@code now}, the window
     * of the previous day/month/week is used. The end may lie after {@code now}.
     *
     * @param now reference instant
     * @param times output array with at least two slots
     * @param timeRange time range configuration (type, start, end)
     * @throws IllegalArgumentException if start or end is missing, end is not greater than start,
     *     or the type is unsupported
     * @throws java.time.DateTimeException if start is not a valid hour of day / day of month
     */
    public static void initStartTimeAndEndTime(LocalDateTime now, String[] times, TimeRange timeRange) {
        Integer start = timeRange.getStart();
        Integer end = timeRange.getEnd();
        Assert.notNull(start, "start不能为空");
        Assert.notNull(end, "end不能为空");
        Assert.isTrue(end > start, "结束时间必须大于起始时间");
        TimeTypeEnum typeEnum = TimeTypeEnum.get(timeRange.getType());
        Assert.notNull(typeEnum, "不支持的时间类型");

        LocalDateTime today = now.truncatedTo(ChronoUnit.DAYS);
        long length = (long) end - start;
        LocalDateTime startTime;
        LocalDateTime endTime;
        // Classic switch statement, kept for compatibility with older Java versions.
        switch (typeEnum) {
            case HOUR:
                // withHour validates that start is a real hour of the day.
                startTime = today.withHour(start);
                if (now.isBefore(startTime)) {
                    startTime = startTime.minusDays(1);
                }
                endTime = startTime.plusHours(length);
                break;
            case DAY:
                LocalDateTime firstOfMonth = today.withDayOfMonth(1);
                startTime = atDayOfMonth(firstOfMonth, start);
                if (now.isBefore(startTime)) {
                    startTime = atDayOfMonth(firstOfMonth.minusMonths(1), start);
                }
                // Derived from the final start so the window keeps its length across month ends.
                endTime = startTime.plusDays(length);
                break;
            case WEEK:
                DayOfWeek todayWeek = today.getDayOfWeek();
                startTime = today.plusDays((long) start - todayWeek.getValue());
                if (now.isBefore(startTime)) {
                    startTime = startTime.minusWeeks(1);
                }
                endTime = startTime.plusDays(length);
                break;
            default:
                throw new IllegalArgumentException("不支持的时间类型: " + typeEnum);
        }

        times[0] = startTime.format(CONSUMPTION_TIME_DISPLAY);
        times[1] = endTime.format(CONSUMPTION_TIME_DISPLAY);
        log.debug("initStartTimeAndEndTime type={} startTime={} endTime={}", typeEnum, times[0], times[1]);
    }

    /**
     * Returns midnight of the given day within the month of {@code firstOfMonth}. Days 29-31 that
     * the month does not have are replaced by its last day; any other invalid day is rejected by
     * {@link LocalDateTime#withDayOfMonth(int)}.
     */
    private static LocalDateTime atDayOfMonth(LocalDateTime firstOfMonth, int dayOfMonth) {
        int lastDay = firstOfMonth.toLocalDate().lengthOfMonth();
        int day = (dayOfMonth > lastDay && dayOfMonth <= 31) ? lastDay : dayOfMonth;
        return firstOfMonth.withDayOfMonth(day);
    }

    /**
     * Appends {@code " operator "} to the expression built so far and reports whether the
     * expression is already false, by evaluating it with {@code true} substituted for the next
     * operand.
     *
     * <p>NOTE(review): the look-ahead only considers the next operand. For a mixed chain such as
     * {@code false && x || true} it reports "already false" after the first operator even though a
     * later {@code ||} could still make the whole expression true - confirm whether mixed
     * operators are allowed.
     *
     * @param boolConstraintExp expression under construction; modified by this call
     * @param operator operator joining the previous and the next condition
     * @return {@code true} if the expression evaluates to false even when the next operand is true
     * @throws RuntimeException if {@code TriggerDeviceUtil.checkOperator(operator)} returns true
     */
    public static boolean preAssertFalse(StringBuilder boolConstraintExp, String operator) {
        if (TriggerDeviceUtil.checkOperator(operator)) {
            throw new IllegalArgumentException("operator的值必须是 && or ||");
        }
        boolConstraintExp.append(" ").append(operator).append(" ");
        return !TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + "true)");
    }

    /**
     * Extracts the data points of one metric for one device from a history query result.
     *
     * <p>The row whose device uuid equals {@code deviceUuid} is used; if no row matches, the first
     * row is used instead.
     *
     * @return the data points of the metric, or an empty list when nothing usable was found
     */
    private static List<DataPointVO> findDataPointsForMetric(
            List<HistorysInnerResultVO> historyList, String deviceUuid, String metricIdentifier) {
        if (CollectionUtils.isEmpty(historyList)) {
            return Collections.emptyList();
        }

        HistorysInnerResultVO row = null;
        for (HistorysInnerResultVO vo : historyList) {
            if (vo != null && deviceUuid != null && deviceUuid.equals(vo.getDeviceuuid())) {
                row = vo;
                break;
            }
        }
        if (row == null) {
            // NOTE(review): falling back to the first row presumably tolerates responses that do
            // not echo the uuid exactly - confirm this cannot pick up another device's data.
            row = historyList.get(0);
        }
        if (row == null) {
            return Collections.emptyList();
        }

        List<MetricVO> metrics = row.getMetrics();
        if (CollectionUtils.isEmpty(metrics)) {
            return Collections.emptyList();
        }
        for (MetricVO m : metrics) {
            if (m != null && metricIdentifier != null && metricIdentifier.equals(m.getMetric())) {
                return metricItemsToDataPoints(m.getMetricItems());
            }
        }
        return Collections.emptyList();
    }

    /**
     * Converts raw metric items into data points, preserving their order. Items that are null,
     * lack a {@code "timestamp"} or {@code "value"} entry, or whose value is not numeric are
     * skipped.
     */
    private static List<DataPointVO> metricItemsToDataPoints(List<? extends Map<String, ?>> metricItems) {
        if (CollectionUtils.isEmpty(metricItems)) {
            return Collections.emptyList();
        }
        List<DataPointVO> out = new ArrayList<>(metricItems.size());
        for (Map<String, ?> item : metricItems) {
            if (item == null) {
                continue;
            }
            Object ts = item.get("timestamp");
            Object val = item.get("value");
            if (ts == null || val == null) {
                continue;
            }
            BigDecimal num = toBigDecimal(val);
            if (num == null) {
                continue;
            }
            out.add(new DataPointVO(ts.toString(), num));
        }
        return out;
    }

    /**
     * Converts a raw value to {@link BigDecimal} without going through {@code double}.
     *
     * <p>The value's decimal text is parsed, so integral types keep every digit and a
     * {@code float} keeps its shortest decimal form. As a consequence integral inputs yield scale
     * 0 (e.g. {@code 5}, not {@code 5.0}).
     *
     * @return the numeric value, or {@code null} if {@code val} is null or not a finite number
     */
    private static BigDecimal toBigDecimal(Object val) {
        if (val == null) {
            return null;
        }
        if (val instanceof BigDecimal) {
            return (BigDecimal) val;
        }
        try {
            return new BigDecimal(val.toString().trim());
        } catch (NumberFormatException ignored) {
            // Non-numeric text, NaN and Infinity are treated as "no value" and skipped by callers.
            return null;
        }
    }
}