| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- package jnpf.integrate.job;
- import cn.hutool.http.HttpRequest;
- import cn.hutool.http.Method;
- import jnpf.base.UserInfo;
- import jnpf.config.ConfigValueUtil;
- import jnpf.database.util.TenantDataSourceUtil;
- import jnpf.exception.TenantInvalidException;
- import jnpf.integrate.entity.IntegrateQueueEntity;
- import jnpf.integrate.model.nodeJson.IntegrateModel;
- import jnpf.integrate.service.IntegrateQueueService;
- import jnpf.integrate.util.IntegrateHttpModel;
- import jnpf.util.*;
- import lombok.extern.slf4j.Slf4j;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobExecutionException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.scheduling.quartz.QuartzJobBean;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @DisallowConcurrentExecution
- public class IntegrateQueryJobUtil extends QuartzJobBean {
- @Autowired
- private RedisUtil redisUtil;
- @Autowired
- private RedisTemplate redisTemplate;
- @Autowired
- private ConfigValueUtil configValueUtil;
- @Autowired
- private IntegrateQueueService integrateQueueService;
- public static Map<String, ScheduledFuture> futureList = new HashMap<>();
- private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;
- IntegrateQueryJobUtil(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
- if(scheduledThreadPoolExecutor == null) {
- scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(20, threadPoolTaskExecutor.getThreadPoolExecutor().getThreadFactory());
- }
- }
- @Override
- protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
- List<String> hashValues = redisUtil.getHashValues(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY);
- for (String id : hashValues) {
- UserInfo userInfo = JsonUtil.getJsonToBean(id, UserInfo.class);
- String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf";
- boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId, System.currentTimeMillis(), 3600, TimeUnit.SECONDS);
- if (!useSuccess) continue;
- if (futureList.get(tenantId) == null) {
- ScheduledFuture scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(new Task(userInfo), 0, 1, TimeUnit.SECONDS);
- futureList.put(tenantId, scheduledFuture);
- }
- }
- }
- class Task implements Runnable {
- private UserInfo userInfo;
- public Task(UserInfo userInfo) {
- this.userInfo = userInfo;
- }
- @Override
- public void run() {
- String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf";
- if (configValueUtil.isMultiTenancy()) {
- try {
- TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
- }catch (TenantInvalidException e){
- // 租户无效 删除缓存, 删除任务
- log.error("Task, 租户无效, 删除任务:{}", userInfo.getTenantId());
- IntegrateJobUtil.removeTenant(userInfo.getTenantId(), redisUtil);
- ScheduledFuture scheduledFuture = futureList.get(userInfo.getTenantId());
- if(scheduledFuture != null) {
- scheduledFuture.cancel(true);
- }
- }
- }
- List<IntegrateQueueEntity> list = integrateQueueService.getList();
- if (list.size() > 0) {
- String token = AuthUtil.loginTempUser(userInfo.getUserId(), userInfo.getTenantId(), true);
- String url = configValueUtil.getApiDomain() + "/api/visualdev/Integrate/executeQuery";
- for (IntegrateQueueEntity entity : list) {
- IntegrateModel model = new IntegrateModel();
- model.setUserInfo(userInfo);
- model.setId(entity.getId());
- boolean integrate = IntegrateJobUtil.getIntegrate(model, redisUtil);
- boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + entity.getId(), System.currentTimeMillis(), 3600, TimeUnit.SECONDS);
- if (integrate && useSuccess) {
- System.out.println("执行一组调度开始----------------------------");
- IntegrateHttpModel httpModel = new IntegrateHttpModel();
- httpModel.setUserInfo(UserProvider.getUser(token));
- httpModel.setId(entity.getId());
- HttpRequest request = HttpRequest.of(url).method(Method.POST).body(JsonUtil.getObjectToString(httpModel));
- request.header(Constants.AUTHORIZATION, token);
- request.execute().body();
- System.out.println("执行一组调度结束----------------------------");
- }
- }
- }
- //删除
- redisTemplate.delete(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId);
- }
- }
- }
|