package jnpf.integrate.job; import cn.hutool.http.HttpRequest; import cn.hutool.http.Method; import jnpf.base.UserInfo; import jnpf.config.ConfigValueUtil; import jnpf.database.util.TenantDataSourceUtil; import jnpf.exception.TenantInvalidException; import jnpf.integrate.entity.IntegrateQueueEntity; import jnpf.integrate.model.nodeJson.IntegrateModel; import jnpf.integrate.service.IntegrateQueueService; import jnpf.integrate.util.IntegrateHttpModel; import jnpf.util.*; import lombok.extern.slf4j.Slf4j; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.quartz.QuartzJobBean; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @DisallowConcurrentExecution public class IntegrateQueryJobUtil extends QuartzJobBean { @Autowired private RedisUtil redisUtil; @Autowired private RedisTemplate redisTemplate; @Autowired private ConfigValueUtil configValueUtil; @Autowired private IntegrateQueueService integrateQueueService; public static Map futureList = new HashMap<>(); private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null; IntegrateQueryJobUtil(ThreadPoolTaskExecutor threadPoolTaskExecutor) { if(scheduledThreadPoolExecutor == null) { scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(20, threadPoolTaskExecutor.getThreadPoolExecutor().getThreadFactory()); } } @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { List hashValues = redisUtil.getHashValues(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY); for (String id : hashValues) { UserInfo userInfo = JsonUtil.getJsonToBean(id, UserInfo.class); String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf"; boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId, System.currentTimeMillis(), 3600, TimeUnit.SECONDS); if (!useSuccess) continue; if (futureList.get(tenantId) == null) { ScheduledFuture scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(new Task(userInfo), 0, 1, TimeUnit.SECONDS); futureList.put(tenantId, scheduledFuture); } } } class Task implements Runnable { private UserInfo userInfo; public Task(UserInfo userInfo) { this.userInfo = userInfo; } @Override public void run() { String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf"; if (configValueUtil.isMultiTenancy()) { try { TenantDataSourceUtil.switchTenant(userInfo.getTenantId()); }catch (TenantInvalidException e){ // 租户无效 删除缓存, 删除任务 log.error("Task, 租户无效, 删除任务:{}", userInfo.getTenantId()); IntegrateJobUtil.removeTenant(userInfo.getTenantId(), redisUtil); ScheduledFuture scheduledFuture = futureList.get(userInfo.getTenantId()); if(scheduledFuture != null) { scheduledFuture.cancel(true); } } } List list = integrateQueueService.getList(); if (list.size() > 0) { String token = AuthUtil.loginTempUser(userInfo.getUserId(), userInfo.getTenantId(), true); String url = configValueUtil.getApiDomain() + "/api/visualdev/Integrate/executeQuery"; for (IntegrateQueueEntity entity : list) { IntegrateModel model = new IntegrateModel(); model.setUserInfo(userInfo); model.setId(entity.getId()); boolean integrate = IntegrateJobUtil.getIntegrate(model, redisUtil); boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + entity.getId(), System.currentTimeMillis(), 3600, TimeUnit.SECONDS); if (integrate && useSuccess) { System.out.println("执行一组调度开始----------------------------"); IntegrateHttpModel httpModel = new IntegrateHttpModel(); httpModel.setUserInfo(UserProvider.getUser(token)); httpModel.setId(entity.getId()); HttpRequest request = HttpRequest.of(url).method(Method.POST).body(JsonUtil.getObjectToString(httpModel)); request.header(Constants.AUTHORIZATION, token); request.execute().body(); System.out.println("执行一组调度结束----------------------------"); } } } //删除 redisTemplate.delete(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId); } } }