// ConsumptionJob.java (14 KB)
  1. package com.usky.rule.jobs;
  2. import com.usky.common.core.bean.ApiResult;
  3. import com.usky.common.security.utils.SecurityUtils;
  4. import com.usky.demo.RemoteTsdbProxyService;
  5. import com.usky.demo.domain.HistorysInnerRequestVO;
  6. import com.usky.demo.domain.HistorysInnerResultVO;
  7. import com.usky.demo.domain.MetricVO;
  8. import com.usky.rule.domain.RuleEngine;
  9. import com.usky.rule.enums.TimeTypeEnum;
  10. import com.usky.rule.enums.TriggerTypeEnum;
  11. import com.usky.rule.enums.TriggerValueTypeEnum;
  12. import com.usky.rule.util.JsonUtil;
  13. import com.usky.rule.vo.DataPointVO;
  14. import com.usky.rule.vo.Condition;
  15. import com.usky.rule.vo.Expression;
  16. import com.usky.rule.vo.RuleEngineDetail;
  17. import com.usky.rule.vo.TimeRange;
  18. import com.usky.rule.vo.action.RuleEngineAction;
  19. import com.usky.rule.vo.constraint.DeviceConstraint;
  20. import com.usky.rule.vo.log.RuleEngineDetailLog;
  21. import com.usky.rule.vo.trigger.DeviceTrigger;
  22. import com.usky.rule.vo.visualization.SimpleVO;
  23. import com.usky.rule.subscribe.TriggerDeviceUtil;
  24. import com.usky.rule.util.RuleEngineUtil;
  25. //import com.usky.rule.DeviceService;
  26. import com.usky.rule.service.RuleEngineService;
  27. import java.math.BigDecimal;
  28. import java.time.DayOfWeek;
  29. import java.time.LocalDateTime;
  30. import java.time.format.DateTimeFormatter;
  31. import java.time.temporal.ChronoUnit;
  32. import java.util.*;
  33. import javax.annotation.Resource;
  34. import lombok.extern.slf4j.Slf4j;
  35. import org.cache2k.Cache;
  36. import org.cache2k.CacheEntry;
  37. import org.quartz.DisallowConcurrentExecution;
  38. import org.quartz.Job;
  39. import org.quartz.JobExecutionContext;
  40. import org.quartz.JobExecutionException;
  41. import org.springframework.beans.factory.annotation.Autowired;
  42. import org.springframework.util.Assert;
  43. import org.springframework.util.CollectionUtils;
  44. @DisallowConcurrentExecution
  45. @Slf4j
  46. public class ConsumptionJob implements Job {
  /** Formats the window boundaries written by {@code initStartTimeAndEndTime}. */
  private static final DateTimeFormatter CONSUMPTION_TIME_DISPLAY =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  /** Device triggers to evaluate on each run, keyed by rule-engine id. */
  @Resource
  private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;

  /** Writes the trigger log and evaluates cron/device constraints. */
  @Resource
  private TriggerDeviceUtil triggerDeviceUtil;

  /** Performs the configured actions once a rule fires. */
  @Resource
  private RuleEngineUtil ruleEngineUtil;

  /** Remote client used to query a device's historical metric values. */
  @Autowired
  private RemoteTsdbProxyService remoteTsdbProxyService;

  /** Loads rule engines and extracts their actions and constraints. */
  @Resource
  private RuleEngineService ruleEngineService;

  /**
   * Creates the job; collaborators are supplied through field injection.
   * NOTE(review): presumably instantiated by the scheduler's job factory - confirm.
   */
  public ConsumptionJob() {
  }
  60. public void execute(JobExecutionContext context) throws JobExecutionException {
  61. String[] times = new String[2];
  62. LocalDateTime now = LocalDateTime.now();
  63. for(CacheEntry<Long, List<DeviceTrigger>> entry : this.consumptionTriggerCache.entries()) {
  64. log.info("consumptionTriggerCache: {}, now: {}", entry.getKey(), now);
  65. Long engineId = (Long)entry.getKey();
  66. RuleEngine ruleEngine = (RuleEngine)this.ruleEngineService.getById(engineId);
  67. log.info("ruleEngine: {}", ruleEngine);
  68. if (ruleEngine == null || ruleEngine.getStatus() == null || ruleEngine.getStatus() != 1) {
  69. continue;
  70. }
  71. Long spaceId = ruleEngine.getSpaceId();
  72. RuleEngineDetail ruleEngineDetail = (RuleEngineDetail) JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
  73. List<RuleEngineAction> actions = this.ruleEngineService.getActions(ruleEngineDetail.getActions());
  74. if (!actions.isEmpty()) {
  75. RuleEngineDetailLog ruleEngineDetailLog = new RuleEngineDetailLog();
  76. boolean spaceTriggerAction = false;
  77. label77:
  78. for(DeviceTrigger trigger : entry.getValue()) {
  79. if (!CollectionUtils.isEmpty(trigger.getDevices())) {
  80. label75:
  81. for(SimpleVO device : trigger.getDevices()) {
  82. List<Condition> meetTriggerConditionList = new ArrayList();
  83. List<Condition> triggerConditions = trigger.getConditions();
  84. StringBuilder boolConstraintExp = new StringBuilder();
  85. Map<String, String> valueMap = new HashMap();
  86. boolConstraintExp.append("(");
  87. for(int i = 0; i < triggerConditions.size(); ++i) {
  88. Condition triggerCondition = (Condition)triggerConditions.get(i);
  89. String identifier = triggerCondition.getIdentifier();
  90. Expression expression = triggerCondition.getExpression();
  91. TimeRange timeRange = triggerCondition.getTimeRange();
  92. String valueCondition = triggerCondition.getCondition();
  93. String operator = triggerCondition.getOperator();
  94. if (i != 0 && preAssertFalse(boolConstraintExp, operator)) {
  95. continue label75;
  96. }
  97. initStartTimeAndEndTime(now, times, timeRange);
  98. HistorysInnerRequestVO requestVO = new HistorysInnerRequestVO();
  99. requestVO.setDeviceuuid(Collections.singletonList(device.getDeviceUuid()));
  100. requestVO.setMetrics(Collections.singletonList(identifier.toLowerCase()));
  101. requestVO.setStartTime(times[0]);
  102. requestVO.setEndTime(times[1]);
  103. ApiResult<List<HistorysInnerResultVO>> historyApi = remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
  104. List<HistorysInnerResultVO> result = historyApi != null && historyApi.getData() != null
  105. ? historyApi.getData()
  106. : Collections.emptyList();
  107. List<DataPointVO> dataPointVOList = findDataPointsForMetric(result, device.getDeviceUuid(), identifier.toLowerCase());
  108. if (dataPointVOList.isEmpty()) {
  109. boolConstraintExp.append(false);
  110. } else {
  111. DataPointVO first = dataPointVOList.get(0);
  112. DataPointVO last = dataPointVOList.get(dataPointVOList.size() - 1);
  113. if (first.getValue() == null || last.getValue() == null) {
  114. boolConstraintExp.append(false);
  115. } else {
  116. BigDecimal currValue = last.getValue().subtract(first.getValue());
  117. boolean meetCondition = TriggerDeviceUtil.isMeetConsumptionCondition(currValue, valueCondition, expression.getX());
  118. if (meetCondition) {
  119. meetTriggerConditionList.add(triggerCondition);
  120. valueMap.put(identifier, currValue.toString());
  121. }
  122. boolConstraintExp.append(meetCondition);
  123. }
  124. }
  125. }
  126. spaceTriggerAction = TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + ")");
  127. if (spaceTriggerAction) {
  128. this.triggerDeviceUtil.setTriggerLog(now, ruleEngineDetailLog, device.getId(), device.getName(), TriggerValueTypeEnum.CONSUMPTION.getValue(), TriggerTypeEnum.DEVICE, meetTriggerConditionList, valueMap);
  129. break label77;
  130. }
  131. }
  132. }
  133. }
  134. if (spaceTriggerAction) {
  135. log.info("consumptionJob spaceTriggerAction true");
  136. boolean cronOk = this.triggerDeviceUtil.meetCronConstraintAction(
  137. this.ruleEngineService.getCronConstraints(ruleEngineDetail.getConstraints()),
  138. ruleEngineDetailLog,
  139. now);
  140. List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(ruleEngineDetail.getConstraints());
  141. boolean deviceOk = this.triggerDeviceUtil.meetConstraintAction(deviceConstraints, ruleEngineDetailLog);
  142. if (cronOk && deviceOk) {
  143. log.info("ConsumptionJob constraints satisfied engineId: {}, actions: {}", engineId, actions);
  144. this.ruleEngineUtil.performMultipleDevicesControl(engineId, true, TriggerTypeEnum.DEVICE.getType(), ruleEngine.getProjectId(), spaceId, actions, ruleEngineDetailLog);
  145. }
  146. }
  147. }
  148. }
  149. }
  150. public static void initStartTimeAndEndTime(LocalDateTime now, String[] times, TimeRange timeRange) {
  151. Integer start = timeRange.getStart();
  152. Integer end = timeRange.getEnd();
  153. Assert.notNull(start, "start不能为空");
  154. Assert.notNull(end, "end不能为空");
  155. Assert.isTrue(end > start, "结束时间必须大于起始时间");
  156. LocalDateTime startTime;
  157. LocalDateTime endTime;
  158. TimeTypeEnum typeEnum = TimeTypeEnum.get(timeRange.getType());
  159. Assert.notNull(typeEnum, "不支持的时间类型");
  160. // 老版本传统 switch 格式
  161. switch (typeEnum) {
  162. case HOUR:
  163. // 支持跨天小时 22~26
  164. LocalDateTime todayZeroHour = now.truncatedTo(ChronoUnit.DAYS);
  165. startTime = todayZeroHour.withHour(start);
  166. if (end >= 24) {
  167. endTime = todayZeroHour.plusDays(1).withHour(end - 24);
  168. } else {
  169. endTime = todayZeroHour.withHour(end);
  170. }
  171. if (now.isBefore(startTime)) {
  172. startTime = startTime.minusDays(1);
  173. endTime = endTime.minusDays(1);
  174. }
  175. break;
  176. case DAY:
  177. // 支持按当月实际天数跨月:start=5, end=33
  178. LocalDateTime todayZeroDay = now.truncatedTo(ChronoUnit.DAYS);
  179. startTime = todayZeroDay.withDayOfMonth(start);
  180. // 结束时间 = 开始时间 + (end-1)天
  181. endTime = startTime.plusDays(end - 1);
  182. if (now.isBefore(startTime)) {
  183. startTime = startTime.minusMonths(1);
  184. endTime = endTime.minusMonths(1);
  185. }
  186. break;
  187. case WEEK:
  188. LocalDateTime todayZeroWeek = now.truncatedTo(ChronoUnit.DAYS);
  189. // 开始时间:本周 星期start(1=周一)
  190. DayOfWeek todayWeek = todayZeroWeek.getDayOfWeek();
  191. int weekVal = todayWeek.getValue();
  192. startTime = todayZeroWeek.plusDays((long) start - weekVal);
  193. // 结束时间:从开始时间 直接往后加 (end - start) 天
  194. // 支持 end > 7,自动跨周、跨N周
  195. endTime = startTime.plusDays(end - start);
  196. // 如果还没到当前周期 → 取上一周
  197. if (now.isBefore(startTime)) {
  198. startTime = startTime.minusWeeks(1);
  199. endTime = endTime.minusWeeks(1);
  200. }
  201. break;
  202. default:
  203. throw new IllegalArgumentException("不支持的时间类型: " + typeEnum);
  204. }
  205. times[0] = startTime.format(CONSUMPTION_TIME_DISPLAY);
  206. times[1] = endTime.format(CONSUMPTION_TIME_DISPLAY);
  207. log.debug("initStartTimeAndEndTime type={} startTime={} endTime={}", typeEnum, times[0], times[1]);
  208. }
  209. public static boolean preAssertFalse(StringBuilder boolConstraintExp, String operator) {
  210. if (TriggerDeviceUtil.checkOperator(operator)) {
  211. throw new RuntimeException("operator的值必须是 && or ||");
  212. } else {
  213. boolConstraintExp.append(" ").append(operator).append(" ");
  214. return !TriggerDeviceUtil.getBooleanExpressionValue(boolConstraintExp + "true)");
  215. }
  216. }
  217. private static List<DataPointVO> findDataPointsForMetric(List<HistorysInnerResultVO> historyList, String deviceUuid, String metricIdentifier) {
  218. if (CollectionUtils.isEmpty(historyList)) {
  219. return Collections.emptyList();
  220. }
  221. HistorysInnerResultVO row = null;
  222. for (HistorysInnerResultVO vo : historyList) {
  223. if (vo != null && deviceUuid != null && deviceUuid.equals(vo.getDeviceuuid())) {
  224. row = vo;
  225. break;
  226. }
  227. }
  228. if (row == null) {
  229. row = historyList.get(0);
  230. }
  231. List<MetricVO> metrics = row.getMetrics();
  232. if (CollectionUtils.isEmpty(metrics)) {
  233. return Collections.emptyList();
  234. }
  235. for (MetricVO m : metrics) {
  236. if (m != null && metricIdentifier != null && metricIdentifier.equals(m.getMetric())) {
  237. return metricItemsToDataPoints(m.getMetricItems());
  238. }
  239. }
  240. return Collections.emptyList();
  241. }
  242. private static List<DataPointVO> metricItemsToDataPoints(List<Map<String, Object>> metricItems) {
  243. if (CollectionUtils.isEmpty(metricItems)) {
  244. return Collections.emptyList();
  245. }
  246. List<DataPointVO> out = new ArrayList<>(metricItems.size());
  247. for (Map<String, Object> item : metricItems) {
  248. if (item == null) {
  249. continue;
  250. }
  251. Object ts = item.get("timestamp");
  252. Object val = item.get("value");
  253. if (ts == null || val == null) {
  254. continue;
  255. }
  256. BigDecimal num = toBigDecimal(val);
  257. if (num == null) {
  258. continue;
  259. }
  260. out.add(new DataPointVO(ts.toString(), num));
  261. }
  262. return out;
  263. }
  264. private static BigDecimal toBigDecimal(Object val) {
  265. if (val instanceof BigDecimal) {
  266. return (BigDecimal) val;
  267. }
  268. if (val instanceof Number) {
  269. return BigDecimal.valueOf(((Number) val).doubleValue());
  270. }
  271. try {
  272. return new BigDecimal(val.toString().trim());
  273. } catch (Exception e) {
  274. return null;
  275. }
  276. }
  277. }