Pārlūkot izejas kodu

修改同步用户+部门,排菜+消费rabbit

he.dujuan 3 gadi atpakaļ
vecāks
revīzija
072402d2c0
1 mainītis faili ar 128 papildinājumiem un 0 dzēšanām
  1. 128 0
      eladmin-system/src/test/java/me/zhengjie/TestRabbit.java

+ 128 - 0
eladmin-system/src/test/java/me/zhengjie/TestRabbit.java

@@ -0,0 +1,128 @@
+package me.zhengjie;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.rabbitmq.client.*;
+import lombok.SneakyThrows;
+import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
+import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
+import me.zhengjie.modules.dm.food.service.DmFoodService;
+import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
+import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
+import me.zhengjie.utils.ConnectionUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class TestRabbit {
+    DmFoodService dmFoodService;
+    DmDayPcService dmDayPcService;
+    DmOrderRecordService dmOrderRecordService;
+
+    @Test
+    public void FoodPj(){
+        //获取连接
+        Connection connection = ConnectionUtil.getConnection();
+        try {
+            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
+            String userString = JSON.toJSONString(dmDayPc);
+            Channel channel = connection.createChannel();
+            String exchangeName = "schedule_consume";//交换机
+            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+            String queue1Name = "schedule_consume";//队列名称
+            channel.queueDeclare(queue1Name, true, false, false, null);
+            channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
+            channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
+            System.out.println("sned排菜:"+dmDayPc);
+            channel.close();
+            connection.close();
+        } catch (IOException | TimeoutException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void receive(){
+        Connection connection = ConnectionUtil.getConnection();
+        try {
+            Channel channel = connection.createChannel();
+            //通过consumer来处理数据
+            Consumer consumer = new DefaultConsumer(channel){
+                @SneakyThrows
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    //body就是从队列中获取的数据
+                    String str = new String(body);
+                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
+                    for (DmDayPc dmDayPc : list) {
+                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
+                        dmDayPcService.create(dmDayPc);
+                        System.out.println("新增food==="+dmDayPc.getFood());
+                        dmFoodService.create(dmDayPc.getFood());
+                    }
+                }
+            };
+            //参数1:接收哪个队列的数据
+            //参数2:消息确认 是否应答,收到消息是否回复
+            //参数3:
+            channel.basicConsume("schedule_produce",true,consumer);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void send(){
+        //获取连接
+        Connection connection = ConnectionUtil.getConnection();
+        try {
+            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
+            JSONArray jsonArray = new JSONArray();
+            jsonArray.addAll(map);
+            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
+            String userString2 = JSON.toJSONString(list);
+            Channel channel = connection.createChannel();
+            String exchangeName = "dish_trade_produce";//交换机
+            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+            String queue1Name = "dish_trade_produce";//队列名称
+            channel.queueDeclare(queue1Name, true, false, false, null);
+            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
+            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
+            System.out.println("消费记录:"+userString2);
+
+            channel.close();
+            connection.close();
+        } catch (IOException | TimeoutException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 消费者
+     */
+    public void receive1(){
+        Connection connection = ConnectionUtil.getConnection();
+        try {
+            Channel channel = connection.createChannel();
+            //通过consumer来处理数据
+            Consumer consumer = new DefaultConsumer(channel){
+                @SneakyThrows
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    //body就是从队列中获取的数据
+                    String str = new String(body);
+                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
+                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
+                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
+                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+                    }
+                }
+            };
+            channel.basicConsume("dish_trade_produce",true,consumer);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}