123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package com.usky.dxtop.service.config.rabbitmq;
- import com.usky.dxtop.model.MsgLog;
- import com.usky.dxtop.service.MsgLogService;
- import com.usky.dxtop.service.emun.MsgLogStatus;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.amqp.core.AmqpAdmin;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Configuration;
- import java.util.Optional;
- /**
- * @author yq
- * @date 2021/9/9 13:37
- */
- @Slf4j
- @Data
- @Configuration
- public class RabbitmqConfig {
- public final static String CONNECTION = "ConnectionFactory";
- public final static String TEMPLATE = "RabbitTemplate";
- public final static String LISTENER = "ListenerFactory";
- public final static String ADMIN = "Admin";
- public final static String EXCHANGE = "Exchange";
- public final static String QUEUE = "Queue";
- public final static String BINDING = "Bin";
- @Autowired
- private RabbitProperties rabbitProperties;
- @Autowired
- private MsgLogService msgLogService;
- /**
- * 连接工厂
- * @param vHost
- * @return
- */
- public ConnectionFactory connectionFactory(String vHost) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(rabbitProperties.getHost());
- connectionFactory.setPort(rabbitProperties.getPort());
- connectionFactory.setUsername(rabbitProperties.getUsername());
- connectionFactory.setPassword(rabbitProperties.getPassword());
- connectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
- connectionFactory.setPublisherReturns(true);
- connectionFactory.setVirtualHost(vHost);
- return connectionFactory;
- }
- /**
- * 发送消息模版
- * @param connectionFactory
- * @return
- */
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(this.messageConverter());
- // 消息是否成功发送到Exchange
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- Optional.ofNullable(correlationData)
- .ifPresent(corre -> Optional.ofNullable(corre.getId())
- .ifPresent(id -> {
- String msgId = corre.getId();
- MsgLog msgLog = new MsgLog();
- msgLog.setId(Long.parseLong(msgId));
- msgLog.setMsgFlag(MsgLogStatus.DELIVER_SUCCESS.getCode());
- msgLogService.updateById(msgLog);
- }));
- } else {
- log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
- }
- });
- // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
- rabbitTemplate.setMandatory(true);
- // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
- });
- return rabbitTemplate;
- }
- /**
- * 监听工厂
- * @param configurer
- * @param connectionFactory
- * @return
- */
- public SimpleRabbitListenerContainerFactory listenerFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
- //设置手动ack
- listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
- configurer.configure(listenerContainerFactory, connectionFactory);
- return listenerContainerFactory;
- }
- /**
- * 序列化
- * @return
- */
- public Jackson2JsonMessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- /**
- * 交换机
- * @param name
- * @param amqpAdmin
- * @return
- */
- public DirectExchange exchange(String name, AmqpAdmin amqpAdmin){
- DirectExchange exchange = new DirectExchange(name, true, false);
- exchange.setAdminsThatShouldDeclare(amqpAdmin);
- return exchange;
- }
- /**
- * 队列
- * @param name
- * @param amqpAdmin
- * @return
- */
- public Queue queue(String name, AmqpAdmin amqpAdmin){
- Queue queue = new Queue(name, true);
- queue.setAdminsThatShouldDeclare(amqpAdmin);
- return queue;
- }
- }
|