Browse Source

整合消息必答机制完成

yq 3 years ago
parent
commit
fa74a0b991

+ 13 - 5
src/main/java/com/usky/dxtop/service/config/rabbitmq/RabbitmqConfig.java

@@ -19,6 +19,8 @@ import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Optional;
+
 /**
  * @author yq
  * @date 2021/9/9 13:37
@@ -53,6 +55,8 @@ public class RabbitmqConfig {
         connectionFactory.setPort(rabbitProperties.getPort());
         connectionFactory.setUsername(rabbitProperties.getUsername());
         connectionFactory.setPassword(rabbitProperties.getPassword());
+        connectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
+        connectionFactory.setPublisherReturns(true);
         connectionFactory.setVirtualHost(vHost);
         return connectionFactory;
     }
@@ -68,11 +72,15 @@ public class RabbitmqConfig {
         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
             if (ack) {
                 log.info("消息成功发送到Exchange");
-                String msgId = correlationData.getId();
-                MsgLog msgLog = new MsgLog();
-                msgLog.setId(Long.parseLong(msgId));
-                msgLog.setMsgFlag(MsgLogStatus.DELIVER_SUCCESS.getCode());
-                msgLogService.updateById(msgLog);
+                Optional.ofNullable(correlationData)
+                        .ifPresent(corre -> Optional.ofNullable(corre.getId())
+                                .ifPresent(id -> {
+                                    String msgId = corre.getId();
+                                    MsgLog msgLog = new MsgLog();
+                                    msgLog.setId(Long.parseLong(msgId));
+                                    msgLog.setMsgFlag(MsgLogStatus.DELIVER_SUCCESS.getCode());
+                                    msgLogService.updateById(msgLog);
+                                }));
             } else {
                 log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
             }