| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package jnpf.event;
- import cn.hutool.core.text.StrPool;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.lock.LockInfo;
- import com.baomidou.lock.LockTemplate;
- import jnpf.base.UserInfo;
- import jnpf.config.ConfigValueUtil;
- import jnpf.consts.ProjectEventConst;
- import jnpf.consts.RedisConst;
- import jnpf.database.util.TenantDataSourceUtil;
- import jnpf.module.ProjectEvent;
- import jnpf.module.ProjectEventInstance;
- import jnpf.util.StringUtil;
- import jnpf.util.UserProvider;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.event.EventListener;
- import org.springframework.scheduling.annotation.Async;
- import java.util.List;
- import java.util.Objects;
- import java.util.function.Consumer;
- /**
- * 自定义事件执行
- */
- @Slf4j
- public class ProjectEventProccess {
- private final LockTemplate lockTemplate;
- private final ConfigValueUtil configValueUtil;
- /**
- * 集群模式锁定事件
- */
- private final long LOCK_TIME = 24 * 60 * 60000L;
- public ProjectEventProccess(LockTemplate lockTemplate, ConfigValueUtil configValueUtil) {
- this.lockTemplate = lockTemplate;
- this.configValueUtil = configValueUtil;
- }
- /**
- * 同步执行
- *
- * @param event
- */
- @EventListener(condition = "!#event.async")
- public void onApplicationEvent(ProjectEventInstance event) {
- processEvent(event);
- }
- /**
- * 异步执行
- *
- * @param event
- */
- @Async
- @EventListener(condition = "#event.async")
- public void onApplicationEventAsync(ProjectEventInstance event) {
- processEvent(event);
- }
- private void processEvent(ProjectEvent event) {
- String topic = event.getToptic();
- List<Consumer<ProjectEvent>> eventListener = ProjectEventHolder.getEventListener(topic, event);
- LockInfo lock = null;
- // 集群消费模式, 添加Redis锁
- if (!eventListener.isEmpty() && Objects.equals(event.getMessageModel(), ProjectEventConst.EVENT_PUBLISH_MODEL_CLUSTERING)) {
- // lock:服务名:事件编号
- String key = RedisConst.REDIS_LOCK4J_PREFIX + StrPool.COLON + configValueUtil.getApplicationName() + StrPool.COLON + event.getEventId().toString();
- lock = lockTemplate.lock(key, LOCK_TIME, 0L);
- if (lock == null) {
- if (log.isDebugEnabled()) {
- log.debug("事件在其他服务已经处理: {}", JSON.toJSONString(event));
- }
- return;
- }
- }
- try {
- // 设置用户信息
- if (!eventListener.isEmpty()) {
- UserInfo userInfo = null;
- // 初始化线程用户信息
- if (event.getToken() != null) {
- userInfo = UserProvider.getUser(event.getToken());
- UserProvider.setLocalLoginUser(userInfo);
- }
- // 开启多租户 切库
- if (configValueUtil.isEnableLogicDelete()) {
- if (userInfo != null && StringUtil.isNotEmpty(userInfo.getTenantId())) {
- // 优先从用户获取多租户
- TenantDataSourceUtil.switchTenant(userInfo.getTenantId());
- } else if (event.getTenantId() != null) {
- // 从事件里获取多租户
- TenantDataSourceUtil.switchTenant(event.getTenantId());
- }
- }
- }
- for (Consumer<ProjectEvent> consumer : eventListener) {
- consumer.accept(event);
- }
- } catch (Exception e) {
- lockTemplate.releaseLock(lock);
- log.error("事件处理失败: {}", event, e);
- throw e;
- }
- }
- }
|