package jnpf.util; import com.baomidou.dynamic.datasource.annotation.DsTxEventListener; import com.baomidou.dynamic.datasource.tx.TransactionContext; import jnpf.base.UserInfo; import jnpf.consts.ProjectEventConst; import jnpf.event.ProjectEventSender; import jnpf.module.*; import lombok.Getter; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.event.EventListener; import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import java.util.Objects; public class PublishEventUtil implements ApplicationEventPublisherAware { @Getter private static ApplicationEventPublisher publisher; private static ProjectEventSender eventSender = null; public PublishEventUtil(ProjectEventSender eventSender) { PublishEventUtil.eventSender = eventSender; } /** * 发布自定义事件 * async: 是否异步 | 默认异步 * messageModel: 1:集群(相同的服务中只有一个服务消费), 2:广播(所有服务都消费), 3:本地消息(当前进程消费) | 默认集群模式 * afterCommitTransaction: 如果当前存在事务, 通知在事务提交成功后 |默认否 * Redis渠道不支持重试, 禁用同步模式 * QUEUE渠道同步模式如果报错可以触发MQ重试 */ public static void publish(ProjectEventBuilder eventBuilder) { ProjectEvent newEvent = null; if (eventBuilder.isAfterCommitTransaction()) { // 开启提交事务后发送, 且当前存在事务 if (!TransactionSynchronizationManager.isActualTransactionActive() && !StringUtils.hasText(TransactionContext.getXID())) { newEvent = ProjectEventPublishTransaction.parseEvent(eventBuilder); } } if(newEvent == null){ newEvent = ProjectEventPublish.parseEvent(eventBuilder); } PublishEventUtil.getPublisher().publishEvent(newEvent); } /** * 发送本地事件 * @see #publish(ProjectEventBuilder) */ public static void publishLocalEvent(ProjectEventBuilder eventBuilder) { eventBuilder.setMessageModel(ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL); publish(eventBuilder); } @EventListener public void onPublishEvent(ProjectEventPublish event) { processPublishEvent(event); } @TransactionalEventListener @DsTxEventListener public void onPublishTransactionEvent(ProjectEventPublishTransaction event) { processPublishEvent(event); } private void processPublishEvent(ProjectEvent event) { Assert.notNull(eventSender, "当前发布渠道未配置"); if(event.getToken() == null){ String token = UserProvider.getToken(); if(token == null){ UserInfo userInfo = UserProvider.getLocalLoginUser(); if(userInfo != null){ token = userInfo.getToken(); } } event.setToken(token); } if(event.getTenantId() == null){ event.setTenantId(TenantHolder.getDatasourceId()); } if(Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_LOCAL)){ PublishEventUtil.getPublisher().publishEvent(ProjectEventInstance.parseEvent(event)); }else { eventSender.send(event); } } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { PublishEventUtil.publisher = applicationEventPublisher; } }