|
@@ -1,114 +1,123 @@
|
|
|
-//package me.zhengjie.modules.dm.daypc.rest;
|
|
|
-//
|
|
|
-//import com.alibaba.fastjson.JSON;
|
|
|
-//import com.alibaba.fastjson.JSONArray;
|
|
|
-//import com.alibaba.fastjson.JSONObject;
|
|
|
-//import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
-//import com.rabbitmq.client.*;
|
|
|
-//import com.rabbitmq.client.Connection;
|
|
|
-//import lombok.SneakyThrows;
|
|
|
-//import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
|
|
|
-//import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
|
|
|
-//import me.zhengjie.modules.dm.food.domain.DmFood;
|
|
|
-//import me.zhengjie.modules.dm.food.service.DmFoodService;
|
|
|
-//import me.zhengjie.utils.ConnectionUtil;
|
|
|
-//import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
-//import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-//import org.springframework.scheduling.annotation.Async;
|
|
|
-//import org.springframework.scheduling.annotation.Scheduled;
|
|
|
-//import org.springframework.stereotype.Component;
|
|
|
-//
|
|
|
-//import java.io.*;
|
|
|
-//import java.sql.*;
|
|
|
-//import java.text.*;
|
|
|
-//import java.util.*;
|
|
|
-//
|
|
|
-//@Component
|
|
|
-////@RabbitListener(queues = "schedule_produce")
|
|
|
-//public class ReceiveMsg {
|
|
|
-//// private final static String SCHEDULE_PRODUCE = "schedule_produce";
|
|
|
-// @Autowired
|
|
|
-// private DmDayPcService dmDayPcService;
|
|
|
-// @Autowired
|
|
|
-// private DmFoodService dmFoodService;
|
|
|
-// @Autowired
|
|
|
-// RabbitTemplate rabbitTemplate;
|
|
|
-// private static String isEable = "0";
|
|
|
-//
|
|
|
-// //每天10点执行
|
|
|
-// @Scheduled(cron = "0 0 10 * * ?")
|
|
|
-// //异步执行,指定线程池(配置类里配置)
|
|
|
-// @Async("threadPoolTaskExecutor1")
|
|
|
-// public void timer(){
|
|
|
-// isEable = "1";
|
|
|
-// }
|
|
|
-//
|
|
|
-// //每五分钟执行一次
|
|
|
-// @Scheduled(cron = "0 */5 * * * ?")
|
|
|
-// @Async("threadPoolTaskExecutor2")
|
|
|
-// public void timer2(){
|
|
|
-// if(isEable!=null&&isEable=="1"){
|
|
|
-// receive();
|
|
|
-// isEable = "0";
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// /**
|
|
|
-// * 消费者
|
|
|
-// */
|
|
|
-// public void receive(){
|
|
|
-// Connection connection = ConnectionUtil.getConnection();
|
|
|
-// try {
|
|
|
-// Channel channel = connection.createChannel();
|
|
|
-// //通过consumer来处理数据
|
|
|
-// Consumer consumer = new DefaultConsumer(channel){
|
|
|
-// @SneakyThrows
|
|
|
-// @Override
|
|
|
-// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
|
|
-// //body就是从队列中获取的数据
|
|
|
-// String str = new String(body);
|
|
|
-// List<DmDayPc> list = JSONArray.parseArray(str,DmDayPc.class);
|
|
|
-//// Map<String,Object> map = null;
|
|
|
-// for (DmDayPc dmDayPc : list) {
|
|
|
-//// map = new HashMap<>();
|
|
|
-//// map.put("food",dmDayPc.getFood());
|
|
|
-// dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
|
|
|
-// dmDayPcService.create(dmDayPc);
|
|
|
-// System.out.println("新增food==="+dmDayPc.getFood());
|
|
|
-// dmFoodService.create(dmDayPc.getFood());
|
|
|
+package me.zhengjie.modules.dm.daypc.rest;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.rabbitmq.client.*;
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
+import lombok.SneakyThrows;
|
|
|
+import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
|
|
|
+import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
|
|
|
+import me.zhengjie.modules.dm.food.domain.DmFood;
|
|
|
+import me.zhengjie.modules.dm.food.service.DmFoodService;
|
|
|
+import me.zhengjie.utils.ConnectionUtil;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.*;
|
|
|
+import java.sql.*;
|
|
|
+import java.text.*;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+@Component
|
|
|
+//@RabbitListener(queues = "schedule_produce")
|
|
|
+public class ReceiveMsg {
|
|
|
+// private final static String SCHEDULE_PRODUCE = "schedule_produce";
|
|
|
+ @Autowired
|
|
|
+ private DmDayPcService dmDayPcService;
|
|
|
+ @Autowired
|
|
|
+ private DmFoodService dmFoodService;
|
|
|
+ @Autowired
|
|
|
+ RabbitTemplate rabbitTemplate;
|
|
|
+ private static String isEable = "0";
|
|
|
+
|
|
|
+ //每天10点执行
|
|
|
+ @Scheduled(cron = "0 0 10 * * ?")
|
|
|
+ //异步执行,指定线程池(配置类里配置)
|
|
|
+ @Async("threadPoolTaskExecutor1")
|
|
|
+ public void timer(){
|
|
|
+ isEable = "1";
|
|
|
+ }
|
|
|
+
|
|
|
+ //每五分钟执行一次
|
|
|
+ @Scheduled(cron = "0 */5 * * * ?")
|
|
|
+ @Async("threadPoolTaskExecutor2")
|
|
|
+ public void timer2(){
|
|
|
+ if(isEable!=null&&isEable=="1"){
|
|
|
+ receive();
|
|
|
+ isEable = "0";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消费者
|
|
|
+ */
|
|
|
+
|
|
|
+ public void receive(){
|
|
|
+ Connection connection = ConnectionUtil.getConnection();
|
|
|
+ try {
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
+ //通过consumer来处理数据
|
|
|
+ Consumer consumer = new DefaultConsumer(channel){
|
|
|
+ @SneakyThrows
|
|
|
+ @Override
|
|
|
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
|
|
+ //body就是从队列中获取的数据
|
|
|
+ String str = new String(body);
|
|
|
+ if (str==null){
|
|
|
+ System.out.println("erro"+str);
|
|
|
+ System.out.println("erro"+body);
|
|
|
+ }else {
|
|
|
+ System.out.println("sucess"+str);
|
|
|
+ System.out.println("sucess"+body);
|
|
|
+ }
|
|
|
+ List<DmDayPc> list = JSONArray.parseArray(str,DmDayPc.class);
|
|
|
+// Map<String,Object> map = null;
|
|
|
+ for (DmDayPc dmDayPc : list) {
|
|
|
+// map = new HashMap<>();
|
|
|
+// map.put("food",dmDayPc.getFood());
|
|
|
+ dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
|
|
|
+ dmDayPcService.create(dmDayPc);
|
|
|
+ System.out.println("新增food==="+dmDayPc.getFood());
|
|
|
+ dmFoodService.create(dmDayPc.getFood());
|
|
|
+ }
|
|
|
+// Map<String,Object> map = JSONObject.parseObject(str);
|
|
|
+// for (Map.Entry<String, Object> entry : map.entrySet()) {
|
|
|
+// dmFoodService.create((DmFood) entry);
|
|
|
+// System.out.println("key====>" + entry.getKey() + ",value===>" + entry.getValue());
|
|
|
// }
|
|
|
-//// Map<String,Object> map = JSONObject.parseObject(str);
|
|
|
-//// for (Map.Entry<String, Object> entry : map.entrySet()) {
|
|
|
-//// dmFoodService.create((DmFood) entry);
|
|
|
-//// System.out.println("key====>" + entry.getKey() + ",value===>" + entry.getValue());
|
|
|
-//// }
|
|
|
-//
|
|
|
-// }
|
|
|
-// };
|
|
|
-// //参数1:接收哪个队列的数据
|
|
|
-// //参数2:消息确认 是否应答,收到消息是否回复
|
|
|
-// //参数3:
|
|
|
-// channel.basicConsume("schedule_produce",true,consumer);
|
|
|
-// } catch (IOException e) {
|
|
|
-// e.printStackTrace();
|
|
|
-// }
|
|
|
+
|
|
|
+ }
|
|
|
+ };
|
|
|
+ //参数1:接收哪个队列的数据
|
|
|
+ //参数2:消息确认 是否应答,收到消息是否回复
|
|
|
+ //参数3:
|
|
|
+ System.out.println();
|
|
|
+ channel.basicConsume("schedule_produce",true,consumer);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+// public List<DmDayPc> getDmDayPc(){
|
|
|
+// return JSONObject.parseArray(getDmDayPc(),DmDayPc.class);
|
|
|
// }
|
|
|
-//
|
|
|
-//// public List<DmDayPc> getDmDayPc(){
|
|
|
-//// return JSONObject.parseArray(getDmDayPc(),DmDayPc.class);
|
|
|
-//// }
|
|
|
-//
|
|
|
-// public Object getObjectFromBytes(byte[] objBytes) throws Exception {
|
|
|
-// if (objBytes == null || objBytes.length == 0) {
|
|
|
-// return null;
|
|
|
-// }
|
|
|
-// ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
|
|
|
-// ObjectInputStream oi = new ObjectInputStream(bi);
|
|
|
-// return oi.readObject();
|
|
|
+
|
|
|
+ public Object getObjectFromBytes(byte[] objBytes) throws Exception {
|
|
|
+ if (objBytes == null || objBytes.length == 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
|
|
|
+ ObjectInputStream oi = new ObjectInputStream(bi);
|
|
|
+ return oi.readObject();
|
|
|
+ }
|
|
|
+
|
|
|
+// @RabbitHandler
|
|
|
+// public void process(DmDayPc dmDayPc) {
|
|
|
+// dmDayPcService.create(dmDayPc);
|
|
|
// }
|
|
|
-//
|
|
|
-//// @RabbitHandler
|
|
|
-//// public void process(DmDayPc dmDayPc) {
|
|
|
-//// dmDayPcService.create(dmDayPc);
|
|
|
-//// }
|
|
|
-//}
|
|
|
+}
|