package com.usky.rule.crons; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.usky.rule.domain.RuleEngine; import com.usky.rule.service.RuleEngineCronService; import com.usky.rule.service.RuleEngineService; import com.usky.rule.util.CronUtil; import com.usky.rule.util.JsonUtil; import com.usky.rule.util.RuleEngineUtil; import com.usky.rule.vo.RuleEngineCronVO; import com.usky.rule.vo.RuleEngineDetail; import com.usky.rule.vo.action.RuleEngineAction; import com.usky.rule.vo.constraint.CronConstraint; import com.usky.rule.vo.constraint.DeviceConstraint; import com.usky.rule.vo.trigger.CronTrigger; import com.usky.rule.vo.trigger.DeviceTrigger; import com.usky.rule.vo.RuleEnginePageRequest; import com.usky.rule.vo.trigger.SpaceTrigger; import com.usky.rule.config.CronTaskManager; import com.usky.rule.jobs.CommonJob; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.cache2k.Cache; import org.jetbrains.annotations.NotNull; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; @Component public class TriggerCronTask implements ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle { private RuleEngineService ruleEngineService; private RuleEngineCronService ruleEngineCronService; private CronTaskManager cronTaskManager; private Cache> consumptionTriggerCache; private Cache> spaceTriggerCache; public void start() { List enabledRuleEngineList = this.getAllEnabledRuleEngine(); enabledRuleEngineList = (List)enabledRuleEngineList.stream().filter((ruleEnginex) -> StringUtils.isNotBlank(ruleEnginex.getDetail())).collect(Collectors.toList()); if (!enabledRuleEngineList.isEmpty()) { for(RuleEngine ruleEngine : enabledRuleEngineList) { RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class); List cronTriggers = this.ruleEngineService.getCronTriggers(ruleEngineDetail.getTriggers()); if (cronTriggers != null) { cronTriggers = (List)cronTriggers.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.toList()); if (!cronTriggers.isEmpty()) { this.processCronTask(ruleEngine.getId(), ruleEngine.getProjectId(), ruleEngine.getSpaceId(), ruleEngineDetail, cronTriggers); } } this.setDeviceConsumptionCache(ruleEngine.getId(), ruleEngineDetail); } } this.cronTaskManager.performConsumptionTask(); } private void startCronTask() { List ruleEngineCronList = this.ruleEngineCronService.getTurnedOnCron(); Function ruleEngineCronFunction = (vo) -> vo.getRuleEngineId() + ":" + vo.getProjectId() + ":" + vo.getSpaceId(); Map> ruleEngineCronMap = (Map)ruleEngineCronList.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.groupingBy(ruleEngineCronFunction, Collectors.mapping((vo) -> new CronTrigger(vo.getCron()), Collectors.toList()))); if (CollectionUtils.isNotEmpty(ruleEngineCronMap)) { ruleEngineCronMap.forEach((identifier, list) -> { String[] idAndProjectId = identifier.split(":"); Long ruleEngineId = Long.valueOf(idAndProjectId[0]); Long projectId = Long.valueOf(idAndProjectId[1]); Long spaceId = Long.valueOf(idAndProjectId[2]); String detail = this.ruleEngineService.getDetail(Long.valueOf(idAndProjectId[0])); if (StringUtils.isNotEmpty(detail)) { RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(detail, RuleEngineDetail.class); this.processCronTask(ruleEngineId, projectId, spaceId, ruleEngineDetail, list); } }); } } /** * 与库里的规则详情对齐能耗类设备触发器。去掉/改掉 consumption 时必须 remove,否则 ConsumptionJob 仍用旧缓存。 */ public void setDeviceConsumptionCache(Long ruleEngineId, RuleEngineDetail ruleEngineDetail) { if (ruleEngineId == null) { return; } List deviceTriggers = this.ruleEngineService.getDeviceTriggers(ruleEngineDetail.getTriggers()); if (deviceTriggers == null || deviceTriggers.isEmpty()) { this.consumptionTriggerCache.remove(ruleEngineId); return; } List consumptionTriggers = deviceTriggers.stream() .filter((t) -> "consumption".equals(t.getMethod())) .collect(Collectors.toList()); if (consumptionTriggers.isEmpty()) { this.consumptionTriggerCache.remove(ruleEngineId); } else { this.consumptionTriggerCache.put(ruleEngineId, consumptionTriggers); } // List spaceTriggers = this.ruleEngineService.getSpaceTriggers(ruleEngineDetail.getTriggers()); // if (deviceTriggers != null && !spaceTriggers.isEmpty()) { // this.spaceTriggerCache.put(ruleEngineId, spaceTriggers); // } } public void removeDeviceConsumptionCache(Long ruleEngineId) { if (ruleEngineId != null) { this.consumptionTriggerCache.remove(ruleEngineId); } } private void processCronTask(Long id, Long projectId, Long spaceId, RuleEngineDetail engineDetail, List cronTriggers) { List cronConstraints = this.ruleEngineService.getCronConstraints(engineDetail.getConstraints()); List deviceConstraints = this.ruleEngineService.getDeviceConstraints(engineDetail.getConstraints()); List actions = this.ruleEngineService.getActions(engineDetail.getActions()); if (actions != null && !actions.isEmpty()) { addCronJob(id, projectId, spaceId, cronTriggers, cronConstraints, deviceConstraints, actions, this.cronTaskManager); } } public static void addCronJob(Long ruleEngineId, Long projectId, Long spaceId, List cronTriggers, List cronConstraints, List deviceConstraints, List actions, CronTaskManager cronTaskManager) { String jobGroup = RuleEngineUtil.getJobGroup(ruleEngineId); for(int i = 0; i < cronTriggers.size(); ++i) { CronTrigger cronTrigger = (CronTrigger)cronTriggers.get(i); String jobName = RuleEngineUtil.getTriggerCronJobName(i); cronTaskManager.addJob(jobName, jobGroup, cronTrigger.getCron(), CommonJob.class, cronConstraints, deviceConstraints, actions, ruleEngineId, projectId, spaceId); } } /** 查询所有已启用规则(status=1),使用 list(RuleEnginePageRequest) 替代 MyBatis-Plus list(Wrapper) */ public List getAllEnabledRuleEngine() { RuleEnginePageRequest request = new RuleEnginePageRequest(); request.setStatus(1); return this.ruleEngineService.list(request); } public void destroy() throws Exception { } public void afterPropertiesSet() throws Exception { } public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException { } public void stop() { } public boolean isRunning() { return false; } public TriggerCronTask(final RuleEngineService ruleEngineService, final RuleEngineCronService ruleEngineCronService, final CronTaskManager cronTaskManager, final Cache> consumptionTriggerCache, final Cache> spaceTriggerCache) { this.ruleEngineService = ruleEngineService; this.ruleEngineCronService = ruleEngineCronService; this.cronTaskManager = cronTaskManager; this.consumptionTriggerCache = consumptionTriggerCache; this.spaceTriggerCache = spaceTriggerCache; } }