| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package com.usky.rule.jobs;
- import com.usky.common.core.bean.ApiResult;
- import com.usky.common.security.utils.SecurityUtils;
- import com.usky.demo.RemoteTsdbProxyService;
- import com.usky.demo.domain.HistorysInnerRequestVO;
- import com.usky.demo.domain.HistorysInnerResultVO;
- import com.usky.demo.domain.MetricVO;
- import com.usky.rule.domain.RuleEngine;
- import com.usky.rule.enums.TimeTypeEnum;
- import com.usky.rule.enums.TriggerTypeEnum;
- import com.usky.rule.enums.TriggerValueTypeEnum;
- import com.usky.rule.util.JsonUtil;
- import com.usky.rule.vo.DataPointVO;
- import com.usky.rule.vo.Condition;
- import com.usky.rule.vo.Expression;
- import com.usky.rule.vo.RuleEngineDetail;
- import com.usky.rule.vo.TimeRange;
- import com.usky.rule.vo.action.RuleEngineAction;
- import com.usky.rule.vo.constraint.DeviceConstraint;
- import com.usky.rule.vo.log.RuleEngineDetailLog;
- import com.usky.rule.vo.trigger.DeviceTrigger;
- import com.usky.rule.vo.visualization.SimpleVO;
- import com.usky.rule.subscribe.TriggerDeviceUtil;
- import com.usky.rule.util.RuleEngineUtil;
- //import com.usky.rule.DeviceService;
- import com.usky.rule.service.RuleEngineService;
- import java.math.BigDecimal;
- import java.time.DayOfWeek;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.time.temporal.ChronoUnit;
- import java.util.*;
- import javax.annotation.Resource;
- import lombok.extern.slf4j.Slf4j;
- import org.cache2k.Cache;
- import org.cache2k.CacheEntry;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.Job;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobExecutionException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.Assert;
- import org.springframework.util.CollectionUtils;
- @DisallowConcurrentExecution
- @Slf4j
- public class ConsumptionJob implements Job {
- private static final DateTimeFormatter CONSUMPTION_TIME_DISPLAY = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- @Resource
- private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;
- @Resource
- private TriggerDeviceUtil triggerDeviceUtil;
- @Resource
- private RuleEngineUtil ruleEngineUtil;
- @Autowired
- private RemoteTsdbProxyService remoteTsdbProxyService;
- @Resource
- private RuleEngineService ruleEngineService;
- public ConsumptionJob() {
- }
- public void execute(JobExecutionContext context) throws JobExecutionException {
- String[] times = new String[2];
- LocalDateTime now = LocalDateTime.now();
- for(CacheEntry<Long, List<DeviceTrigger>> entry : this.consumptionTriggerCache.entries()) {
- log.info("consumptionTriggerCache: {}, now: {}", entry.getKey(), now);
- Long engineId = (Long)entry.getKey();
- RuleEngine ruleEngine = (RuleEngine)this.ruleEngineService.getById(engineId);
- log.info("ruleEngine: {}", ruleEngine);
- if (ruleEngine == null || ruleEngine.getStatus() == null || ruleEngine.getStatus() != 1) {
- continue;
- }
- Long spaceId = ruleEngine.getSpaceId();
- RuleEngineDetail ruleEngineDetail = (RuleEngineDetail) JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
- List<RuleEngineAction> actions = this.ruleEngineService.getActions(ruleEngineDetail.getActions());
- if (!actions.isEmpty()) {
- RuleEngineDetailLog ruleEngineDetailLog = new RuleEngineDetailLog();
- boolean spaceTriggerAction = false;
- label77:
- for(DeviceTrigger trigger : entry.getValue()) {
- if (!CollectionUtils.isEmpty(trigger.getDevices())) {
- label75:
- for(SimpleVO device : trigger.getDevices()) {
- List<Condition> meetTriggerConditionList = new ArrayList();
- List<Condition> triggerConditions = trigger.getConditions();
- StringBuilder boolConstraintExp = new StringBuilder();
- Map<String, String> valueMap = new HashMap();
- boolConstraintExp.append("(");
- for(int i = 0; i < triggerConditions.size(); ++i) {
- Condition triggerCondition = (Condition)triggerConditions.get(i);
- String identifier = triggerCondition.getIdentifier();
- Expression expression = triggerCondition.getExpression();
- TimeRange timeRange = triggerCondition.getTimeRange();
- String valueCondition = triggerCondition.getCondition();
- String operator = triggerCondition.getOperator();
- if (i != 0 && preAssertFalse(boolConstraintExp, operator)) {
- continue label75;
- }
- initStartTimeAndEndTime(now, times, timeRange);
- HistorysInnerRequestVO requestVO = new HistorysInnerRequestVO();
- requestVO.setDeviceuuid(Collections.singletonList(device.getDeviceUuid()));
- requestVO.setMetrics(Collections.singletonList(identifier.toLowerCase()));
- requestVO.setStartTime(times[0]);
- requestVO.setEndTime(times[1]);
- ApiResult<List<HistorysInnerResultVO>> historyApi = remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
- List<HistorysInnerResultVO> result = historyApi != null && historyApi.getData() != null
- ? historyApi.getData()
- : Collections.emptyList();
- List<DataPointVO> dataPointVOList = findDataPointsForMetric(result, device.getDeviceUuid(), identifier.toLowerCase());
- if (dataPointVOList.isEmpty()) {
- boolConstraintExp.append(false);
- } else {
- DataPointVO first = dataPointVOList.get(0);
- DataPointVO last = dataPointVOList.get(dataPointVOList.size() - 1);
- if (first.getValue() == null || last.getValue() == null) {
- boolConstraintExp.append(false);
- } else {
- BigDecimal currValue = last.getValue().subtract(first.getValue());
- boolean meetCondition = TriggerDeviceUtil.isMeetConsumptionCondition(currValue, valueCondition, expression.getX());
- if (meetCondition) {
- meetTriggerConditionList.add(triggerCondition);
- valueMap.put(identifier, currValue.toString());
- }
- boolConstraintExp.append(meetCondition);
- }
- }
- }
- spaceTriggerAction = TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + ")");
- if (spaceTriggerAction) {
- this.triggerDeviceUtil.setTriggerLog(now, ruleEngineDetailLog, device.getId(), device.getName(), TriggerValueTypeEnum.CONSUMPTION.getValue(), TriggerTypeEnum.DEVICE, meetTriggerConditionList, valueMap);
- break label77;
- }
- }
- }
- }
- if (spaceTriggerAction) {
- log.info("consumptionJob spaceTriggerAction true");
- boolean cronOk = this.triggerDeviceUtil.meetCronConstraintAction(
- this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints()),
- ruleEngineDetailLog,
- now);
- List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
- boolean deviceOk = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
- if (cronOk && deviceOk) {
- log.info("ConsumptionJob constraints satisfied engineId: {}, actions: {}", engineId, actions);
- this.ruleEngineUtil.performMultipleDevicesControl(engineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), spaceId, actions, ruleEngineDetailLog);
- }
- }
- }
- }
- }
- public static void initStartTimeAndEndTime(LocalDateTime now, String[] times, TimeRange timeRange) {
- Integer start = timeRange.getStart();
- Integer end = timeRange.getEnd();
- Assert.notNull(start, "start不能为空");
- Assert.notNull(end, "end不能为空");
- Assert.isTrue(end > start, "结束时间必须大于起始时间");
- LocalDateTime startTime;
- LocalDateTime endTime;
- TimeTypeEnum typeEnum = TimeTypeEnum.get(timeRange.getType());
- Assert.notNull(typeEnum, "不支持的时间类型");
- // 老版本传统 switch 格式
- switch (typeEnum) {
- case HOUR:
- // 支持跨天小时 22~26
- LocalDateTime todayZeroHour = now.truncatedTo(ChronoUnit.DAYS);
- startTime = todayZeroHour.withHour(start);
- if (end >= 24) {
- endTime = todayZeroHour.plusDays(1).withHour(end - 24);
- } else {
- endTime = todayZeroHour.withHour(end);
- }
- if (now.isBefore(startTime)) {
- startTime = startTime.minusDays(1);
- endTime = endTime.minusDays(1);
- }
- break;
- case DAY:
- // 支持按当月实际天数跨月:start=5, end=33
- LocalDateTime todayZeroDay = now.truncatedTo(ChronoUnit.DAYS);
- startTime = todayZeroDay.withDayOfMonth(start);
- // 结束时间 = 开始时间 + (end-1)天
- endTime = startTime.plusDays(end - 1);
- if (now.isBefore(startTime)) {
- startTime = startTime.minusMonths(1);
- endTime = endTime.minusMonths(1);
- }
- break;
- case WEEK:
- LocalDateTime todayZeroWeek = now.truncatedTo(ChronoUnit.DAYS);
- // 开始时间:本周 星期start(1=周一)
- DayOfWeek todayWeek = todayZeroWeek.getDayOfWeek();
- int weekVal = todayWeek.getValue();
- startTime = todayZeroWeek.plusDays((long) start - weekVal);
- // 结束时间:从开始时间 直接往后加 (end - start) 天
- // 支持 end > 7,自动跨周、跨N周
- endTime = startTime.plusDays(end - start);
- // 如果还没到当前周期 → 取上一周
- if (now.isBefore(startTime)) {
- startTime = startTime.minusWeeks(1);
- endTime = endTime.minusWeeks(1);
- }
- break;
- default:
- throw new IllegalArgumentException("不支持的时间类型: " + typeEnum);
- }
- times[0] = startTime.format(CONSUMPTION_TIME_DISPLAY);
- times[1] = endTime.format(CONSUMPTION_TIME_DISPLAY);
- log.debug("initStartTimeAndEndTime type={} startTime={} endTime={}", typeEnum, times[0], times[1]);
- }
- public static boolean preAssertFalse(StringBuilder boolConstraintExp, String operator) {
- if (TriggerDeviceUtil.checkOperator(operator)) {
- throw new RuntimeException("operator的值必须是 && or ||");
- } else {
- boolConstraintExp.append(" ").append(operator).append(" ");
- return !TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + "true)");
- }
- }
- private static List<DataPointVO> findDataPointsForMetric(List<HistorysInnerResultVO> historyList, String deviceUuid, String metricIdentifier) {
- if (CollectionUtils.isEmpty(historyList)) {
- return Collections.emptyList();
- }
- HistorysInnerResultVO row = null;
- for (HistorysInnerResultVO vo : historyList) {
- if (vo != null && deviceUuid != null && deviceUuid.equals(vo.getDeviceuuid())) {
- row = vo;
- break;
- }
- }
- if (row == null) {
- row = historyList.get(0);
- }
- List<MetricVO> metrics = row.getMetrics();
- if (CollectionUtils.isEmpty(metrics)) {
- return Collections.emptyList();
- }
- for (MetricVO m : metrics) {
- if (m != null && metricIdentifier != null && metricIdentifier.equals(m.getMetric())) {
- return metricItemsToDataPoints(m.getMetricItems());
- }
- }
- return Collections.emptyList();
- }
- private static List<DataPointVO> metricItemsToDataPoints(List<Map<String, Object>> metricItems) {
- if (CollectionUtils.isEmpty(metricItems)) {
- return Collections.emptyList();
- }
- List<DataPointVO> out = new ArrayList<>(metricItems.size());
- for (Map<String, Object> item : metricItems) {
- if (item == null) {
- continue;
- }
- Object ts = item.get("timestamp");
- Object val = item.get("value");
- if (ts == null || val == null) {
- continue;
- }
- BigDecimal num = toBigDecimal(val);
- if (num == null) {
- continue;
- }
- out.add(new DataPointVO(ts.toString(), num));
- }
- return out;
- }
- private static BigDecimal toBigDecimal(Object val) {
- if (val instanceof BigDecimal) {
- return (BigDecimal) val;
- }
- if (val instanceof Number) {
- return BigDecimal.valueOf(((Number) val).doubleValue());
- }
- try {
- return new BigDecimal(val.toString().trim());
- } catch (Exception e) {
- return null;
- }
- }
- }
|