Ver Fonte

添加消息必答机制

yq há 3 anos atrás
pai
commit
2647db347f

+ 8 - 4
src/main/java/com/usky/dxtop/model/MsgLog.java

@@ -5,7 +5,7 @@ import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
 
 import java.io.Serializable;
-import java.time.LocalDateTime;
+import java.util.Date;
 
 /**
  * <p>
@@ -50,12 +50,16 @@ public class MsgLog implements Serializable {
     /**
      * 下一次重试时间
      */
-    private LocalDateTime nextTryTime;
+    private Date nextTryTime;
+
+    private Integer msgFlag;
+
+    private Long businessId;
 
     /**
      * 创建时间
      */
-    private LocalDateTime createTime;
+    private Date createTime;
 
     /**
      * 更新者
@@ -65,7 +69,7 @@ public class MsgLog implements Serializable {
     /**
      * 更新时间
      */
-    private LocalDateTime updateTime;
+    private Date updateTime;
 
     /**
      * 备注

+ 14 - 0
src/main/java/com/usky/dxtop/service/MsgLogService.java

@@ -2,6 +2,7 @@ package com.usky.dxtop.service;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.usky.dxtop.model.MsgLog;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
 /**
  * <p>
@@ -12,5 +13,18 @@ import com.usky.dxtop.model.MsgLog;
  * @since 2021-10-13
  */
 public interface MsgLogService extends IService<MsgLog> {
+    /**
+     * 发送消息
+     * @param msgLog
+     */
+    MsgLog sendMessage(MsgLog msgLog, RabbitTemplate rabbitTemplate);
 
+
+    /**
+     * 判断是否重复消费
+     */
+    Boolean isRepetition(MsgLog msgLog);
+
+
+    void consumerSuccess(Long id);
 }

+ 29 - 0
src/main/java/com/usky/dxtop/service/config/rabbitmq/RabbitmqConfig.java

@@ -1,6 +1,10 @@
 package com.usky.dxtop.service.config.rabbitmq;
 
+import com.usky.dxtop.model.MsgLog;
+import com.usky.dxtop.service.MsgLogService;
+import com.usky.dxtop.service.emun.MsgLogStatus;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AcknowledgeMode;
 import org.springframework.amqp.core.AmqpAdmin;
 import org.springframework.amqp.core.DirectExchange;
@@ -19,6 +23,7 @@ import org.springframework.context.annotation.Configuration;
  * @author yq
  * @date 2021/9/9 13:37
  */
