| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- package com.usky.rule.jobs;
- import com.usky.common.security.utils.SecurityUtils;
- import com.usky.demo.RemoteTsdbProxyService;
- import com.usky.demo.domain.HistorysInnerRequestVO;
- import com.usky.demo.domain.HistorysInnerResultVO;
- import com.usky.demo.domain.MetricVO;
- import com.usky.rule.domain.RuleEngine;
- import com.usky.rule.vo.Result;
- import com.usky.rule.util.JsonUtil;
- import com.usky.rule.enums.TimeTypeEnum;
- import com.usky.rule.enums.TriggerTypeEnum;
- import com.usky.rule.enums.TriggerValueTypeEnum;
- import com.usky.rule.vo.DataPointVO;
- import com.usky.rule.vo.Condition;
- import com.usky.rule.vo.Expression;
- import com.usky.rule.vo.RuleEngineDetail;
- import com.usky.rule.vo.TimeRange;
- import com.usky.rule.vo.action.RuleEngineAction;
- import com.usky.rule.vo.constraint.DeviceConstraint;
- import com.usky.rule.vo.log.RuleEngineDetailLog;
- import com.usky.rule.vo.trigger.DeviceTrigger;
- import com.usky.rule.vo.visualization.SimpleVO;
- import com.usky.rule.subscribe.TriggerDeviceUtil;
- import com.usky.rule.util.RuleEngineUtil;
- //import com.usky.rule.DeviceService;
- import com.usky.rule.service.RuleEngineService;
- import java.math.BigDecimal;
- import java.time.LocalDateTime;
- import java.time.temporal.ChronoUnit;
- import java.util.*;
- import javax.annotation.Resource;
- import org.cache2k.Cache;
- import org.cache2k.CacheEntry;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.Job;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobExecutionException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.Assert;
- import org.springframework.util.CollectionUtils;
- @DisallowConcurrentExecution
- public class ConsumptionJob implements Job {
- @Resource
- private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;
- @Resource
- private TriggerDeviceUtil triggerDeviceUtil;
- @Resource
- private RuleEngineUtil ruleEngineUtil;
- @Autowired
- private RemoteTsdbProxyService remoteTsdbProxyService;
- @Resource
- private RuleEngineService ruleEngineService;
- public ConsumptionJob() {
- }
- public void execute(JobExecutionContext context) throws JobExecutionException {
- LocalDateTime[] times = new LocalDateTime[2];
- LocalDateTime now = LocalDateTime.now();
- for(CacheEntry<Long, List<DeviceTrigger>> entry : this.consumptionTriggerCache.entries()) {
- Long engineId = (Long)entry.getKey();
- RuleEngine ruleEngine = (RuleEngine)this.ruleEngineService.getById(engineId);
- Long spaceId = ruleEngine.getSpaceId();
- RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
- List<RuleEngineAction> actions = this.ruleEngineService.getActions(ruleEngineDetail.getActions());
- if (!actions.isEmpty()) {
- RuleEngineDetailLog ruleEngineDetailLog = new RuleEngineDetailLog();
- boolean spaceTriggerAction = false;
- label77:
- for(DeviceTrigger trigger : entry.getValue()) {
- if (!CollectionUtils.isEmpty(trigger.getDevices())) {
- label75:
- for(SimpleVO device : trigger.getDevices()) {
- List<Condition> meetTriggerConditionList = new ArrayList();
- List<Condition> triggerConditions = trigger.getConditions();
- StringBuilder boolConstraintExp = new StringBuilder();
- Map<String, String> valueMap = new HashMap();
- boolConstraintExp.append("(");
- for(int i = 0; i < triggerConditions.size(); ++i) {
- Condition triggerCondition = (Condition)triggerConditions.get(i);
- String identifier = triggerCondition.getIdentifier();
- Expression expression = triggerCondition.getExpression();
- TimeRange timeRange = triggerCondition.getTimeRange();
- String valueCondition = triggerCondition.getCondition();
- String operator = triggerCondition.getOperator();
- if (i != 0 && preAssertFalse(boolConstraintExp, operator)) {
- continue label75;
- }
- initStartTimeAndEndTime(now, times, timeRange);
- HistorysInnerRequestVO requestVO = new HistorysInnerRequestVO();
- requestVO.setDeviceuuid(Collections.singletonList(device.getDeviceUuid()));
- requestVO.setMetrics(Collections.singletonList(identifier));
- requestVO.setStartTime(times[0].toString());
- requestVO.setEndTime(times[1].toString());
- List<HistorysInnerResultVO> result = remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
- List<DataPointVO> dataPointVOList = findDataPointsForMetric(result, device.getDeviceUuid(), identifier);
- if (dataPointVOList.isEmpty()) {
- boolConstraintExp.append(false);
- } else {
- DataPointVO first = dataPointVOList.get(0);
- DataPointVO last = dataPointVOList.get(dataPointVOList.size() - 1);
- if (first.getValue() == null || last.getValue() == null) {
- boolConstraintExp.append(false);
- } else {
- BigDecimal currValue = last.getValue().subtract(first.getValue());
- boolean meetCondition = TriggerDeviceUtil.isMeetConsumptionCondition(currValue, valueCondition, expression.getX());
- if (meetCondition) {
- meetTriggerConditionList.add(triggerCondition);
- valueMap.put(identifier, currValue.toString());
- }
- boolConstraintExp.append(meetCondition);
- }
- }
- }
- spaceTriggerAction = TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + ")");
- if (spaceTriggerAction) {
- this.triggerDeviceUtil.setTriggerLog(now, ruleEngineDetailLog, device.getId(), device.getName(), TriggerValueTypeEnum.CONSUMPTION.getValue(), TriggerTypeEnum.DEVICE, meetTriggerConditionList, valueMap);
- break label77;
- }
- }
- }
- }
- if (spaceTriggerAction) {
- List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
- boolean constraintAction = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
- if (constraintAction) {
- this.ruleEngineUtil.performMultipleDevicesControl(engineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), spaceId, actions, ruleEngineDetailLog);
- }
- }
- }
- }
- }
- public static void initStartTimeAndEndTime(LocalDateTime now, LocalDateTime[] times, TimeRange timeRange) {
- Integer start = timeRange.getStart();
- Integer end = timeRange.getEnd();
- Assert.notNull(start, "start不能为空");
- Assert.notNull(end, "end不能为空");
- Assert.isTrue(end - start > 0, "结束时间必须大于起始时间");
- LocalDateTime startTime = null;
- LocalDateTime endTime = null;
- switch (TimeTypeEnum.get(timeRange.getType())) {
- case HOUR:
- endTime = now.truncatedTo(ChronoUnit.HOURS);
- int hour = endTime.getHour();
- if (hour < start) {
- startTime = endTime.minusDays(1L).withHour(start);
- } else {
- startTime = endTime.withHour(start);
- }
- break;
- case DAY:
- case WEEK:
- endTime = now.truncatedTo(ChronoUnit.DAYS);
- int day = endTime.getDayOfMonth();
- if (day < start) {
- startTime = endTime.minusMonths(1L).withDayOfMonth(start);
- } else {
- startTime = endTime.withDayOfMonth(start);
- }
- }
- times[0] = startTime;
- times[1] = endTime;
- }
- public static boolean preAssertFalse(StringBuilder boolConstraintExp, String operator) {
- if (TriggerDeviceUtil.checkOperator(operator)) {
- throw new RuntimeException("operator的值必须是 && or ||");
- } else {
- boolConstraintExp.append(" ").append(operator).append(" ");
- return !TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + "true)");
- }
- }
- private static List<DataPointVO> findDataPointsForMetric(List<HistorysInnerResultVO> historyList, String deviceUuid, String metricIdentifier) {
- if (CollectionUtils.isEmpty(historyList)) {
- return Collections.emptyList();
- }
- HistorysInnerResultVO row = null;
- for (HistorysInnerResultVO vo : historyList) {
- if (vo != null && deviceUuid != null && deviceUuid.equals(vo.getDeviceuuid())) {
- row = vo;
- break;
- }
- }
- if (row == null) {
- row = historyList.get(0);
- }
- List<MetricVO> metrics = row.getMetrics();
- if (CollectionUtils.isEmpty(metrics)) {
- return Collections.emptyList();
- }
- for (MetricVO m : metrics) {
- if (m != null && metricIdentifier != null && metricIdentifier.equals(m.getMetric())) {
- return metricItemsToDataPoints(m.getMetricItems());
- }
- }
- return Collections.emptyList();
- }
- private static List<DataPointVO> metricItemsToDataPoints(List<Map<String, Object>> metricItems) {
- if (CollectionUtils.isEmpty(metricItems)) {
- return Collections.emptyList();
- }
- List<DataPointVO> out = new ArrayList<>(metricItems.size());
- for (Map<String, Object> item : metricItems) {
- if (item == null) {
- continue;
- }
- Object ts = item.get("timestamp");
- Object val = item.get("value");
- if (ts == null || val == null) {
- continue;
- }
- BigDecimal num = toBigDecimal(val);
- if (num == null) {
- continue;
- }
- out.add(new DataPointVO(ts.toString(), num));
- }
- return out;
- }
- private static BigDecimal toBigDecimal(Object val) {
- if (val instanceof BigDecimal) {
- return (BigDecimal) val;
- }
- if (val instanceof Number) {
- return BigDecimal.valueOf(((Number) val).doubleValue());
- }
- try {
- return new BigDecimal(val.toString().trim());
- } catch (Exception e) {
- return null;
- }
- }
- }
|