|
@@ -1,112 +1,122 @@
|
|
|
-package me.zhengjie.modules.quartz.task;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
-import com.rabbitmq.client.*;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
-import lombok.SneakyThrows;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
|
|
|
-import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
|
|
|
-import me.zhengjie.modules.dm.food.service.DmFoodService;
|
|
|
-import me.zhengjie.utils.ConnectionUtil;
|
|
|
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.io.ByteArrayOutputStream;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.ObjectOutputStream;
|
|
|
-import java.io.Serializable;
|
|
|
-import java.sql.Timestamp;
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.TimeoutException;
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-@RequiredArgsConstructor
|
|
|
-@Component
|
|
|
-public class DayPcDataTask {
|
|
|
- private final DmFoodService dmFoodService;
|
|
|
- private final DmDayPcService dmDayPcService;
|
|
|
- private static String isEable = "0";
|
|
|
-
|
|
|
- public void runSend1(){
|
|
|
- isEable = "1";
|
|
|
- }
|
|
|
-
|
|
|
- public void runSend2(){
|
|
|
- if(isEable!=null&&isEable=="1"){
|
|
|
- send();
|
|
|
- isEable = "0";
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void runReceive1(){
|
|
|
- isEable = "1";
|
|
|
- }
|
|
|
-
|
|
|
- public void runReceive2(){
|
|
|
- if(isEable!=null&&isEable=="1"){
|
|
|
- receive();
|
|
|
- isEable = "0";
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 生产者
|
|
|
- */
|
|
|
- public void send(){
|
|
|
- //获取连接
|
|
|
- Connection connection = ConnectionUtil.getConnection();
|
|
|
- try {
|
|
|
- List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
|
|
|
- String userString = JSON.toJSONString(dmDayPc);
|
|
|
- Channel channel = connection.createChannel();
|
|
|
- String exchangeName = "schedule_produce";//交换机
|
|
|
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
|
|
|
- String queue1Name = "schedule_produce";//队列名称
|
|
|
- channel.queueDeclare(queue1Name, true, false, false, null);
|
|
|
- channel.queueBind(queue1Name, exchangeName, "schedule_produce");// 队列绑定hello路由
|
|
|
- channel.basicPublish(exchangeName, "schedule_produce", null, userString.getBytes());
|
|
|
- System.out.println("sned排菜:"+dmDayPc);
|
|
|
- channel.close();
|
|
|
- connection.close();
|
|
|
- } catch (IOException | TimeoutException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 消费者
|
|
|
- */
|
|
|
- public void receive(){
|
|
|
- Connection connection = ConnectionUtil.getConnection();
|
|
|
- try {
|
|
|
- Channel channel = connection.createChannel();
|
|
|
- //通过consumer来处理数据
|
|
|
- Consumer consumer = new DefaultConsumer(channel){
|
|
|
- @SneakyThrows
|
|
|
- @Override
|
|
|
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
|
|
- //body就是从队列中获取的数据
|
|
|
- String str = new String(body);
|
|
|
- List<DmDayPc> list = JSONArray.parseArray(str,DmDayPc.class);
|
|
|
- for (DmDayPc dmDayPc : list) {
|
|
|
- dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
|
|
|
- dmDayPcService.create(dmDayPc);
|
|
|
- System.out.println("新增food==="+dmDayPc.getFood());
|
|
|
- dmFoodService.create(dmDayPc.getFood());
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
- //参数1:接收哪个队列的数据
|
|
|
- //参数2:消息确认 是否应答,收到消息是否回复
|
|
|
- //参数3:
|
|
|
- channel.basicConsume("schedule_produce",true,consumer);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
+//package me.zhengjie.modules.quartz.task;
|
|
|
+//
|
|
|
+//import com.alibaba.fastjson.JSON;
|
|
|
+//import com.alibaba.fastjson.JSONArray;
|
|
|
+//import com.rabbitmq.client.*;
|
|
|
+//import lombok.RequiredArgsConstructor;
|
|
|
+//import lombok.SneakyThrows;
|
|
|
+//import lombok.extern.slf4j.Slf4j;
|
|
|
+//import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
|
|
|
+//import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
|
|
|
+//import me.zhengjie.modules.dm.food.service.DmFoodService;
|
|
|
+//import me.zhengjie.utils.ConnectionUtil;
|
|
|
+//import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+//import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+//import org.springframework.scheduling.annotation.Async;
|
|
|
+//import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+//import org.springframework.stereotype.Component;
|
|
|
+//
|
|
|
+//import java.io.ByteArrayOutputStream;
|
|
|
+//import java.io.IOException;
|
|
|
+//import java.io.ObjectOutputStream;
|
|
|
+//import java.io.Serializable;
|
|
|
+//import java.sql.Timestamp;
|
|
|
+//import java.util.List;
|
|
|
+//import java.util.concurrent.TimeoutException;
|
|
|
+//
|
|
|
+//@Slf4j
|
|
|
+//@RequiredArgsConstructor
|
|
|
+//@Component
|
|
|
+//public class DayPcDataTask {
|
|
|
+// private final DmFoodService dmFoodService;
|
|
|
+// private final DmDayPcService dmDayPcService;
|
|
|
+// private static String isEable = "0";
|
|
|
+////
|
|
|
+//// public void runSend1(){
|
|
|
+//// isEable = "1";
|
|
|
+//// }
|
|
|
+////
|
|
|
+//// public void runSend2(){
|
|
|
+//// if(isEable!=null&&isEable=="1"){
|
|
|
+//// send();
|
|
|
+//// isEable = "0";
|
|
|
+//// }
|
|
|
+//// }
|
|
|
+////
|
|
|
+//// public void runReceive1(){
|
|
|
+//// isEable = "1";
|
|
|
+//// }
|
|
|
+////
|
|
|
+// @Scheduled(cron = "0 */5 * * * ?")
|
|
|
+// @Async("threadPoolTaskExecutor2")
|
|
|
+// public void runReceive2(){
|
|
|
+//// if(isEable!=null&&isEable=="1"){
|
|
|
+// receive();
|
|
|
+//// isEable = "0";
|
|
|
+//// }
|
|
|
+// }
|
|
|
+////
|
|
|
+//// /**
|
|
|
+//// * 生产者
|
|
|
+//// */
|
|
|
+//// public void send(){
|
|
|
+//// //获取连接
|
|
|
+//// Connection connection = ConnectionUtil.getConnection();
|
|
|
+//// try {
|
|
|
+//// List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
|
|
|
+//// String userString = JSON.toJSONString(dmDayPc);
|
|
|
+//// Channel channel = connection.createChannel();
|
|
|
+//// String exchangeName = "schedule_consume";//交换机
|
|
|
+//// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
|
|
|
+//// String queue1Name = "schedule_consume";//队列名称
|
|
|
+//// channel.queueDeclare(queue1Name, true, false, false, null);
|
|
|
+//// channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
|
|
|
+//// channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
|
|
|
+//// System.out.println("sned排菜:"+dmDayPc);
|
|
|
+//// channel.close();
|
|
|
+//// connection.close();
|
|
|
+//// } catch (IOException | TimeoutException e) {
|
|
|
+//// e.printStackTrace();
|
|
|
+//// }
|
|
|
+//// }
|
|
|
+////
|
|
|
+// /**
|
|
|
+// * 消费者
|
|
|
+// */
|
|
|
+// public void receive(){
|
|
|
+// Connection connection = ConnectionUtil.getConnection();
|
|
|
+// try {
|
|
|
+// Channel channel = connection.createChannel();
|
|
|
+// //通过consumer来处理数据
|
|
|
+// Consumer consumer = new DefaultConsumer(channel){
|
|
|
+// @SneakyThrows
|
|
|
+// @Override
|
|
|
+// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
|
|
+// //body就是从队列中获取的数据
|
|
|
+// String str = new String(body);
|
|
|
+// if (str==null){
|
|
|
+// System.out.println("erropc"+str);
|
|
|
+// System.out.println("erropc"+body);
|
|
|
+// }else {
|
|
|
+// System.out.println("sucesspc"+str);
|
|
|
+// System.out.println("sucesspc"+body);
|
|
|
+// }
|
|
|
+// System.out.println("dingshipc"+str);
|
|
|
+// List<DmDayPc> list = JSONArray.parseArray(str,DmDayPc.class);
|
|
|
+// for (DmDayPc dmDayPc : list) {
|
|
|
+// dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
|
|
|
+// dmDayPcService.create(dmDayPc);
|
|
|
+// System.out.println("新增food==="+dmDayPc.getFood());
|
|
|
+// dmFoodService.create(dmDayPc.getFood());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// };
|
|
|
+// //参数1:接收哪个队列的数据
|
|
|
+// //参数2:消息确认 是否应答,收到消息是否回复
|
|
|
+// //参数3:
|
|
|
+// channel.basicConsume("schedule_produce",true,consumer);
|
|
|
+// } catch (IOException e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|