ProjectEventRedisMessageHandler.java 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package jnpf.handler;
  2. import com.alibaba.fastjson.JSON;
  3. import jakarta.validation.constraints.NotNull;
  4. import jnpf.constant.GlobalConst;
  5. import jnpf.consts.ProjectEventConst;
  6. import jnpf.consts.RedisConst;
  7. import jnpf.module.ProjectEvent;
  8. import jnpf.module.ProjectEventInstance;
  9. import jnpf.properties.EventProperty;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.context.ApplicationEventPublisher;
  12. import org.springframework.context.ApplicationEventPublisherAware;
  13. import org.springframework.data.redis.connection.Message;
  14. import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
  15. import org.springframework.data.redis.listener.PatternTopic;
  16. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  17. import org.springframework.data.redis.listener.Topic;
  18. import org.springframework.lang.Nullable;
  19. import java.util.Objects;
  20. /**
  21. * 自定义事件监听, Redis渠道, 收到通知后发送Spring事件(RedisEventInstance)
  22. */
  23. @Slf4j
  24. public class ProjectEventRedisMessageHandler extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
  25. private static Topic TOPIC_ALL_KEYEVENTS;
  26. private String keyprefix;
  27. private @Nullable ApplicationEventPublisher publisher;
  28. /**
  29. * Creates new {@link KeyspaceEventMessageListener}.
  30. *
  31. * @param listenerContainer must not be {@literal null}.
  32. */
  33. public ProjectEventRedisMessageHandler(RedisMessageListenerContainer listenerContainer, EventProperty eventProperties) {
  34. super(listenerContainer);
  35. if(Objects.equals(eventProperties.getRedisPublishType(), ProjectEventConst.REDIS_PUBLISH_TYPE_ALL)){
  36. // 只订阅当前配置的库索引
  37. keyprefix = ProjectEventConst.DEFAULT_CHANNEL_PREFIX;
  38. }else{
  39. // 订阅同一个Redis里的所有消息
  40. keyprefix = RedisConst.REDIS_EVENT_KEY + ProjectEventConst.DEFAULT_CHANNEL_PREFIX;
  41. }
  42. TOPIC_ALL_KEYEVENTS = new PatternTopic(keyprefix + "*");
  43. log.info("初始化自定义事件Redis监听器");
  44. }
  45. @Override
  46. protected void doRegister(RedisMessageListenerContainer container) {
  47. container.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
  48. }
  49. @Override
  50. protected void doHandleMessage(@NotNull Message message) {
  51. String channel = new String(message.getChannel(), GlobalConst.DEFAULT_CHARSET);
  52. if (log.isDebugEnabled()) {
  53. log.debug("事件监听收到Redis消息, channel: {}, body: {}", channel, new String(message.getBody(), GlobalConst.DEFAULT_CHARSET));
  54. }
  55. publishEvent(JSON.parseObject(message.getBody(), ProjectEventInstance.class));
  56. }
  57. /**
  58. * Publish the event in case an {@link ApplicationEventPublisher} is set.
  59. *
  60. * @param event can be {@literal null}.
  61. */
  62. protected void publishEvent(ProjectEvent event) {
  63. if (publisher != null) {
  64. event.setAsync(true);
  65. this.publisher.publishEvent(event);
  66. }
  67. }
  68. @Override
  69. public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
  70. publisher = applicationEventPublisher;
  71. }
  72. }