+@Slf4j
 @Data
 @Configuration
 public class RabbitmqConfig {
@@ -34,6 +39,9 @@ public class RabbitmqConfig {
     @Autowired
     private RabbitProperties rabbitProperties;
 
+    @Autowired
+    private MsgLogService msgLogService;
+
     /**
      * 连接工厂
      * @param vHost
@@ -56,6 +64,27 @@ public class RabbitmqConfig {
     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
         rabbitTemplate.setMessageConverter(this.messageConverter());
+        // 消息是否成功发送到Exchange
+        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
+            if (ack) {
+                log.info("消息成功发送到Exchange");
+                String msgId = correlationData.getId();
+                MsgLog msgLog = new MsgLog();
+                msgLog.setId(Long.parseLong(msgId));
+                msgLog.setMsgFlag(MsgLogStatus.DELIVER_SUCCESS.getCode());
+                msgLogService.updateById(msgLog);
+            } else {
+                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
+            }
+        });
+
+        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
+        rabbitTemplate.setMandatory(true);
+        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
+        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
+            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
+        });
+
         return rabbitTemplate;
     }
 

+ 43 - 0
src/main/java/com/usky/dxtop/service/emun/MsgLogStatus.java

@@ -0,0 +1,43 @@
+package com.usky.dxtop.service.emun;
+
+/**
+ * @author yq
+ * @date 2021/10/13 10:58
+ */
+public enum MsgLogStatus {
+
+    LOADING(1,"投递中"),
+
+    DELIVER_SUCCESS(2,"投递成功"),
+
+    DELIVER_ERROR(3,"投递失败"),
+
+    CONSUMER_SUCCESS(4,"消费成功");
+    private Integer code;
+
+    private String name;
+
+
+    MsgLogStatus(Integer code,String name){
+        this.code = code;
+        this.name = name;
+    }
+
+    public static MsgLogStatus parse(Integer code){
+        MsgLogStatus msgLogStatus = null;
+        for (MsgLogStatus m:MsgLogStatus.values()) {
+            if (m.getCode().equals(code)){
+                msgLogStatus = m;
+                break;
+            }
+        }
+        return msgLogStatus;
+    }
+
+    public Integer getCode(){
+        return code;
+    }
+    public String getName(){
+        return name;
+    }
+}

+ 29 - 0
src/main/java/com/usky/dxtop/service/impl/MsgLogServiceImpl.java

@@ -4,6 +4,10 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.usky.dxtop.mapper.MsgLogMapper;
 import com.usky.dxtop.model.MsgLog;
 import com.usky.dxtop.service.MsgLogService;
+import com.usky.dxtop.service.emun.MsgLogStatus;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.stereotype.Service;
 
 /**
@@ -14,7 +18,32 @@ import org.springframework.stereotype.Service;
  * @author yq
  * @since 2021-10-13
  */
+@Slf4j
 @Service
 public class MsgLogServiceImpl extends ServiceImpl<MsgLogMapper, MsgLog> implements MsgLogService {
 
+    @Override
+    public MsgLog sendMessage(MsgLog msgLog, RabbitTemplate rabbitTemplate) {
+        //入库
+        this.save(msgLog);
+        //发送消息
+        CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
+        rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
+        return msgLog;
+    }
+
+    @Override
+    public Boolean isRepetition(MsgLog msgLog) {
+        return !(null == msgLog || msgLog.getMsgFlag().equals(MsgLogStatus.CONSUMER_SUCCESS.getCode()));
+    }
+
+    @Override
+    public void consumerSuccess(Long id) {
+        MsgLog msgLog = this.getById(id);
+        if (this.isRepetition(msgLog)){
+            msgLog.setMsgFlag(MsgLogStatus.CONSUMER_SUCCESS.getCode());
+            this.updateById(msgLog);
+        }
+    }
+
 }

+ 18 - 7
src/main/java/com/usky/dxtop/service/listener/ChargeMqListener.java

@@ -2,9 +2,12 @@ package com.usky.dxtop.service.listener;
 
 import com.alibaba.fastjson.JSONObject;
 import com.usky.dxtop.common.exception.CustomException;
+import com.usky.dxtop.model.MsgLog;
 import com.usky.dxtop.model.Order;
+import com.usky.dxtop.service.MsgLogService;
 import com.usky.dxtop.service.OrderService;
 import com.usky.dxtop.service.config.rabbitmq.charge.ChargeProduceConfig;
+import com.usky.dxtop.service.emun.MsgLogStatus;
 import com.usky.dxtop.service.emun.OrderStatus;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -25,6 +28,8 @@ public class ChargeMqListener {
 
     @Autowired
     private OrderService orderService;
+    @Autowired
+    private MsgLogService msgLogService;
     @Transactional(rollbackFor = Exception.class)
     @RabbitListener(queues = ChargeProduceConfig.NAME, containerFactory = ChargeProduceConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
@@ -35,14 +40,20 @@ public class ChargeMqListener {
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
             log.info("charge_trans_produce:接受到的消息"+s);
             JSONObject jsonObject = JSONObject.parseObject(s);
-            Order order = new Order();
-            order.setId(Long.parseLong(jsonObject.get("seq").toString()));
-            if ("1".equals(jsonObject.get("code"))) {
-                order.setOrderFlag(OrderStatus.COMPLETE.getCode());
-            }else {
-                order.setOrderFlag(OrderStatus.PAYMENT_ERROR_DEBIT.getCode());
+            Long msgId = Long.parseLong(jsonObject.get("seq").toString());
+            MsgLog msgLog = msgLogService.getById(msgId);
+            if (msgLogService.isRepetition(msgLog)){
+                Order order = new Order();
+                order.setId(msgLog.getBusinessId());
+                if ("1".equals(jsonObject.get("code"))) {
+                    order.setOrderFlag(OrderStatus.COMPLETE.getCode());
+                }else {
+                    order.setOrderFlag(OrderStatus.PAYMENT_ERROR_DEBIT.getCode());
+                }
+                orderService.updateById(order);
+                msgLog.setMsgFlag(MsgLogStatus.CONSUMER_SUCCESS.getCode());
+                msgLogService.updateById(msgLog);
             }
-            orderService.updateById(order);
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
             throw new CustomException(e.getMessage());

+ 2 - 1
src/main/java/com/usky/dxtop/service/listener/ChargeTransMqListener.java

@@ -31,7 +31,8 @@ public class ChargeTransMqListener {
             log.info(routingKey);
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
             log.info("charge_trans_produce:接受到的消息"+s);
-            chargeService.saveOrUpdate(JSONObject.parseObject(s, Charge.class));
+            Charge charge = JSONObject.parseObject(s, Charge.class);
+            chargeService.saveOrUpdate(charge);
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 0 - 2
src/main/java/com/usky/dxtop/service/listener/DishTransMqListener.java

@@ -22,8 +22,6 @@ public class DishTransMqListener {
 
     @Autowired
     private DishService dishService;
-
-
     @RabbitListener(queues = DishTransConfig.NAME, containerFactory = DishTransConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
         try {

+ 8 - 1
src/main/java/com/usky/dxtop/service/listener/FaceMqListener.java

@@ -1,7 +1,10 @@
 package com.usky.dxtop.service.listener;
 
+import com.alibaba.fastjson.JSONObject;
+import com.usky.dxtop.service.MsgLogService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.nio.charset.StandardCharsets;
@@ -14,6 +17,8 @@ import java.nio.charset.StandardCharsets;
 @Component
 public class FaceMqListener {
 
+    @Autowired
+    private MsgLogService msgLogService;
 //    @RabbitListener(queues = FaceProduceConfig.QUEUE, containerFactory = FaceProduceConfig.CONNECTION)
     public void dealDeclareMessage(Message message) {
         try {
@@ -22,7 +27,9 @@ public class FaceMqListener {
             log.info(routingKey);
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
             log.info("charge_trans_produce:接受到的消息"+s);
-            //消息必答机制
+            JSONObject jsonObject = JSONObject.parseObject(s);
+            Long msgId = Long.parseLong(jsonObject.get("seq").toString());
+            msgLogService.consumerSuccess(msgId);
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 8 - 1
src/main/java/com/usky/dxtop/service/listener/GroupMqListener.java

@@ -1,7 +1,10 @@
 package com.usky.dxtop.service.listener;
 
+import com.alibaba.fastjson.JSONObject;
+import com.usky.dxtop.service.MsgLogService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.nio.charset.StandardCharsets;
@@ -14,6 +17,8 @@ import java.nio.charset.StandardCharsets;
 @Component
 public class GroupMqListener {
 
+    @Autowired
+    private MsgLogService msgLogService;
 //    @RabbitListener(queues = GroupProduceConfig.QUEUE, containerFactory = GroupProduceConfig.CONNECTION)
     public void dealDeclareMessage(Message message) {
         try {
@@ -22,7 +27,9 @@ public class GroupMqListener {
             log.info(routingKey);
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
             log.info("charge_trans_produce:接受到的消息"+s);
-            //消息必答机制
+            JSONObject jsonObject = JSONObject.parseObject(s);
+            Long msgId = Long.parseLong(jsonObject.get("seq").toString());
+            msgLogService.consumerSuccess(msgId);
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 8 - 1
src/main/java/com/usky/dxtop/service/listener/PersonMqListener.java

@@ -1,7 +1,10 @@
 package com.usky.dxtop.service.listener;
 
+import com.alibaba.fastjson.JSONObject;
+import com.usky.dxtop.service.MsgLogService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.nio.charset.StandardCharsets;
@@ -13,6 +16,8 @@ import java.nio.charset.StandardCharsets;
 @Slf4j
 @Component
 public class PersonMqListener {
+    @Autowired
+    private MsgLogService msgLogService;
 
 //    @RabbitListener(queues = ProfileProduceConfig.QUEUE, containerFactory = ProfileProduceConfig.CONNECTION)
     public void dealDeclareMessage(Message message) {
@@ -22,7 +27,9 @@ public class PersonMqListener {
             log.info(routingKey);
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
             log.info("charge_trans_produce:接受到的消息"+s);
-            //消息必答机制
+            JSONObject jsonObject = JSONObject.parseObject(s);
+            Long msgId = Long.parseLong(jsonObject.get("seq").toString());
+            msgLogService.consumerSuccess(msgId);
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {