TriggerCronTask.java 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package com.usky.rule.crons;
  2. import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
  3. import com.usky.rule.domain.RuleEngine;
  4. import com.usky.rule.service.RuleEngineCronService;
  5. import com.usky.rule.service.RuleEngineService;
  6. import com.usky.rule.util.CronUtil;
  7. import com.usky.rule.util.JsonUtil;
  8. import com.usky.rule.util.RuleEngineUtil;
  9. import com.usky.rule.vo.RuleEngineCronVO;
  10. import com.usky.rule.vo.RuleEngineDetail;
  11. import com.usky.rule.vo.action.RuleEngineAction;
  12. import com.usky.rule.vo.constraint.CronConstraint;
  13. import com.usky.rule.vo.constraint.DeviceConstraint;
  14. import com.usky.rule.vo.trigger.CronTrigger;
  15. import com.usky.rule.vo.trigger.DeviceTrigger;
  16. import com.usky.rule.vo.RuleEnginePageRequest;
  17. import com.usky.rule.vo.trigger.SpaceTrigger;
  18. import com.usky.rule.config.CronTaskManager;
  19. import com.usky.rule.jobs.CommonJob;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.function.Function;
  23. import java.util.stream.Collectors;
  24. import org.apache.commons.lang3.StringUtils;
  25. import org.cache2k.Cache;
  26. import org.jetbrains.annotations.NotNull;
  27. import org.springframework.beans.BeansException;
  28. import org.springframework.beans.factory.DisposableBean;
  29. import org.springframework.beans.factory.InitializingBean;
  30. import org.springframework.context.ApplicationContext;
  31. import org.springframework.context.ApplicationContextAware;
  32. import org.springframework.context.SmartLifecycle;
  33. import org.springframework.stereotype.Component;
  34. @Component
  35. public class TriggerCronTask implements ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
  36. private RuleEngineService ruleEngineService;
  37. private RuleEngineCronService ruleEngineCronService;
  38. private CronTaskManager cronTaskManager;
  39. private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;
  40. private Cache<Long, List<SpaceTrigger>> spaceTriggerCache;
  41. public void start() {
  42. List<RuleEngine> enabledRuleEngineList = this.getAllEnabledRuleEngine();
  43. enabledRuleEngineList = (List)enabledRuleEngineList.stream().filter((ruleEnginex) -> StringUtils.isNotBlank(ruleEnginex.getDetail())).collect(Collectors.toList());
  44. if (!enabledRuleEngineList.isEmpty()) {
  45. for(RuleEngine ruleEngine : enabledRuleEngineList) {
  46. RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
  47. List<CronTrigger> cronTriggers = this.ruleEngineService.getCronTriggers(ruleEngineDetail.getTriggers());
  48. if (cronTriggers != null) {
  49. cronTriggers = (List)cronTriggers.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.toList());
  50. if (!cronTriggers.isEmpty()) {
  51. this.processCronTask(ruleEngine.getId(), ruleEngine.getProjectId(), ruleEngine.getSpaceId(), ruleEngineDetail, cronTriggers);
  52. }
  53. }
  54. this.setDeviceConsumptionCache(ruleEngine.getId(), ruleEngineDetail);
  55. }
  56. }
  57. this.cronTaskManager.performConsumptionTask();
  58. }
  59. private void startCronTask() {
  60. List<RuleEngineCronVO> ruleEngineCronList = this.ruleEngineCronService.getTurnedOnCron();
  61. Function<RuleEngineCronVO, String> ruleEngineCronFunction = (vo) -> vo.getRuleEngineId() + ":" + vo.getProjectId() + ":" + vo.getSpaceId();
  62. Map<String, List<CronTrigger>> ruleEngineCronMap = (Map)ruleEngineCronList.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.groupingBy(ruleEngineCronFunction, Collectors.mapping((vo) -> new CronTrigger(vo.getCron()), Collectors.toList())));
  63. if (CollectionUtils.isNotEmpty(ruleEngineCronMap)) {
  64. ruleEngineCronMap.forEach((identifier, list) -> {
  65. String[] idAndProjectId = identifier.split(":");
  66. Long ruleEngineId = Long.valueOf(idAndProjectId[0]);
  67. Long projectId = Long.valueOf(idAndProjectId[1]);
  68. Long spaceId = Long.valueOf(idAndProjectId[2]);
  69. String detail = this.ruleEngineService.getDetail(Long.valueOf(idAndProjectId[0]));
  70. if (StringUtils.isNotEmpty(detail)) {
  71. RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(detail, RuleEngineDetail.class);
  72. this.processCronTask(ruleEngineId, projectId, spaceId, ruleEngineDetail, list);
  73. }
  74. });
  75. }
  76. }
  77. /**
  78. * 与库里的规则详情对齐能耗类设备触发器。去掉/改掉 consumption 时必须 remove,否则 ConsumptionJob 仍用旧缓存。
  79. */
  80. public void setDeviceConsumptionCache(Long ruleEngineId, RuleEngineDetail ruleEngineDetail) {
  81. if (ruleEngineId == null) {
  82. return;
  83. }
  84. List<DeviceTrigger> deviceTriggers = this.ruleEngineService.getDeviceTriggers(ruleEngineDetail.getTriggers());
  85. if (deviceTriggers == null || deviceTriggers.isEmpty()) {
  86. this.consumptionTriggerCache.remove(ruleEngineId);
  87. return;
  88. }
  89. List<DeviceTrigger> consumptionTriggers = deviceTriggers.stream()
  90. .filter((t) -> "consumption".equals(t.getMethod()))
  91. .collect(Collectors.toList());
  92. if (consumptionTriggers.isEmpty()) {
  93. this.consumptionTriggerCache.remove(ruleEngineId);
  94. } else {
  95. this.consumptionTriggerCache.put(ruleEngineId, consumptionTriggers);
  96. }
  97. // List<SpaceTrigger> spaceTriggers = this.ruleEngineService.getSpaceTriggers(ruleEngineDetail.getTriggers());
  98. // if (deviceTriggers != null && !spaceTriggers.isEmpty()) {
  99. // this.spaceTriggerCache.put(ruleEngineId, spaceTriggers);
  100. // }
  101. }
  102. public void removeDeviceConsumptionCache(Long ruleEngineId) {
  103. if (ruleEngineId != null) {
  104. this.consumptionTriggerCache.remove(ruleEngineId);
  105. }
  106. }
  107. private void processCronTask(Long id, Long projectId, Long spaceId, RuleEngineDetail engineDetail, List<CronTrigger> cronTriggers) {
  108. List<CronConstraint> cronConstraints = this.ruleEngineService.getCronConstraints(engineDetail.getConstraints());
  109. List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(engineDetail.getConstraints());
  110. List<RuleEngineAction> actions = this.ruleEngineService.getActions(engineDetail.getActions());
  111. if (actions != null && !actions.isEmpty()) {
  112. addCronJob(id, projectId, spaceId, cronTriggers, cronConstraints, deviceConstraints, actions, this.cronTaskManager);
  113. }
  114. }
  115. public static void addCronJob(Long ruleEngineId, Long projectId, Long spaceId, List<CronTrigger> cronTriggers, List<CronConstraint> cronConstraints, List<DeviceConstraint> deviceConstraints, List<RuleEngineAction> actions, CronTaskManager cronTaskManager) {
  116. String jobGroup = RuleEngineUtil.getJobGroup(ruleEngineId);
  117. for(int i = 0; i < cronTriggers.size(); ++i) {
  118. CronTrigger cronTrigger = (CronTrigger)cronTriggers.get(i);
  119. String jobName = RuleEngineUtil.getTriggerCronJobName(i);
  120. cronTaskManager.addJob(jobName, jobGroup, cronTrigger.getCron(), CommonJob.class, cronConstraints, deviceConstraints, actions, ruleEngineId, projectId, spaceId);
  121. }
  122. }
  123. /** 查询所有已启用规则(status=1),使用 list(RuleEnginePageRequest) 替代 MyBatis-Plus list(Wrapper) */
  124. public List<RuleEngine> getAllEnabledRuleEngine() {
  125. RuleEnginePageRequest request = new RuleEnginePageRequest();
  126. request.setStatus(1);
  127. return this.ruleEngineService.list(request);
  128. }
  129. public void destroy() throws Exception {
  130. }
  131. public void afterPropertiesSet() throws Exception {
  132. }
  133. public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
  134. }
  135. public void stop() {
  136. }
  137. public boolean isRunning() {
  138. return false;
  139. }
  140. public TriggerCronTask(final RuleEngineService ruleEngineService, final RuleEngineCronService ruleEngineCronService, final CronTaskManager cronTaskManager, final Cache<Long, List<DeviceTrigger>> consumptionTriggerCache, final Cache<Long, List<SpaceTrigger>> spaceTriggerCache) {
  141. this.ruleEngineService = ruleEngineService;
  142. this.ruleEngineCronService = ruleEngineCronService;
  143. this.cronTaskManager = cronTaskManager;
  144. this.consumptionTriggerCache = consumptionTriggerCache;
  145. this.spaceTriggerCache = spaceTriggerCache;
  146. }
  147. }