AsyncConfig.java 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package jnpf.config;
  2. import jnpf.base.UserInfo;
  3. import jnpf.model.tenant.TenantVO;
  4. import jnpf.util.TenantHolder;
  5. import jnpf.util.UserProvider;
  6. import lombok.AllArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Primary;
  12. import org.springframework.scheduling.annotation.AsyncConfigurer;
  13. import org.springframework.scheduling.annotation.EnableAsync;
  14. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  15. import org.springframework.web.context.request.RequestAttributes;
  16. import org.springframework.web.context.request.RequestContextHolder;
  17. import java.util.concurrent.Executor;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. /**
  20. * 提供一个全局的Spring线程池对象
  21. *
  22. * @author JNPF开发平台组
  23. * @version V3.1.0
  24. * @copyright 引迈信息技术有限公司(https://www.jnpfsoft.com)
  25. * @date 2021-12-10
  26. */
  27. @Slf4j
  28. @Configuration
  29. @EnableAsync(proxyTargetClass = true)
  30. @AllArgsConstructor
  31. public class AsyncConfig implements AsyncConfigurer {
  32. // private final Map<Object, Integer> asyncTaskCount = new HashMap<>();
  33. @Primary
  34. @Bean("threadPoolTaskExecutor")
  35. @Override
  36. public Executor getAsyncExecutor() {
  37. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  38. // 设置线程池核心容量
  39. executor.setCorePoolSize(10);
  40. // 设置线程池最大容量
  41. executor.setMaxPoolSize(50);
  42. // 设置任务队列长度
  43. executor.setQueueCapacity(2000);
  44. // 设置线程超时时间
  45. executor.setKeepAliveSeconds(30);
  46. // 设置线程名称前缀
  47. executor.setThreadNamePrefix("sysTaskExecutor");
  48. // 设置任务丢弃后的处理策略
  49. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  50. executor.setTaskDecorator( r ->{
  51. //实现线程上下文穿透, 异步线程内无法获取之前的Request,租户信息等, 如有新的上下文对象在此处添加
  52. //此方法在请求结束后在无法获取request, 下方完整异步Servlet请求
  53. // RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
  54. TenantVO tenantVO = TenantHolder.getLocalTenantCache();
  55. UserInfo userInfo = UserProvider.getUser();
  56. return () -> {
  57. try {
  58. // if(attributes!= null) {
  59. // RequestContextHolder.setRequestAttributes(attributes);
  60. // }
  61. if(tenantVO != null){
  62. TenantHolder.setLocalTenantCache(tenantVO);
  63. }
  64. UserProvider.setLocalLoginUser(userInfo);
  65. r.run();
  66. } finally {
  67. UserProvider.clearLocalUser();
  68. RequestContextHolder.resetRequestAttributes();
  69. TenantHolder.clearLocalTenantCache();
  70. }
  71. };
  72. });
  73. /*
  74. //Tomcat异步Servlet只能增加吞吐量, 不能减少响应时间
  75. executor.setTaskDecorator( r ->{
  76. //实现线程上下文穿透, 异步线程内无法获取之前的Request,租户信息等, 如有新的上下文对象在此处添加
  77. RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
  78. String dataSourceId = DataSourceContextHolder.getDatasourceId();
  79. String dataSourceName = DataSourceContextHolder.getDatasourceName();
  80. final AsyncContext asyncContext = (AsyncContext) Optional.ofNullable(null).orElseGet(()->{
  81. if(attributes!= null) {
  82. HttpServletRequest request = ((ServletRequestAttributes) attributes).getRequest();
  83. HttpServletResponse response = ((ServletRequestAttributes) attributes).getResponse();
  84. synchronized (AsyncConfig.class) {
  85. //开启多个异步
  86. AsyncContext tmpAsyncContext = request.isAsyncStarted() ? request.getAsyncContext() : request.startAsync(request, response);
  87. asyncTaskCount.put(tmpAsyncContext, asyncTaskCount.getOrDefault(tmpAsyncContext, 0) +1);
  88. return tmpAsyncContext;
  89. }
  90. }
  91. return null;
  92. });
  93. return () -> {
  94. if (asyncContext != null) {
  95. asyncContext.start(() -> {
  96. if (dataSourceId != null || dataSourceName != null) {
  97. DataSourceContextHolder.setDatasource(dataSourceId, dataSourceName);
  98. }
  99. RequestContextHolder.setRequestAttributes(attributes);
  100. try {
  101. r.run();
  102. }finally{
  103. synchronized (AsyncConfig.class){
  104. //多个异步 最后一个执行关闭
  105. int count = asyncTaskCount.get(asyncContext)-1;
  106. if(count > 0){
  107. asyncTaskCount.put(asyncContext, count);
  108. }else{
  109. asyncTaskCount.remove(asyncContext);
  110. asyncContext.complete();
  111. }
  112. }
  113. RequestContextHolder.resetRequestAttributes();
  114. DataSourceContextHolder.clearDatasourceType();
  115. }
  116. });
  117. } else {
  118. if (dataSourceId != null || dataSourceName != null) {
  119. DataSourceContextHolder.setDatasource(dataSourceId, dataSourceName);
  120. }
  121. try {
  122. r.run();
  123. }finally{
  124. DataSourceContextHolder.clearDatasourceType();
  125. }
  126. }
  127. };
  128. });
  129. */
  130. return executor;
  131. }
  132. @Bean("defaultExecutor")
  133. public ThreadPoolTaskExecutor getAsyncExecutorDef(@Qualifier("threadPoolTaskExecutor") Executor executor) {
  134. return (ThreadPoolTaskExecutor) executor;
  135. }
  136. }