IntegrateQueryJobUtil.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package jnpf.integrate.job;
  2. import cn.hutool.http.HttpRequest;
  3. import cn.hutool.http.Method;
  4. import jnpf.base.UserInfo;
  5. import jnpf.config.ConfigValueUtil;
  6. import jnpf.database.util.TenantDataSourceUtil;
  7. import jnpf.exception.TenantInvalidException;
  8. import jnpf.integrate.entity.IntegrateQueueEntity;
  9. import jnpf.integrate.model.nodeJson.IntegrateModel;
  10. import jnpf.integrate.service.IntegrateQueueService;
  11. import jnpf.integrate.util.IntegrateHttpModel;
  12. import jnpf.util.*;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.quartz.DisallowConcurrentExecution;
  15. import org.quartz.JobExecutionContext;
  16. import org.quartz.JobExecutionException;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.data.redis.core.RedisTemplate;
  19. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  20. import org.springframework.scheduling.quartz.QuartzJobBean;
  21. import java.util.HashMap;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.concurrent.ScheduledFuture;
  25. import java.util.concurrent.ScheduledThreadPoolExecutor;
  26. import java.util.concurrent.TimeUnit;
  27. @Slf4j
  28. @DisallowConcurrentExecution
  29. public class IntegrateQueryJobUtil extends QuartzJobBean {
  30. @Autowired
  31. private RedisUtil redisUtil;
  32. @Autowired
  33. private RedisTemplate redisTemplate;
  34. @Autowired
  35. private ConfigValueUtil configValueUtil;
  36. @Autowired
  37. private IntegrateQueueService integrateQueueService;
  38. public static Map<String, ScheduledFuture> futureList = new HashMap<>();
  39. private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;
  40. IntegrateQueryJobUtil(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
  41. if(scheduledThreadPoolExecutor == null) {
  42. scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(20, threadPoolTaskExecutor.getThreadPoolExecutor().getThreadFactory());
  43. }
  44. }
  45. @Override
  46. protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
  47. List<String> hashValues = redisUtil.getHashValues(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY);
  48. for (String id : hashValues) {
  49. UserInfo userInfo = JsonUtil.getJsonToBean(id, UserInfo.class);
  50. String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf";
  51. boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId, System.currentTimeMillis(), 3600, TimeUnit.SECONDS);
  52. if (!useSuccess) continue;
  53. if (futureList.get(tenantId) == null) {
  54. ScheduledFuture scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(new Task(userInfo), 0, 1, TimeUnit.SECONDS);
  55. futureList.put(tenantId, scheduledFuture);
  56. }
  57. }
  58. }
  59. class Task implements Runnable {
  60. private UserInfo userInfo;
  61. public Task(UserInfo userInfo) {
  62. this.userInfo = userInfo;
  63. }
  64. @Override
  65. public void run() {
  66. String tenantId = StringUtil.isNotEmpty(userInfo.getTenantId()) ? userInfo.getTenantId() : "jnpf";
  67. if (configValueUtil.isMultiTenancy()) {
  68. try {
  69. TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
  70. }catch (TenantInvalidException e){
  71. // 租户无效 删除缓存, 删除任务
  72. log.error("Task, 租户无效, 删除任务:{}", userInfo.getTenantId());
  73. IntegrateJobUtil.removeTenant(userInfo.getTenantId(), redisUtil);
  74. ScheduledFuture scheduledFuture = futureList.get(userInfo.getTenantId());
  75. if(scheduledFuture != null) {
  76. scheduledFuture.cancel(true);
  77. }
  78. }
  79. }
  80. List<IntegrateQueueEntity> list = integrateQueueService.getList();
  81. if (list.size() > 0) {
  82. String token = AuthUtil.loginTempUser(userInfo.getUserId(), userInfo.getTenantId(), true);
  83. String url = configValueUtil.getApiDomain() + "/api/visualdev/Integrate/executeQuery";
  84. for (IntegrateQueueEntity entity : list) {
  85. IntegrateModel model = new IntegrateModel();
  86. model.setUserInfo(userInfo);
  87. model.setId(entity.getId());
  88. boolean integrate = IntegrateJobUtil.getIntegrate(model, redisUtil);
  89. boolean useSuccess = redisTemplate.opsForValue().setIfAbsent(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + entity.getId(), System.currentTimeMillis(), 3600, TimeUnit.SECONDS);
  90. if (integrate && useSuccess) {
  91. System.out.println("执行一组调度开始----------------------------");
  92. IntegrateHttpModel httpModel = new IntegrateHttpModel();
  93. httpModel.setUserInfo(UserProvider.getUser(token));
  94. httpModel.setId(entity.getId());
  95. HttpRequest request = HttpRequest.of(url).method(Method.POST).body(JsonUtil.getObjectToString(httpModel));
  96. request.header(Constants.AUTHORIZATION, token);
  97. request.execute().body();
  98. System.out.println("执行一组调度结束----------------------------");
  99. }
  100. }
  101. }
  102. //删除
  103. redisTemplate.delete(IntegrateJobUtil.WORKTIMEOUT_REDIS_KEY + "_key:" + tenantId);
  104. }
  105. }
  106. }