package jnpf.handler; import com.alibaba.fastjson.JSON; import jakarta.validation.constraints.NotNull; import jnpf.constant.GlobalConst; import jnpf.consts.ProjectEventConst; import jnpf.consts.RedisConst; import jnpf.module.ProjectEvent; import jnpf.module.ProjectEventInstance; import jnpf.properties.EventProperty; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyspaceEventMessageListener; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.lang.Nullable; import java.util.Objects; /** * 自定义事件监听, Redis渠道, 收到通知后发送Spring事件(RedisEventInstance) */ @Slf4j public class ProjectEventRedisMessageHandler extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware { private static Topic TOPIC_ALL_KEYEVENTS; private String keyprefix; private @Nullable ApplicationEventPublisher publisher; /** * Creates new {@link KeyspaceEventMessageListener}. * * @param listenerContainer must not be {@literal null}. */ public ProjectEventRedisMessageHandler(RedisMessageListenerContainer listenerContainer, EventProperty eventProperties) { super(listenerContainer); if(Objects.equals(eventProperties.getRedisPublishType(), ProjectEventConst.REDIS_PUBLISH_TYPE_ALL)){ // 只订阅当前配置的库索引 keyprefix = ProjectEventConst.DEFAULT_CHANNEL_PREFIX; }else{ // 订阅同一个Redis里的所有消息 keyprefix = RedisConst.REDIS_EVENT_KEY + ProjectEventConst.DEFAULT_CHANNEL_PREFIX; } TOPIC_ALL_KEYEVENTS = new PatternTopic(keyprefix + "*"); log.info("初始化自定义事件Redis监听器"); } @Override protected void doRegister(RedisMessageListenerContainer container) { container.addMessageListener(this, TOPIC_ALL_KEYEVENTS); } @Override protected void doHandleMessage(@NotNull Message message) { String channel = new String(message.getChannel(), GlobalConst.DEFAULT_CHARSET); if (log.isDebugEnabled()) { log.debug("事件监听收到Redis消息, channel: {}, body: {}", channel, new String(message.getBody(), GlobalConst.DEFAULT_CHARSET)); } publishEvent(JSON.parseObject(message.getBody(), ProjectEventInstance.class)); } /** * Publish the event in case an {@link ApplicationEventPublisher} is set. * * @param event can be {@literal null}. */ protected void publishEvent(ProjectEvent event) { if (publisher != null) { event.setAsync(true); this.publisher.publishEvent(event); } } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { publisher = applicationEventPublisher; } }