| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package com.usky.rule.crons;
- import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
- import com.usky.rule.domain.RuleEngine;
- import com.usky.rule.service.RuleEngineCronService;
- import com.usky.rule.service.RuleEngineService;
- import com.usky.rule.util.CronUtil;
- import com.usky.rule.util.JsonUtil;
- import com.usky.rule.util.RuleEngineUtil;
- import com.usky.rule.vo.RuleEngineCronVO;
- import com.usky.rule.vo.RuleEngineDetail;
- import com.usky.rule.vo.action.RuleEngineAction;
- import com.usky.rule.vo.constraint.CronConstraint;
- import com.usky.rule.vo.constraint.DeviceConstraint;
- import com.usky.rule.vo.trigger.CronTrigger;
- import com.usky.rule.vo.trigger.DeviceTrigger;
- import com.usky.rule.vo.RuleEnginePageRequest;
- import com.usky.rule.vo.trigger.SpaceTrigger;
- import com.usky.rule.config.CronTaskManager;
- import com.usky.rule.jobs.CommonJob;
- import java.util.List;
- import java.util.Map;
- import java.util.function.Function;
- import java.util.stream.Collectors;
- import org.apache.commons.lang3.StringUtils;
- import org.cache2k.Cache;
- import org.jetbrains.annotations.NotNull;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.DisposableBean;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.SmartLifecycle;
- import org.springframework.stereotype.Component;
- @Component
- public class TriggerCronTask implements ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle {
- private RuleEngineService ruleEngineService;
- private RuleEngineCronService ruleEngineCronService;
- private CronTaskManager cronTaskManager;
- private Cache<Long, List<DeviceTrigger>> consumptionTriggerCache;
- private Cache<Long, List<SpaceTrigger>> spaceTriggerCache;
- public void start() {
- List<RuleEngine> enabledRuleEngineList = this.getAllEnabledRuleEngine();
- enabledRuleEngineList = (List)enabledRuleEngineList.stream().filter((ruleEnginex) -> StringUtils.isNotBlank(ruleEnginex.getDetail())).collect(Collectors.toList());
- if (!enabledRuleEngineList.isEmpty()) {
- for(RuleEngine ruleEngine : enabledRuleEngineList) {
- RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(ruleEngine.getDetail(), RuleEngineDetail.class);
- List<CronTrigger> cronTriggers = this.ruleEngineService.getCronTriggers(ruleEngineDetail.getTriggers());
- if (cronTriggers != null) {
- cronTriggers = (List)cronTriggers.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.toList());
- if (!cronTriggers.isEmpty()) {
- this.processCronTask(ruleEngine.getId(), ruleEngine.getProjectId(), ruleEngine.getSpaceId(), ruleEngineDetail, cronTriggers);
- }
- }
- this.setDeviceConsumptionCache(ruleEngine.getId(), ruleEngineDetail);
- }
- }
- this.cronTaskManager.performConsumptionTask();
- }
- private void startCronTask() {
- List<RuleEngineCronVO> ruleEngineCronList = this.ruleEngineCronService.getTurnedOnCron();
- Function<RuleEngineCronVO, String> ruleEngineCronFunction = (vo) -> vo.getRuleEngineId() + ":" + vo.getProjectId() + ":" + vo.getSpaceId();
- Map<String, List<CronTrigger>> ruleEngineCronMap = (Map)ruleEngineCronList.stream().filter((cronTrigger) -> CronUtil.isCronMatched(cronTrigger.getCron())).collect(Collectors.groupingBy(ruleEngineCronFunction, Collectors.mapping((vo) -> new CronTrigger(vo.getCron()), Collectors.toList())));
- if (CollectionUtils.isNotEmpty(ruleEngineCronMap)) {
- ruleEngineCronMap.forEach((identifier, list) -> {
- String[] idAndProjectId = identifier.split(":");
- Long ruleEngineId = Long.valueOf(idAndProjectId[0]);
- Long projectId = Long.valueOf(idAndProjectId[1]);
- Long spaceId = Long.valueOf(idAndProjectId[2]);
- String detail = this.ruleEngineService.getDetail(Long.valueOf(idAndProjectId[0]));
- if (StringUtils.isNotEmpty(detail)) {
- RuleEngineDetail ruleEngineDetail = (RuleEngineDetail)JsonUtil.toObject(detail, RuleEngineDetail.class);
- this.processCronTask(ruleEngineId, projectId, spaceId, ruleEngineDetail, list);
- }
- });
- }
- }
- /**
- * 与库里的规则详情对齐能耗类设备触发器。去掉/改掉 consumption 时必须 remove,否则 ConsumptionJob 仍用旧缓存。
- */
- public void setDeviceConsumptionCache(Long ruleEngineId, RuleEngineDetail ruleEngineDetail) {
- if (ruleEngineId == null) {
- return;
- }
- List<DeviceTrigger> deviceTriggers = this.ruleEngineService.getDeviceTriggers(ruleEngineDetail.getTriggers());
- if (deviceTriggers == null || deviceTriggers.isEmpty()) {
- this.consumptionTriggerCache.remove(ruleEngineId);
- return;
- }
- List<DeviceTrigger> consumptionTriggers = deviceTriggers.stream()
- .filter((t) -> "consumption".equals(t.getMethod()))
- .collect(Collectors.toList());
- if (consumptionTriggers.isEmpty()) {
- this.consumptionTriggerCache.remove(ruleEngineId);
- } else {
- this.consumptionTriggerCache.put(ruleEngineId, consumptionTriggers);
- }
- // List<SpaceTrigger> spaceTriggers = this.ruleEngineService.getSpaceTriggers(ruleEngineDetail.getTriggers());
- // if (deviceTriggers != null && !spaceTriggers.isEmpty()) {
- // this.spaceTriggerCache.put(ruleEngineId, spaceTriggers);
- // }
- }
- public void removeDeviceConsumptionCache(Long ruleEngineId) {
- if (ruleEngineId != null) {
- this.consumptionTriggerCache.remove(ruleEngineId);
- }
- }
- private void processCronTask(Long id, Long projectId, Long spaceId, RuleEngineDetail engineDetail, List<CronTrigger> cronTriggers) {
- List<CronConstraint> cronConstraints = this.ruleEngineService.getCronConstraints(engineDetail.getConstraints());
- List<DeviceConstraint> deviceConstraints = this.ruleEngineService.getDeviceConstraints(engineDetail.getConstraints());
- List<RuleEngineAction> actions = this.ruleEngineService.getActions(engineDetail.getActions());
- if (actions != null && !actions.isEmpty()) {
- addCronJob(id, projectId, spaceId, cronTriggers, cronConstraints, deviceConstraints, actions, this.cronTaskManager);
- }
- }
- public static void addCronJob(Long ruleEngineId, Long projectId, Long spaceId, List<CronTrigger> cronTriggers, List<CronConstraint> cronConstraints, List<DeviceConstraint> deviceConstraints, List<RuleEngineAction> actions, CronTaskManager cronTaskManager) {
- String jobGroup = RuleEngineUtil.getJobGroup(ruleEngineId);
- for(int i = 0; i < cronTriggers.size(); ++i) {
- CronTrigger cronTrigger = (CronTrigger)cronTriggers.get(i);
- String jobName = RuleEngineUtil.getTriggerCronJobName(i);
- cronTaskManager.addJob(jobName, jobGroup, cronTrigger.getCron(), CommonJob.class, cronConstraints, deviceConstraints, actions, ruleEngineId, projectId, spaceId);
- }
- }
- /** 查询所有已启用规则(status=1),使用 list(RuleEnginePageRequest) 替代 MyBatis-Plus list(Wrapper) */
- public List<RuleEngine> getAllEnabledRuleEngine() {
- RuleEnginePageRequest request = new RuleEnginePageRequest();
- request.setStatus(1);
- return this.ruleEngineService.list(request);
- }
- public void destroy() throws Exception {
- }
- public void afterPropertiesSet() throws Exception {
- }
- public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
- }
- public void stop() {
- }
- public boolean isRunning() {
- return false;
- }
- public TriggerCronTask(final RuleEngineService ruleEngineService, final RuleEngineCronService ruleEngineCronService, final CronTaskManager cronTaskManager, final Cache<Long, List<DeviceTrigger>> consumptionTriggerCache, final Cache<Long, List<SpaceTrigger>> spaceTriggerCache) {
- this.ruleEngineService = ruleEngineService;
- this.ruleEngineCronService = ruleEngineCronService;
- this.cronTaskManager = cronTaskManager;
- this.consumptionTriggerCache = consumptionTriggerCache;
- this.spaceTriggerCache = spaceTriggerCache;
- }
- }
|