浏览代码

整合rabbitmq完成

yq 3 年之前
父节点
当前提交
f855d1b04b

+ 3 - 0
src/main/java/com/usky/dxtop/model/Order.java

@@ -102,4 +102,7 @@ public class Order extends BaseEntity implements Serializable {
     @TableField(exist = false)
     @TableField(exist = false)
     private String openId;
     private String openId;
 
 
+    @TableField(exist = false)
+    private String card;
+
 }
 }

+ 0 - 7
src/main/java/com/usky/dxtop/service/MsgLogService.java

@@ -2,7 +2,6 @@ package com.usky.dxtop.service;
 
 
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.usky.dxtop.model.MsgLog;
 import com.usky.dxtop.model.MsgLog;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
 
 /**
 /**
  * <p>
  * <p>
@@ -13,12 +12,6 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
  * @since 2021-10-13
  * @since 2021-10-13
  */
  */
 public interface MsgLogService extends IService<MsgLog> {
 public interface MsgLogService extends IService<MsgLog> {
-    /**
-     * 发送消息
-     * @param msgLog
-     */
-    MsgLog sendMessage(MsgLog msgLog, RabbitTemplate rabbitTemplate);
-
 
 
     /**
     /**
      * 判断是否重复消费
      * 判断是否重复消费

+ 4 - 0
src/main/java/com/usky/dxtop/service/config/rabbitmq/group/GroupProduceConfig.java

@@ -1,6 +1,7 @@
 package com.usky.dxtop.service.config.rabbitmq.group;
 package com.usky.dxtop.service.config.rabbitmq.group;
 
 
 import com.usky.dxtop.service.config.rabbitmq.RabbitmqConfig;
 import com.usky.dxtop.service.config.rabbitmq.RabbitmqConfig;
+import lombok.Data;
 import org.springframework.amqp.core.*;
 import org.springframework.amqp.core.*;
 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -10,11 +11,14 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
 
 /**
 /**
  * @author yq
  * @author yq
  * @date 2021/9/15 13:16
  * @date 2021/9/15 13:16
  */
  */
+@Configuration
+@Data
 public class GroupProduceConfig {
 public class GroupProduceConfig {
 
 
     @Autowired
     @Autowired

+ 0 - 10
src/main/java/com/usky/dxtop/service/impl/MsgLogServiceImpl.java

@@ -22,16 +22,6 @@ import org.springframework.stereotype.Service;
 @Service
 @Service
 public class MsgLogServiceImpl extends ServiceImpl<MsgLogMapper, MsgLog> implements MsgLogService {
 public class MsgLogServiceImpl extends ServiceImpl<MsgLogMapper, MsgLog> implements MsgLogService {
 
 
-    @Override
-    public MsgLog sendMessage(MsgLog msgLog, RabbitTemplate rabbitTemplate) {
-        //入库
-        this.save(msgLog);
-        //发送消息
-        CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
-        rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
-        return msgLog;
-    }
-
     @Override
     @Override
     public Boolean isRepetition(MsgLog msgLog) {
     public Boolean isRepetition(MsgLog msgLog) {
         return !(null == msgLog || msgLog.getMsgFlag().equals(MsgLogStatus.CONSUMER_SUCCESS.getCode()));
         return !(null == msgLog || msgLog.getMsgFlag().equals(MsgLogStatus.CONSUMER_SUCCESS.getCode()));

+ 15 - 4
src/main/java/com/usky/dxtop/service/impl/OrderServiceImpl.java

@@ -32,6 +32,7 @@ import ma.glasnost.orika.MapperFacade;
 import ma.glasnost.orika.MapperFactory;
 import ma.glasnost.orika.MapperFactory;
 import ma.glasnost.orika.impl.DefaultMapperFactory;
 import ma.glasnost.orika.impl.DefaultMapperFactory;
 import org.apache.poi.ss.usermodel.Workbook;
 import org.apache.poi.ss.usermodel.Workbook;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -82,6 +83,9 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
     @Autowired
     @Autowired
     private MsgLogService msgLogService;
     private MsgLogService msgLogService;
 
 
+    @Autowired
+    private StaffService staffService;
+
     @Transactional(rollbackFor = Exception.class)
     @Transactional(rollbackFor = Exception.class)
     @Override
     @Override
     public boolean add(Order order) {
     public boolean add(Order order) {
@@ -202,15 +206,18 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
                         order.setMoney(BigDecimal.valueOf(mul));
                         order.setMoney(BigDecimal.valueOf(mul));
                     });
                     });
             chargeVo.setAmt(order.getMoney());
             chargeVo.setAmt(order.getMoney());
-            chargeVo.setCard("h");
+            chargeVo.setCard(order.getCard());
             chargeVo.setMob(order.getOrderNumber());
             chargeVo.setMob(order.getOrderNumber());
-            chargeVo.setName("test");
             MsgLog msgLog = new MsgLog();
             MsgLog msgLog = new MsgLog();
             msgLog.setBusinessId(order.getId());
             msgLog.setBusinessId(order.getId());
             msgLog.setExchange(ChargeConsumeConfig.NAME);
             msgLog.setExchange(ChargeConsumeConfig.NAME);
             msgLog.setRoutingKey(ChargeConsumeConfig.NAME);
             msgLog.setRoutingKey(ChargeConsumeConfig.NAME);
+            msgLogService.save(msgLog);
+            chargeVo.setSeq(msgLog.getId());
             msgLog.setMsg(JSON.toJSONString(chargeVo));
             msgLog.setMsg(JSON.toJSONString(chargeVo));
-            msgLogService.sendMessage(msgLog,rabbitTemplate);
+            msgLogService.updateById(msgLog);
+            CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
+            rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
         }catch (Exception e){
         }catch (Exception e){
             b = false;
             b = false;
             log.error("---order---卡充值消息发送异常"+ e.getMessage());
             log.error("---order---卡充值消息发送异常"+ e.getMessage());
@@ -568,7 +575,11 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements
                     order.setTopRadio(tr.getProportion());
                     order.setTopRadio(tr.getProportion());
                 });
                 });
             });
             });
