TriggerCronTask.java 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package com.usky.rule.crons;
  2. import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
  3. import com.usky.rule.domain.RuleEngine;
  4. import com.usky.rule.service.RuleEngineCronService;
  5. import com.usky.rule.service.RuleEngineService;
  6. import com.usky.rule.util.CronUtil;
  7. import com.usky.rule.util.JsonUtil;
  8. import com.usky.rule.util.RuleEngineUtil;
  9. import com.usky.rule.vo.RuleEngineCronVO;
  10. import com.usky.rule.vo.RuleEngineDetail;
  11. import com.usky.rule.vo.action.RuleEngineAction;
  12. import com.usky.rule.vo.constraint.CronConstraint;
  13. import com.usky.rule.vo.constraint.DeviceConstraint;
  14. import com.usky.rule.vo.trigger.CronTrigger;
  15. import com.usky.rule.vo.trigger.DeviceTrigger;
  16. import com.usky.rule.vo.RuleEnginePageRequest;
  17. import com.usky.rule.vo.trigger.SpaceTrigger;
  18. import com.usky.rule.config.CronTaskManager;
  19. import com.usky.rule.jobs.CommonJob;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.function.Function;
  23. import java.util.stream.Collectors;
  24. import org.apache.commons.lang3.StringUtils;
  25. import org.cache2k.Cache;
  26. import org.jetbrains.annotations.NotNull;
  27. import org.springframework.beans.BeansException;
  28. import org.springframework.beans.factory.DisposableBean;
  29. import org.springframework.beans.factory.InitializingBean;
  30. import org.springframework.context.ApplicationContext;
  31. import org.springframework.context.ApplicationContextAware;
  32. import org.springframework.context.SmartLifecycle;
  33. import org.springframework.stereotype.Component;
  34. @Component
  35. public class TriggerCronTask implements ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
  36. private RuleEngineService ruleEngineService;
  37. private RuleEngineCronService ruleEngineCronService;
  38. private CronTaskManager cronTaskManager;
  39. private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;
  40. private Cache<Long, List<SpaceTrigger>> spaceTriggerCache;
  41. public void start() {
  42. List<RuleEngine> enabledRuleEngineList = this.getAllEnabledRuleEngine();
  43. enabledRuleEngineList = (List)enabledRuleEngineList.stream().filter((ruleEnginex) -> StringUtils.isNotBlank(ruleEnginex.getDetail())).collect(Collectors.toList());
  44. if (!enabledRuleEngineList.isEmpty()) {
  45. for(RuleEngine ruleEngine : enabledRuleEngineList) {
  46. RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
  47. List<CronTrigger> cronTriggers = this.ruleEngineService.getCronTriggers(ruleEngineDetail.getTriggers());
  48. if (cronTriggers != null) {
  49. cronTriggers = (List)cronTriggers.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.toList());
  50. if (!cronTriggers.isEmpty()) {
  51. this.processCronTask(ruleEngine.getId(), ruleEngineDetail, cronTriggers);
  52. }
  53. }
  54. this.setDeviceConsumptionCache(ruleEngine.getId(), ruleEngineDetail);
  55. }
  56. }
  57. this.cronTaskManager.performConsumptionTask();
  58. }
  59. private void startCronTask() {
  60. List<RuleEngineCronVO> ruleEngineCronList = this.ruleEngineCronService.getTurnedOnCron();
  61. Function<RuleEngineCronVO, String> ruleEngineCronFunction = (vo) -> vo.getRuleEngineId() + ":" + vo.getProjectId();
  62. Map<String, List<CronTrigger>> ruleEngineCronMap = (Map)ruleEngineCronList.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.groupingBy(ruleEngineCronFunction, Collectors.mapping((vo) -> new CronTrigger(vo.getCron()), Collectors.toList())));
  63. if (CollectionUtils.isNotEmpty(ruleEngineCronMap)) {
  64. ruleEngineCronMap.forEach((identifier, list) -> {
  65. String[] idAndProjectId = identifier.split(":");
  66. Long ruleEngineId = Long.valueOf(idAndProjectId[0]);
  67. Long projectId = Long.valueOf(idAndProjectId[1]);
  68. String detail = this.ruleEngineService.getDetail(Long.valueOf(idAndProjectId[0]));
  69. if (StringUtils.isNotEmpty(detail)) {
  70. RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(detail, RuleEngineDetail.class);
  71. this.processCronTask(ruleEngineId, ruleEngineDetail, list);
  72. }
  73. });
  74. }
  75. }
  76. /**
  77. * 与库里的规则详情对齐能耗类设备触发器。去掉/改掉 consumption 时必须 remove,否则 ConsumptionJob 仍用旧缓存。
  78. */
  79. public void setDeviceConsumptionCache(Long ruleEngineId, RuleEngineDetail ruleEngineDetail) {
  80. if (ruleEngineId == null) {
  81. return;
  82. }
  83. List<DeviceTrigger> deviceTriggers = this.ruleEngineService.getDeviceTriggers(ruleEngineDetail.getTriggers());
  84. if (deviceTriggers == null || deviceTriggers.isEmpty()) {
  85. this.consumptionTriggerCache.remove(ruleEngineId);
  86. return;
  87. }
  88. List<DeviceTrigger> consumptionTriggers = deviceTriggers.stream()
  89. .filter((t) -> "consumption".equals(t.getMethod()))
  90. .collect(Collectors.toList());
  91. if (consumptionTriggers.isEmpty()) {
  92. this.consumptionTriggerCache.remove(ruleEngineId);
  93. } else {
  94. this.consumptionTriggerCache.put(ruleEngineId, consumptionTriggers);
  95. }
  96. // List<SpaceTrigger> spaceTriggers = this.ruleEngineService.getSpaceTriggers(ruleEngineDetail.getTriggers());
  97. // if (deviceTriggers != null && !spaceTriggers.isEmpty()) {
  98. // this.spaceTriggerCache.put(ruleEngineId, spaceTriggers);
  99. // }
  100. }
  101. public void removeDeviceConsumptionCache(Long ruleEngineId) {
  102. if (ruleEngineId != null) {
  103. this.consumptionTriggerCache.remove(ruleEngineId);
  104. }
  105. }
  106. private void processCronTask(Long id, RuleEngineDetail engineDetail, List<CronTrigger> cronTriggers) {
  107. List<CronConstraint> cronConstraints = this.ruleEngineService.getCronConstraints(engineDetail.getConstraints());
  108. List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(engineDetail.getConstraints());
  109. List<RuleEngineAction> actions = this.ruleEngineService.getActions(engineDetail.getActions());
  110. if (actions != null && !actions.isEmpty()) {
  111. addCronJob(id, cronTriggers, cronConstraints, deviceConstraints, actions, this.cronTaskManager);
  112. }
  113. }
  114. public static void addCronJob(Long ruleEngineId, List<CronTrigger> cronTriggers, List<CronConstraint> cronConstraints, List<DeviceConstraint> deviceConstraints, List<RuleEngineAction> actions, CronTaskManager cronTaskManager) {
  115. String jobGroup = RuleEngineUtil.getJobGroup(ruleEngineId);
  116. for(int i = 0; i < cronTriggers.size(); ++i) {
  117. CronTrigger cronTrigger = (CronTrigger)cronTriggers.get(i);
  118. String jobName = RuleEngineUtil.getTriggerCronJobName(i);
  119. cronTaskManager.addJob(jobName, jobGroup, cronTrigger.getCron(), CommonJob.class, cronConstraints, deviceConstraints, actions, ruleEngineId);
  120. }
  121. }
  122. /** 查询所有已启用规则(status=1),使用 list(RuleEnginePageRequest) 替代 MyBatis-Plus list(Wrapper) */
  123. public List<RuleEngine> getAllEnabledRuleEngine() {
  124. RuleEnginePageRequest request = new RuleEnginePageRequest();
  125. request.setStatus(1);
  126. return this.ruleEngineService.list(request);
  127. }
  128. public void destroy() throws Exception {
  129. }
  130. public void afterPropertiesSet() throws Exception {
  131. }
  132. public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
  133. }
  134. public void stop() {
  135. }
  136. public boolean isRunning() {
  137. return false;
  138. }
  139. public TriggerCronTask(final RuleEngineService ruleEngineService, final RuleEngineCronService ruleEngineCronService, final CronTaskManager cronTaskManager, final Cache<Long, List<DeviceTrigger>> consumptionTriggerCache, final Cache<Long, List<SpaceTrigger>> spaceTriggerCache) {
  140. this.ruleEngineService = ruleEngineService;
  141. this.ruleEngineCronService = ruleEngineCronService;
  142. this.cronTaskManager = cronTaskManager;
  143. this.consumptionTriggerCache = consumptionTriggerCache;
  144. this.spaceTriggerCache = spaceTriggerCache;
  145. }
  146. }