package jnpf.event;

import cn.hutool.core.text.StrPool;
import com.alibaba.fastjson.JSON;
import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate;
import jnpf.base.UserInfo;
import jnpf.config.ConfigValueUtil;
import jnpf.consts.ProjectEventConst;
import jnpf.consts.RedisConst;
import jnpf.database.util.TenantDataSourceUtil;
import jnpf.module.ProjectEvent;
import jnpf.module.ProjectEventInstance;
import jnpf.util.StringUtil;
import jnpf.util.UserProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;

import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Executes custom project events.
 *
 * <p>Receives {@link ProjectEventInstance} objects through Spring's {@code @EventListener}
 * mechanism (synchronously or asynchronously, depending on the event's {@code async} flag)
 * and hands each event to the consumers that {@link ProjectEventHolder} returns for the
 * event's topic.
 *
 * <p>For events whose message model is
 * {@link ProjectEventConst#EVENT_PUBLISH_MODEL_CLUSTERING}, a lock keyed by application name
 * and event id is acquired before dispatch. If the lock cannot be obtained the event is
 * skipped. The lock is released only when dispatch fails; after a successful dispatch it is
 * left in place until it expires (presumably so that other instances receiving the same
 * event do not process it again).
 */
@Slf4j
public class ProjectEventProccess {

    /**
     * Expiry passed to the lock when an event is locked in clustering mode.
     * Presumably milliseconds, i.e. 24 hours - confirm against the lock API.
     */
    private static final long LOCK_TIME = 24 * 60 * 60000L;

    private final LockTemplate lockTemplate;
    private final ConfigValueUtil configValueUtil;

    /**
     * @param lockTemplate    lock facade used to lock events in clustering mode
     * @param configValueUtil configuration accessor (application name, feature flags)
     */
    public ProjectEventProccess(LockTemplate lockTemplate, ConfigValueUtil configValueUtil) {
        this.lockTemplate = lockTemplate;
        this.configValueUtil = configValueUtil;
    }

    /**
     * Synchronous execution: handles events whose {@code async} property is false.
     *
     * @param event the published event
     */
    @EventListener(condition = "!#event.async")
    public void onApplicationEvent(ProjectEventInstance event) {
        processEvent(event);
    }

    /**
     * Asynchronous execution: handles events whose {@code async} property is true.
     *
     * @param event the published event
     */
    @Async
    @EventListener(condition = "#event.async")
    public void onApplicationEventAsync(ProjectEventInstance event) {
        processEvent(event);
    }

    /**
     * Looks up the listeners for the event's topic and invokes each of them in order.
     *
     * <p>Any exception raised while preparing the context or running a listener is logged
     * and rethrown; a lock taken for the event is released first.
     *
     * @param event the event to dispatch
     */
    private void processEvent(ProjectEvent event) {
        String topic = event.getToptic();
        List<Consumer<ProjectEvent>> eventListener = ProjectEventHolder.getEventListener(topic, event);
        if (eventListener.isEmpty()) {
            // Nothing to run: no lock, no user/tenant context needed.
            return;
        }

        LockInfo lock = null;
        // Clustering consumption mode: take a lock so the event is handled once.
        if (Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_CLUSTERING)) {
            // Key layout: <lock prefix>:<application name>:<event id>
            String key = RedisConst.REDIS_LOCK4J_PREFIX + StrPool.COLON
                    + configValueUtil.getApplicationName() + StrPool.COLON
                    + event.getEventId().toString();
            lock = lockTemplate.lock(key, LOCK_TIME, 0L);
            if (lock == null) {
                // Lock not obtained: treated as "already handled by another service".
                if (log.isDebugEnabled()) {
                    log.debug("事件在其他服务已经处理: {}", JSON.toJSONString(event));
                }
                return;
            }
        }

        try {
            prepareExecutionContext(event);
            for (Consumer<ProjectEvent> consumer : eventListener) {
                consumer.accept(event);
            }
        } catch (Exception e) {
            releaseLockAfterFailure(lock, e);
            log.error("事件处理失败: {}", event, e);
            throw e;
        }
    }

    /**
     * Sets the thread-local login user from the event's token (when present) and switches
     * the tenant data source, preferring the user's tenant id over the event's.
     *
     * @param event the event being dispatched
     */
    private void prepareExecutionContext(ProjectEvent event) {
        UserInfo userInfo = null;
        // Initialise the thread's user information.
        if (event.getToken() != null) {
            userInfo = UserProvider.getUser(event.getToken());
            UserProvider.setLocalLoginUser(userInfo);
        }
        // NOTE(review): the original comment says "multi-tenancy enabled, switch database",
        // yet the flag checked is isEnableLogicDelete(). This looks like the wrong flag -
        // confirm whether a multi-tenancy switch on ConfigValueUtil was intended.
        if (configValueUtil.isEnableLogicDelete()) {
            if (userInfo != null && StringUtil.isNotEmpty(userInfo.getTenantId())) {
                // Prefer the tenant carried by the user.
                TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
            } else if (event.getTenantId() != null) {
                // Fall back to the tenant carried by the event.
                TenantDataSourceUtil.switchTenant(event.getTenantId());
            }
        }
    }

    /**
     * Releases the event lock after a failed dispatch without masking the original failure.
     *
     * @param lock    the lock taken for the event, or {@code null} if none was taken
     * @param failure the exception that aborted the dispatch; a failure to release the lock
     *                is attached to it as a suppressed exception
     */
    private void releaseLockAfterFailure(LockInfo lock, Exception failure) {
        if (lock == null) {
            return;
        }
        try {
            lockTemplate.releaseLock(lock);
        } catch (RuntimeException releaseFailure) {
            failure.addSuppressed(releaseFailure);
        }
    }
}