-
+            //添加卡号
+            Staff staff = staffService.getById(order.getUserId());
+            Optional.ofNullable(staff).orElseThrow(() -> new CustomException("卡号不能为空"));
+            Optional.ofNullable(staff.getCard()).orElseThrow(() -> new CustomException("卡号不能为空"));
+            order.setCard(staff.getCard());
         }else {
         }else {
             Assert.check(null != order.getChannelId() && 0 != order.getChannelId(),"渠道编号不能为空");
             Assert.check(null != order.getChannelId() && 0 != order.getChannelId(),"渠道编号不能为空");
             TopChannel topChannel = topChannelService.getById(order.getChannelId());
             TopChannel topChannel = topChannelService.getById(order.getChannelId());

+ 17 - 6
src/main/java/com/usky/dxtop/service/job/SmJob.java

@@ -18,6 +18,7 @@ import com.usky.dxtop.service.config.rabbitmq.group.GroupConsumeConfig;
 import com.usky.dxtop.service.config.rabbitmq.profile.ProfileConsumeConfig;
 import com.usky.dxtop.service.config.rabbitmq.profile.ProfileConsumeConfig;
 import lombok.Data;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -56,6 +57,7 @@ public class SmJob {
     @Qualifier(GroupConsumeConfig.TEMPLATE)
     @Qualifier(GroupConsumeConfig.TEMPLATE)
     private RabbitTemplate groupRabbitTemplate;
     private RabbitTemplate groupRabbitTemplate;
 
 
+
     @Autowired
     @Autowired
     private MsgLogService msgLogService;
     private MsgLogService msgLogService;
 
 
@@ -166,7 +168,6 @@ public class SmJob {
     private void personSendMessage(Staff staff,Integer type){
     private void personSendMessage(Staff staff,Integer type){
         Map<String,Object> map = new HashMap<>();
         Map<String,Object> map = new HashMap<>();
         map.put("type",type);
         map.put("type",type);
-        map.put("seq",staff.getSId());
         map.put("card_type",1);
         map.put("card_type",1);
         map.put("card",staff.getCard());
         map.put("card",staff.getCard());
         map.put("name",staff.getName());
         map.put("name",staff.getName());
@@ -178,8 +179,12 @@ public class SmJob {
         msgLog.setBusinessId(staff.getSId());
         msgLog.setBusinessId(staff.getSId());
         msgLog.setExchange(ProfileConsumeConfig.NAME);
         msgLog.setExchange(ProfileConsumeConfig.NAME);
         msgLog.setRoutingKey(ProfileConsumeConfig.NAME);
         msgLog.setRoutingKey(ProfileConsumeConfig.NAME);
+        msgLogService.save(msgLog);
+        map.put("seq",msgLog.getId());
         msgLog.setMsg(JSON.toJSONString(map));
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.sendMessage(msgLog,profileRabbitTemplate);
+        msgLogService.updateById(msgLog);
+        CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
+        profileRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
     }
     }
 
 
     /**
     /**
@@ -189,15 +194,18 @@ public class SmJob {
     private void deptSendMessage(Dept dept){
     private void deptSendMessage(Dept dept){
         Map<String,Object> map = new HashMap<>();
         Map<String,Object> map = new HashMap<>();
         map.put("parentid",dept.getPid());
         map.put("parentid",dept.getPid());
-        map.put("seq",dept.getDId());
         map.put("id",dept.getId());
         map.put("id",dept.getId());
         map.put("name",dept.getName());
         map.put("name",dept.getName());
         MsgLog msgLog = new MsgLog();
         MsgLog msgLog = new MsgLog();
         msgLog.setBusinessId(dept.getDId());
         msgLog.setBusinessId(dept.getDId());
         msgLog.setExchange(GroupConsumeConfig.NAME);
         msgLog.setExchange(GroupConsumeConfig.NAME);
         msgLog.setRoutingKey(GroupConsumeConfig.NAME);
         msgLog.setRoutingKey(GroupConsumeConfig.NAME);
+        msgLogService.save(msgLog);
+        map.put("seq",msgLog.getId());
         msgLog.setMsg(JSON.toJSONString(map));
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.sendMessage(msgLog,groupRabbitTemplate);
+        msgLogService.updateById(msgLog);
+        CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
+        groupRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
     }
     }
 
 
     /**
     /**
@@ -206,7 +214,6 @@ public class SmJob {
      */
      */
     private void faceSendMessage(Staff staff){
     private void faceSendMessage(Staff staff){
         Map<String,Object> map = new HashMap<>();
         Map<String,Object> map = new HashMap<>();
-        map.put("seq",staff.getSId());
         map.put("mob",staff.getId());
         map.put("mob",staff.getId());
         map.put("image",staff.getFace());
         map.put("image",staff.getFace());
         faceRabbitTemplate.convertAndSend(FaceConsumeConfig.NAME, FaceConsumeConfig.NAME, map);
         faceRabbitTemplate.convertAndSend(FaceConsumeConfig.NAME, FaceConsumeConfig.NAME, map);
@@ -214,8 +221,12 @@ public class SmJob {
         msgLog.setBusinessId(staff.getSId());
         msgLog.setBusinessId(staff.getSId());
         msgLog.setExchange(FaceConsumeConfig.NAME);
         msgLog.setExchange(FaceConsumeConfig.NAME);
         msgLog.setRoutingKey(FaceConsumeConfig.NAME);
         msgLog.setRoutingKey(FaceConsumeConfig.NAME);
+        msgLogService.save(msgLog);
+        map.put("seq",msgLog.getId());
         msgLog.setMsg(JSON.toJSONString(map));
         msgLog.setMsg(JSON.toJSONString(map));
-        msgLogService.sendMessage(msgLog,faceRabbitTemplate);
+        msgLogService.updateById(msgLog);
+        CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
+        faceRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),correlationData);
     }
     }
 
 
     @Data
     @Data

+ 3 - 1
src/main/java/com/usky/dxtop/service/listener/FaceMqListener.java

@@ -2,8 +2,10 @@ package com.usky.dxtop.service.listener;
 
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.usky.dxtop.service.MsgLogService;
 import com.usky.dxtop.service.MsgLogService;
+import com.usky.dxtop.service.config.rabbitmq.face.FaceProduceConfig;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
@@ -19,7 +21,7 @@ public class FaceMqListener {
 
 
     @Autowired
     @Autowired
     private MsgLogService msgLogService;
     private MsgLogService msgLogService;
-//    @RabbitListener(queues = FaceProduceConfig.QUEUE, containerFactory = FaceProduceConfig.CONNECTION)
+    @RabbitListener(queues = FaceProduceConfig.NAME, containerFactory = FaceProduceConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
     public void dealDeclareMessage(Message message) {
         try {
         try {
             //消息类型
             //消息类型

+ 3 - 1
src/main/java/com/usky/dxtop/service/listener/GroupMqListener.java

@@ -2,8 +2,10 @@ package com.usky.dxtop.service.listener;
 
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.usky.dxtop.service.MsgLogService;
 import com.usky.dxtop.service.MsgLogService;
+import com.usky.dxtop.service.config.rabbitmq.group.GroupProduceConfig;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
@@ -19,7 +21,7 @@ public class GroupMqListener {
 
 
     @Autowired
     @Autowired
     private MsgLogService msgLogService;
     private MsgLogService msgLogService;
-//    @RabbitListener(queues = GroupProduceConfig.QUEUE, containerFactory = GroupProduceConfig.CONNECTION)
+    @RabbitListener(queues = GroupProduceConfig.NAME, containerFactory = GroupProduceConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
     public void dealDeclareMessage(Message message) {
         try {
         try {
             //消息类型
             //消息类型

+ 3 - 1
src/main/java/com/usky/dxtop/service/listener/PersonMqListener.java

@@ -2,8 +2,10 @@ package com.usky.dxtop.service.listener;
 
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.usky.dxtop.service.MsgLogService;
 import com.usky.dxtop.service.MsgLogService;
+import com.usky.dxtop.service.config.rabbitmq.profile.ProfileProduceConfig;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
@@ -19,7 +21,7 @@ public class PersonMqListener {
     @Autowired
     @Autowired
     private MsgLogService msgLogService;
     private MsgLogService msgLogService;
 
 
-//    @RabbitListener(queues = ProfileProduceConfig.QUEUE, containerFactory = ProfileProduceConfig.CONNECTION)
+    @RabbitListener(queues = ProfileProduceConfig.NAME, containerFactory = ProfileProduceConfig.LISTENER)
     public void dealDeclareMessage(Message message) {
     public void dealDeclareMessage(Message message) {
         try {
         try {
             //消息类型
             //消息类型

+ 50 - 7
src/test/java/com/usky/dxtop/DxtopApplicationTests.java

@@ -1,15 +1,20 @@
 package com.usky.dxtop;
 package com.usky.dxtop;
 
 
-import com.usky.dxtop.model.SysFile;
 import com.usky.dxtop.service.SysFileService;
 import com.usky.dxtop.service.SysFileService;
+import com.usky.dxtop.service.config.rabbitmq.charge.ChargeProduceConfig;
+import com.usky.dxtop.service.config.rabbitmq.face.FaceProduceConfig;
+import com.usky.dxtop.service.config.rabbitmq.group.GroupProduceConfig;
+import com.usky.dxtop.service.config.rabbitmq.profile.ProfileProduceConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
 import org.junit.runner.RunWith;
 import org.junit.runner.RunWith;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.context.junit4.SpringRunner;
 
 
-import java.util.Date;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 
 @RunWith(SpringRunner.class)
 @RunWith(SpringRunner.class)
 @SpringBootTest
 @SpringBootTest
@@ -18,11 +23,49 @@ class DxtopApplicationTests {
     @Autowired
     @Autowired
     private SysFileService sysFileService;
     private SysFileService sysFileService;
 
 
+    @Autowired
+    @Qualifier(ProfileProduceConfig.TEMPLATE)
+    private RabbitTemplate profileRabbitTemplate;
+
+
+    @Autowired
+    @Qualifier(FaceProduceConfig.TEMPLATE)
+    private RabbitTemplate faceRabbitTemplate;
+
+    @Autowired
+    @Qualifier(GroupProduceConfig.TEMPLATE)
+    private RabbitTemplate groupRabbitTemplate;
+
+    @Autowired
+    @Qualifier(ChargeProduceConfig.TEMPLATE)
+    private RabbitTemplate chargeRabbitTemplate;
+
     @Test
     @Test
     public void test(){
     public void test(){
-        List<SysFile> fileByExpireAt = sysFileService.getFileByExpireAt(new Date());
-        for (SysFile sysFile:fileByExpireAt) {
-            System.out.println(sysFile);
-        }
+        Map<String,Object> map = new HashMap<>();
+        map.put("seq",1);
+        profileRabbitTemplate.convertAndSend(ProfileProduceConfig.NAME,ProfileProduceConfig.NAME, map);
+    }
+
+    @Test
+    public void test1(){
+        Map<String,Object> map = new HashMap<>();
+        map.put("seq",1);
+        faceRabbitTemplate.convertAndSend(FaceProduceConfig.NAME,FaceProduceConfig.NAME, map);
+    }
+
+    @Test
+    public void test2(){
+        Map<String,Object> map = new HashMap<>();
+        map.put("seq",1);
+        groupRabbitTemplate.convertAndSend(GroupProduceConfig.NAME,GroupProduceConfig.NAME, map);
+    }
+
+
+    @Test
+    public void test3(){
+        Map<String,Object> map = new HashMap<>();
+        map.put("seq",2511);
+        chargeRabbitTemplate.convertAndSend(ChargeProduceConfig.NAME,ChargeProduceConfig.NAME, map);
     }
     }
 }
 }