RabbitmqConfig.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package com.usky.dxtop.service.config.rabbitmq;
  2. import com.usky.dxtop.model.MsgLog;
  3. import com.usky.dxtop.service.MsgLogService;
  4. import com.usky.dxtop.service.emun.MsgLogStatus;
  5. import lombok.Data;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.amqp.core.AcknowledgeMode;
  8. import org.springframework.amqp.core.AmqpAdmin;
  9. import org.springframework.amqp.core.DirectExchange;
  10. import org.springframework.amqp.core.Queue;
  11. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  12. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  13. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  14. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  15. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
  18. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  19. import org.springframework.context.annotation.Configuration;
  20. import java.util.Optional;
  21. /**
  22. * @author yq
  23. * @date 2021/9/9 13:37
  24. */
  25. @Slf4j
  26. @Data
  27. @Configuration
  28. public class RabbitmqConfig {
  29. public final static String CONNECTION = "ConnectionFactory";
  30. public final static String TEMPLATE = "RabbitTemplate";
  31. public final static String LISTENER = "ListenerFactory";
  32. public final static String ADMIN = "Admin";
  33. public final static String EXCHANGE = "Exchange";
  34. public final static String QUEUE = "Queue";
  35. public final static String BINDING = "Bin";
  36. @Autowired
  37. private RabbitProperties rabbitProperties;
  38. @Autowired
  39. private MsgLogService msgLogService;
  40. /**
  41. * 连接工厂
  42. * @param vHost
  43. * @return
  44. */
  45. public ConnectionFactory connectionFactory(String vHost) {
  46. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  47. connectionFactory.setHost(rabbitProperties.getHost());
  48. connectionFactory.setPort(rabbitProperties.getPort());
  49. connectionFactory.setUsername(rabbitProperties.getUsername());
  50. connectionFactory.setPassword(rabbitProperties.getPassword());
  51. connectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
  52. connectionFactory.setPublisherReturns(true);
  53. connectionFactory.setVirtualHost(vHost);
  54. return connectionFactory;
  55. }
  56. /**
  57. * 发送消息模版
  58. * @param connectionFactory
  59. * @return
  60. */
  61. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  62. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  63. rabbitTemplate.setMessageConverter(this.messageConverter());
  64. // 消息是否成功发送到Exchange
  65. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  66. if (ack) {
  67. Optional.ofNullable(correlationData)
  68. .ifPresent(corre -> Optional.ofNullable(corre.getId())
  69. .ifPresent(id -> {
  70. String msgId = corre.getId();
  71. MsgLog msgLog = new MsgLog();
  72. msgLog.setId(Long.parseLong(msgId));
  73. msgLog.setMsgFlag(MsgLogStatus.DELIVER_SUCCESS.getCode());
  74. msgLogService.updateById(msgLog);
  75. }));
  76. } else {
  77. log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
  78. }
  79. });
  80. // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
  81. rabbitTemplate.setMandatory(true);
  82. // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
  83. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  84. log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
  85. });
  86. return rabbitTemplate;
  87. }
  88. /**
  89. * 监听工厂
  90. * @param configurer
  91. * @param connectionFactory
  92. * @return
  93. */
  94. public SimpleRabbitListenerContainerFactory listenerFactory(
  95. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  96. ConnectionFactory connectionFactory) {
  97. SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
  98. //设置手动ack
  99. listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
  100. configurer.configure(listenerContainerFactory, connectionFactory);
  101. return listenerContainerFactory;
  102. }
  103. /**
  104. * 序列化
  105. * @return
  106. */
  107. public Jackson2JsonMessageConverter messageConverter() {
  108. return new Jackson2JsonMessageConverter();
  109. }
  110. /**
  111. * 交换机
  112. * @param name
  113. * @param amqpAdmin
  114. * @return
  115. */
  116. public DirectExchange exchange(String name, AmqpAdmin amqpAdmin){
  117. DirectExchange exchange = new DirectExchange(name, true, false);
  118. exchange.setAdminsThatShouldDeclare(amqpAdmin);
  119. return exchange;
  120. }
  121. /**
  122. * 队列
  123. * @param name
  124. * @param amqpAdmin
  125. * @return
  126. */
  127. public Queue queue(String name, AmqpAdmin amqpAdmin){
  128. Queue queue = new Queue(name, true);
  129. queue.setAdminsThatShouldDeclare(amqpAdmin);
  130. return queue;
  131. }
  132. }