Explorar o código

全部整合完成

yq %!s(int64=3) %!d(string=hai) anos
pai
achega
0f79729531

+ 6 - 1
src/main/java/com/usky/dxtop/service/MsgLogService.java

@@ -3,6 +3,8 @@ package com.usky.dxtop.service;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.usky.dxtop.model.MsgLog;
 
+import java.util.function.Consumer;
+
 /**
  * <p>
  *  服务类
@@ -19,7 +21,7 @@ public interface MsgLogService extends IService<MsgLog> {
     Boolean isRepetition(MsgLog msgLog);
 
 
-    void consumerSuccess(String consequence);
+    void consumerSuccess(String consequence, Consumer<MsgLog> consumer);
 
     /**
      * 判断是否同步成功
@@ -27,4 +29,7 @@ public interface MsgLogService extends IService<MsgLog> {
      * @return
      */
     boolean isSuccess(Long businessId);
+
+
+    void addOrUpdate(MsgLog msgLog);
 }

+ 25 - 5
src/main/java/com/usky/dxtop/service/impl/MsgLogServiceImpl.java

@@ -12,6 +12,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import java.util.Date;
+import java.util.function.Consumer;
 
 /**
  * <p>
@@ -31,27 +32,33 @@ public class MsgLogServiceImpl extends ServiceImpl<MsgLogMapper, MsgLog> impleme
     }
 
     @Override
-    public void consumerSuccess(String consequence) {
+    public void consumerSuccess(String consequence, Consumer<MsgLog> consumer) {
         JSONObject jsonObject = JSONObject.parseObject(consequence);
         Long msgId = Long.parseLong(jsonObject.get("seq").toString());
         MsgLog msgLog = this.getById(msgId);
         if (this.isRepetition(msgLog)){
-            if ("1".equals(jsonObject.get("code").toString())){
-                msgLog.setIsSuccess(true);
+            boolean isSuccess = "1".equals(jsonObject.get("code").toString());
+            //第一次同步
+            if (null == msgLog.getIsSuccess()){
+                msgLog.setIsSuccess(isSuccess);
             }else {
-                msgLog.setIsSuccess(false);
+                //如果以前成功过不做修改
+                if (!msgLog.getIsSuccess()){
+                    msgLog.setIsSuccess(isSuccess);
+                }
             }
             msgLog.setUpdateTime(new Date());
             msgLog.setConsequence(consequence);
             msgLog.setMsgFlag(MsgLogStatus.CONSUMER_SUCCESS.getCode());
             this.updateById(msgLog);
+            consumer.accept(msgLog);
         }
     }
 
     @Override
     public boolean isSuccess(Long businessId) {
         LambdaQueryWrapper<MsgLog> queryWrapper = Wrappers.lambdaQuery();
-        queryWrapper.eq(MsgLog::getMsgFlag,MsgLogStatus.CONSUMER_SUCCESS.getCode())
+        queryWrapper
                 .eq(MsgLog::getIsSuccess,true)
                 .eq(MsgLog::getBusinessId,businessId);
         int count = this.count(queryWrapper);
@@ -62,4 +69,17 @@ public class MsgLogServiceImpl extends ServiceImpl<MsgLogMapper, MsgLog> impleme
         }
     }
 
+    @Override
+    public void addOrUpdate(MsgLog msgLog) {
+        LambdaQueryWrapper<MsgLog> queryWrapper = Wrappers.lambdaQuery();
+        queryWrapper.eq(MsgLog::getBusinessId,msgLog.getBusinessId());
+        MsgLog msg = this.getOne(queryWrapper);
+        if (null == msg){
+            this.save(msgLog);
+        }else {
+            msgLog.setId(msg.getId());
+            this.updateById(msgLog);
+        }
+    }
+
 }

+ 1 - 1
src/main/java/com/usky/dxtop/service/impl/OrderServiceImpl.java

@@ -201,7 +201,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
             msgLog.setExchange(ChargeConsumeConfig.NAME);
             msgLog.setRoutingKey(ChargeConsumeConfig.NAME);
             msgLog.setMsg(JSON.toJSONString(chargeVo));
-            msgLogService.save(msgLog);
+            msgLogService.addOrUpdate(msgLog);
             chargeVo.setSeq(msgLog.getId());
             CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
             rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), chargeVo,correlationData);

+ 3 - 3
src/main/java/com/usky/dxtop/service/impl/StaffServiceImpl.java

@@ -79,7 +79,7 @@ public class StaffServiceImpl extends ServiceImpl<StaffMapper, Staff> implements
         Map<String,Object> map = new HashMap<>();
         map.put("card",card);
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.save(msgLog);
+        msgLogService.addOrUpdate(msgLog);
         map.put("seq",msgLog.getId());
         CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
         rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), map,correlationData);
@@ -115,7 +115,7 @@ public class StaffServiceImpl extends ServiceImpl<StaffMapper, Staff> implements
         msgLog.setExchange(ProfileConsumeConfig.NAME);
         msgLog.setRoutingKey(ProfileConsumeConfig.NAME);
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.save(msgLog);
+        msgLogService.addOrUpdate(msgLog);
         map.put("seq",msgLog.getId());
         CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
         profileRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), map,correlationData);
@@ -137,7 +137,7 @@ public class StaffServiceImpl extends ServiceImpl<StaffMapper, Staff> implements
         msgLog.setRoutingKey(FaceConsumeConfig.NAME);
         map.put("img",staff.getFaceId());
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.save(msgLog);
+        msgLogService.addOrUpdate(msgLog);
         map.put("seq",msgLog.getId());
         map.put("img",encode);
         CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());

+ 1 - 1
src/main/java/com/usky/dxtop/service/listener/CartGetMqListener.java

@@ -29,7 +29,7 @@ public class CartGetMqListener {
         try {
             //消息类型
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
-            msgLogService.consumerSuccess(s);
+            msgLogService.consumerSuccess(s,msgLog -> log.info("处理完成"));
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
             throw new CustomException(e.getMessage());

+ 3 - 21
src/main/java/com/usky/dxtop/service/listener/ChargeMqListener.java

@@ -1,15 +1,12 @@
 package com.usky.dxtop.service.listener;
 
-import com.alibaba.fastjson.JSONObject;
 import com.usky.dxtop.common.exception.CustomException;
 import com.usky.dxtop.common.utils.StringUtils;
-import com.usky.dxtop.model.MsgLog;
 import com.usky.dxtop.model.Order;
 import com.usky.dxtop.service.MsgLogService;
 import com.usky.dxtop.service.OrderService;
 import com.usky.dxtop.service.config.rabbitmq.charge.ChargeProduceConfig;
 import com.usky.dxtop.service.constant.MsgLogBusinessCode;
-import com.usky.dxtop.service.emun.MsgLogStatus;
 import com.usky.dxtop.service.emun.OrderStatus;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -19,7 +16,6 @@ import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Date;
 
 /**
  * @author yq
@@ -37,31 +33,17 @@ public class ChargeMqListener {
     @RabbitListener(queues = ChargeProduceConfig.NAME, containerFactory = ChargeProduceConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
         try {
-            //消息类型
-            String routingKey = message.getMessageProperties().getReceivedRoutingKey();
-            log.info(routingKey);
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
-            log.info("charge_trans_produce:接受到的消息"+s);
-            JSONObject jsonObject = JSONObject.parseObject(s);
-            Long msgId = Long.parseLong(jsonObject.get("seq").toString());
-            MsgLog msgLog = msgLogService.getById(msgId);
-            if (msgLogService.isRepetition(msgLog)){
+            msgLogService.consumerSuccess(s,msgLog -> {
                 Order order = new Order();
                 String orderId = StringUtils.remove(msgLog.getBusinessId(), MsgLogBusinessCode.ORDER);
                 order.setId(Long.parseLong(orderId));
-                if ("1".equals(jsonObject.get("code").toString())) {
+                if (msgLog.getIsSuccess()){
                     order.setOrderFlag(OrderStatus.COMPLETE.getCode());
-                    msgLog.setIsSuccess(true);
                 }else {
                     order.setOrderFlag(OrderStatus.PAYMENT_ERROR_DEBIT.getCode());
-                    msgLog.setIsSuccess(false);
                 }
-                orderService.updateById(order);
-                msgLog.setUpdateTime(new Date());
-                msgLog.setConsequence(s);
-                msgLog.setMsgFlag(MsgLogStatus.CONSUMER_SUCCESS.getCode());
-                msgLogService.updateById(msgLog);
-            }
+            });
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
             throw new CustomException(e.getMessage());

+ 1 - 1
src/main/java/com/usky/dxtop/service/listener/FaceMqListener.java

@@ -24,7 +24,7 @@ public class FaceMqListener {
     public void dealDeclareMessage(Message message) {
         try {
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
-            msgLogService.consumerSuccess(s);
+            msgLogService.consumerSuccess(s,msgLog -> log.info("处理完成"));
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 1 - 1
src/main/java/com/usky/dxtop/service/listener/GroupMqListener.java

@@ -24,7 +24,7 @@ public class GroupMqListener {
     public void dealDeclareMessage(Message message) {
         try {
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
-            msgLogService.consumerSuccess(s);
+            msgLogService.consumerSuccess(s,msgLog -> log.info("处理完成"));
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 1 - 1
src/main/java/com/usky/dxtop/service/listener/PersonMqListener.java

@@ -24,7 +24,7 @@ public class PersonMqListener {
     public void dealDeclareMessage(Message message) {
         try {
             String s = new String(message.getBody(), StandardCharsets.UTF_8);
-            msgLogService.consumerSuccess(s);
+            msgLogService.consumerSuccess(s,msgLog -> log.info("处理完成"));
         } catch (Exception e) {
             log.info("charge_trans_produce"+"异常信息:" + e.getMessage());
         } finally {

+ 3 - 39
src/test/java/com/usky/dxtop/SmApiTest.java

@@ -1,16 +1,9 @@
 package com.usky.dxtop;
 
-import com.usky.dxtop.service.config.rabbitmq.ChargeTransConfig;
-import com.usky.dxtop.service.config.rabbitmq.DishTransConfig;
-import com.usky.dxtop.service.config.rabbitmq.charge.ChargeProduceConfig;
-import com.usky.dxtop.service.config.rabbitmq.face.FaceProduceConfig;
-import com.usky.dxtop.service.config.rabbitmq.group.GroupProduceConfig;
-import com.usky.dxtop.service.config.rabbitmq.profile.ProfileProduceConfig;
+import com.usky.dxtop.service.MsgLogService;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
@@ -24,38 +17,9 @@ public class SmApiTest {
 
 
     @Autowired
-    @Qualifier(ChargeProduceConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate1;
-
-    @Autowired
-    @Qualifier(ChargeTransConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate2;
-
-
-    @Autowired
-    @Qualifier(DishTransConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate3;
-
-    @Autowired
-    @Qualifier(FaceProduceConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate4;
-
-    @Autowired
-    @Qualifier(GroupProduceConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate5;
-
-    @Autowired
-    @Qualifier(ProfileProduceConfig.TEMPLATE)
-    private RabbitTemplate rabbitTemplate6;
-
+    private MsgLogService msgLogService;
     @Test
-    public void test(){
-        rabbitTemplate1.convertAndSend(ChargeProduceConfig.NAME, ChargeProduceConfig.NAME, 11);
-        rabbitTemplate2.convertAndSend(ChargeTransConfig.NAME, ChargeProduceConfig.NAME, 11);
-        rabbitTemplate3.convertAndSend(DishTransConfig.NAME, ChargeProduceConfig.NAME, 11);
-        rabbitTemplate4.convertAndSend(FaceProduceConfig.NAME, ChargeProduceConfig.NAME, 11);
-        rabbitTemplate5.convertAndSend(GroupProduceConfig.NAME, ChargeProduceConfig.NAME, 11);
-        rabbitTemplate6.convertAndSend(ProfileProduceConfig.NAME, ChargeProduceConfig.NAME, 11);
+    public void test1(){
 
     }
 }