PublishEventUtil.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package jnpf.util;
  2. import com.baomidou.dynamic.datasource.annotation.DsTxEventListener;
  3. import com.baomidou.dynamic.datasource.tx.TransactionContext;
  4. import jnpf.base.UserInfo;
  5. import jnpf.consts.ProjectEventConst;
  6. import jnpf.event.ProjectEventSender;
  7. import jnpf.module.*;
  8. import lombok.Getter;
  9. import org.springframework.context.ApplicationEventPublisher;
  10. import org.springframework.context.ApplicationEventPublisherAware;
  11. import org.springframework.context.event.EventListener;
  12. import org.springframework.transaction.event.TransactionalEventListener;
  13. import org.springframework.transaction.support.TransactionSynchronizationManager;
  14. import org.springframework.util.Assert;
  15. import org.springframework.util.StringUtils;
  16. import java.util.Objects;
  17. public class PublishEventUtil implements ApplicationEventPublisherAware {
  18. @Getter
  19. private static ApplicationEventPublisher publisher;
  20. private static ProjectEventSender eventSender = null;
  21. public PublishEventUtil(ProjectEventSender eventSender) {
  22. PublishEventUtil.eventSender = eventSender;
  23. }
  24. /**
  25. * 发布自定义事件
  26. * async: 是否异步 | 默认异步
  27. * messageModel: 1:集群(相同的服务中只有一个服务消费), 2:广播(所有服务都消费), 3:本地消息(当前进程消费) | 默认集群模式
  28. * afterCommitTransaction: 如果当前存在事务, 通知在事务提交成功后 |默认否
  29. * Redis渠道不支持重试, 禁用同步模式
  30. * QUEUE渠道同步模式如果报错可以触发MQ重试
  31. */
  32. public static void publish(ProjectEventBuilder eventBuilder) {
  33. ProjectEvent newEvent = null;
  34. if (eventBuilder.isAfterCommitTransaction()) {
  35. // 开启提交事务后发送, 且当前存在事务
  36. if (!TransactionSynchronizationManager.isActualTransactionActive() && !StringUtils.hasText(TransactionContext.getXID())) {
  37. newEvent = ProjectEventPublishTransaction.parseEvent(eventBuilder);
  38. }
  39. }
  40. if(newEvent == null){
  41. newEvent = ProjectEventPublish.parseEvent(eventBuilder);
  42. }
  43. PublishEventUtil.getPublisher().publishEvent(newEvent);
  44. }
  45. /**
  46. * 发送本地事件
  47. * @see #publish(ProjectEventBuilder)
  48. */
  49. public static void publishLocalEvent(ProjectEventBuilder eventBuilder) {
  50. eventBuilder.setMessageModel(ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL);
  51. publish(eventBuilder);
  52. }
  53. @EventListener
  54. public void onPublishEvent(ProjectEventPublish event) {
  55. processPublishEvent(event);
  56. }
  57. @TransactionalEventListener
  58. @DsTxEventListener
  59. public void onPublishTransactionEvent(ProjectEventPublishTransaction event) {
  60. processPublishEvent(event);
  61. }
  62. private void processPublishEvent(ProjectEvent event) {
  63. Assert.notNull(eventSender, "当前发布渠道未配置");
  64. if(event.getToken() == null){
  65. String token = UserProvider.getToken();
  66. if(token == null){
  67. UserInfo userInfo = UserProvider.getLocalLoginUser();
  68. if(userInfo != null){
  69. token = userInfo.getToken();
  70. }
  71. }
  72. event.setToken(token);
  73. }
  74. if(event.getTenantId() == null){
  75. event.setTenantId(TenantHolder.getDatasourceId());
  76. }
  77. if(Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL)){
  78. PublishEventUtil.getPublisher().publishEvent(ProjectEventInstance.parseEvent(event));
  79. }else {
  80. eventSender.send(event);
  81. }
  82. }
  83. @Override
  84. public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
  85. PublishEventUtil.publisher = applicationEventPublisher;
  86. }
  87. }