| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package jnpf.util;
- import com.baomidou.dynamic.datasource.annotation.DsTxEventListener;
- import com.baomidou.dynamic.datasource.tx.TransactionContext;
- import jnpf.base.UserInfo;
- import jnpf.consts.ProjectEventConst;
- import jnpf.event.ProjectEventSender;
- import jnpf.module.*;
- import lombok.Getter;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.context.ApplicationEventPublisherAware;
- import org.springframework.context.event.EventListener;
- import org.springframework.transaction.event.TransactionalEventListener;
- import org.springframework.transaction.support.TransactionSynchronizationManager;
- import org.springframework.util.Assert;
- import org.springframework.util.StringUtils;
- import java.util.Objects;
- public class PublishEventUtil implements ApplicationEventPublisherAware {
- @Getter
- private static ApplicationEventPublisher publisher;
- private static ProjectEventSender eventSender = null;
- public PublishEventUtil(ProjectEventSender eventSender) {
- PublishEventUtil.eventSender = eventSender;
- }
- /**
- * 发布自定义事件
- * async: 是否异步 | 默认异步
- * messageModel: 1:集群(相同的服务中只有一个服务消费), 2:广播(所有服务都消费), 3:本地消息(当前进程消费) | 默认集群模式
- * afterCommitTransaction: 如果当前存在事务, 通知在事务提交成功后 |默认否
- * Redis渠道不支持重试, 禁用同步模式
- * QUEUE渠道同步模式如果报错可以触发MQ重试
- */
- public static void publish(ProjectEventBuilder eventBuilder) {
- ProjectEvent newEvent = null;
- if (eventBuilder.isAfterCommitTransaction()) {
- // 开启提交事务后发送, 且当前存在事务
- if (!TransactionSynchronizationManager.isActualTransactionActive() && !StringUtils.hasText(TransactionContext.getXID())) {
- newEvent = ProjectEventPublishTransaction.parseEvent(eventBuilder);
- }
- }
- if(newEvent == null){
- newEvent = ProjectEventPublish.parseEvent(eventBuilder);
- }
- PublishEventUtil.getPublisher().publishEvent(newEvent);
- }
- /**
- * 发送本地事件
- * @see #publish(ProjectEventBuilder)
- */
- public static void publishLocalEvent(ProjectEventBuilder eventBuilder) {
- eventBuilder.setMessageModel(ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL);
- publish(eventBuilder);
- }
- @EventListener
- public void onPublishEvent(ProjectEventPublish event) {
- processPublishEvent(event);
- }
- @TransactionalEventListener
- @DsTxEventListener
- public void onPublishTransactionEvent(ProjectEventPublishTransaction event) {
- processPublishEvent(event);
- }
- private void processPublishEvent(ProjectEvent event) {
- Assert.notNull(eventSender, "当前发布渠道未配置");
- if(event.getToken() == null){
- String token = UserProvider.getToken();
- if(token == null){
- UserInfo userInfo = UserProvider.getLocalLoginUser();
- if(userInfo != null){
- token = userInfo.getToken();
- }
- }
- event.setToken(token);
- }
- if(event.getTenantId() == null){
- event.setTenantId(TenantHolder.getDatasourceId());
- }
- if(Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL)){
- PublishEventUtil.getPublisher().publishEvent(ProjectEventInstance.parseEvent(event));
- }else {
- eventSender.send(event);
- }
- }
- @Override
- public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
- PublishEventUtil.publisher = applicationEventPublisher;
- }
- }
|