ProjectEventProccess.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package jnpf.event;
  2. import cn.hutool.core.text.StrPool;
  3. import com.alibaba.fastjson.JSON;
  4. import com.baomidou.lock.LockInfo;
  5. import com.baomidou.lock.LockTemplate;
  6. import jnpf.base.UserInfo;
  7. import jnpf.config.ConfigValueUtil;
  8. import jnpf.consts.ProjectEventConst;
  9. import jnpf.consts.RedisConst;
  10. import jnpf.database.util.TenantDataSourceUtil;
  11. import jnpf.module.ProjectEvent;
  12. import jnpf.module.ProjectEventInstance;
  13. import jnpf.util.StringUtil;
  14. import jnpf.util.UserProvider;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.context.event.EventListener;
  17. import org.springframework.scheduling.annotation.Async;
  18. import java.util.List;
  19. import java.util.Objects;
  20. import java.util.function.Consumer;
  21. /**
  22. * 自定义事件执行
  23. */
  24. @Slf4j
  25. public class ProjectEventProccess {
  26. private final LockTemplate lockTemplate;
  27. private final ConfigValueUtil configValueUtil;
  28. /**
  29. * 集群模式锁定事件
  30. */
  31. private final long LOCK_TIME = 24 * 60 * 60000L;
  32. public ProjectEventProccess(LockTemplate lockTemplate, ConfigValueUtil configValueUtil) {
  33. this.lockTemplate = lockTemplate;
  34. this.configValueUtil = configValueUtil;
  35. }
  36. /**
  37. * 同步执行
  38. *
  39. * @param event
  40. */
  41. @EventListener(condition = "!#event.async")
  42. public void onApplicationEvent(ProjectEventInstance event) {
  43. processEvent(event);
  44. }
  45. /**
  46. * 异步执行
  47. *
  48. * @param event
  49. */
  50. @Async
  51. @EventListener(condition = "#event.async")
  52. public void onApplicationEventAsync(ProjectEventInstance event) {
  53. processEvent(event);
  54. }
  55. private void processEvent(ProjectEvent event) {
  56. String topic = event.getToptic();
  57. List<Consumer<ProjectEvent>> eventListener = ProjectEventHolder.getEventListener(topic, event);
  58. LockInfo lock = null;
  59. // 集群消费模式, 添加Redis锁
  60. if (!eventListener.isEmpty() && Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_CLUSTERING)) {
  61. // lock:服务名:事件编号
  62. String key = RedisConst.REDIS_LOCK4J_PREFIX + StrPool.COLON + configValueUtil.getApplicationName() + StrPool.COLON + event.getEventId().toString();
  63. lock = lockTemplate.lock(key, LOCK_TIME, 0L);
  64. if (lock == null) {
  65. if (log.isDebugEnabled()) {
  66. log.debug("事件在其他服务已经处理: {}", JSON.toJSONString(event));
  67. }
  68. return;
  69. }
  70. }
  71. try {
  72. // 设置用户信息
  73. if (!eventListener.isEmpty()) {
  74. UserInfo userInfo = null;
  75. // 初始化线程用户信息
  76. if (event.getToken() != null) {
  77. userInfo = UserProvider.getUser(event.getToken());
  78. UserProvider.setLocalLoginUser(userInfo);
  79. }
  80. // 开启多租户 切库
  81. if (configValueUtil.isEnableLogicDelete()) {
  82. if (userInfo != null && StringUtil.isNotEmpty(userInfo.getTenantId())) {
  83. // 优先从用户获取多租户
  84. TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
  85. } else if (event.getTenantId() != null) {
  86. // 从事件里获取多租户
  87. TenantDataSourceUtil.switchTenant(event.getTenantId());
  88. }
  89. }
  90. }
  91. for (Consumer<ProjectEvent> consumer : eventListener) {
  92. consumer.accept(event);
  93. }
  94. } catch (Exception e) {
  95. lockTemplate.releaseLock(lock);
  96. log.error("事件处理失败: {}", event, e);
  97. throw e;
  98. }
  99. }
  100. }