he.dujuan 3 anos atrás
pai
commit
691bd62350
22 arquivos alterados com 1280 adições e 618 exclusões
  1. 4 0
      eladmin-common/pom.xml
  2. 57 24
      eladmin-common/src/main/java/me/zhengjie/utils/ConnectionUtil.java
  3. 8 0
      eladmin-system/pom.xml
  4. 122 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/RabbitmqConfig.java
  5. 70 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/cuisine/CuisineConsumer.java
  6. 70 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/cuisine/CuisineProduce.java
  7. 70 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/order/OrderConsumer.java
  8. 70 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/order/OrederProduce.java
  9. 71 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/test/ListenerTemplate.java
  10. 33 0
      eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/test/SendTemplate.java
  11. 38 0
      eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java
  12. 1 9
      eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/DmDayPcController.java
  13. 77 74
      eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/ReceiveMsg.java
  14. 64 50
      eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/SendMsg.java
  15. 52 52
      eladmin-system/src/main/java/me/zhengjie/modules/dm/order/rest/OrderReceiveMsg.java
  16. 57 57
      eladmin-system/src/main/java/me/zhengjie/modules/dm/order/rest/OrderSendMsg.java
  17. 122 112
      eladmin-system/src/main/java/me/zhengjie/modules/quartz/task/DayPcDataTask.java
  18. 115 108
      eladmin-system/src/main/java/me/zhengjie/modules/quartz/task/OrderDataTask.java
  19. 30 0
      eladmin-system/src/main/java/me/zhengjie/modules/test/TestController.java
  20. 2 2
      eladmin-system/src/main/java/me/zhengjie/modules/thirdparty/v1/OpenApiController.java
  21. 16 2
      eladmin-system/src/main/resources/config/application.yml
  22. 131 128
      eladmin-system/src/test/java/me/zhengjie/TestRabbit.java

+ 4 - 0
eladmin-common/pom.xml

@@ -32,5 +32,9 @@
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-amqp</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 57 - 24
eladmin-common/src/main/java/me/zhengjie/utils/ConnectionUtil.java

@@ -1,7 +1,10 @@
 package me.zhengjie.utils;
 
 import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
+//import com.rabbitmq.client.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.*;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.*;
 
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
@@ -13,29 +16,59 @@ import java.util.concurrent.TimeoutException;
  */
 public class ConnectionUtil {
 
-    public static Connection getConnection() {
-        //定义连接工厂
-        ConnectionFactory factory = new ConnectionFactory();
-        //设置服务地址
-        factory.setHost("10.208.19.6");//10.21.39.1
-        //端口
-        factory.setPort(11672);//56720
-        //设置账号信息
-        //vhost(Virtual Host)
-//        factory.setVirtualHost("test");
-        //用户名
-        factory.setUsername("usemq");//admin
-        //密码
-        factory.setPassword("User!@#$mq");//123456
-        //通过工厂获得与mq的连接对象
-        Connection connection = null;
-        try {
-            connection = factory.newConnection();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return connection;
-    }
+//    public static Connection getConnection() {
+//        //定义连接工厂
+//        ConnectionFactory factory = new ConnectionFactory();
+//        //设置服务地址
+//        factory.setHost("10.208.19.5");//10.21.39.1 10.208.19.6
+//        //端口
+//        factory.setPort(11673);//56720 11672
+//        //设置账号信息
+//        //vhost(Virtual Host)
+////        factory.setVirtualHost("schedule_produce");
+//        //用户名
+//        factory.setUsername("rabbit");//admin usermq
+//        //密码
+//        factory.setPassword("Rabbit!@#$123");//123456  User!@#$mq
+//        //通过工厂获得与mq的连接对象
+//        Connection connection = null;
+//        try {
+//            connection = factory.newConnection();
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//        return connection;
+//    }
+
+//    @Bean(name = "firstConnectionFactory")
+//    public ConnectionFactory firstConnectionFactory(@Value("${my.rabbitmq.host}") String host,
+//                                                    @Value("${my.rabbitmq.port}") int port,
+//                                                    @Value("${my.rabbitmq.username}") String username,
+//                                                    @Value("${my.rabbitmq.password}") String password,
+//                                                    @Value("${my.rabbitmq.first.virtual-host}") String vHost) {
+//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+//        connectionFactory.setHost(host);
+//        connectionFactory.setPort(port);
+//        connectionFactory.setUsername(username);
+//        connectionFactory.setPassword(password);
+//        connectionFactory.setVirtualHost(vHost);
+//        return connectionFactory;
+//    }
+//
+//    @Bean(name = "secondConnectionFactory")
+//    public ConnectionFactory secondConnectionFactory(@Value("${my.rabbitmq.host}") String host,
+//                                                    @Value("${my.rabbitmq.port}") int port,
+//                                                    @Value("${my.rabbitmq.username}") String username,
+//                                                    @Value("${my.rabbitmq.password}") String password,
+//                                                    @Value("${my.rabbitmq.second.virtual-host}") String vHost) {
+//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+//        connectionFactory.setHost(host);
+//        connectionFactory.setPort(port);
+//        connectionFactory.setUsername(username);
+//        connectionFactory.setPassword(password);
+//        connectionFactory.setVirtualHost(vHost);
+//        return connectionFactory;
+//    }
 
 }
 

+ 8 - 0
eladmin-system/pom.xml

@@ -108,6 +108,14 @@
             <groupId>com.rabbitmq</groupId>
             <artifactId>amqp-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-test</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
     </dependencies>
 
     <!-- 打包 -->

+ 122 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/RabbitmqConfig.java

@@ -0,0 +1,122 @@
+package me.zhengjie.config.rabbitmq;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Optional;
+
+/**
+ * @author yq
+ * @date 2021/9/9 13:37
+ */
+@Slf4j
+@Data
+@Configuration
+public class RabbitmqConfig {
+
+    public final static String CONNECTION = "ConnectionFactory";
+    public final static String TEMPLATE = "RabbitTemplate";
+    public final static String LISTENER = "ListenerFactory";
+    public final static String ADMIN = "Admin";
+    public final static String EXCHANGE = "Exchange";
+    public final static String QUEUE = "Queue";
+    public final static String BINDING = "Bin";
+
+    @Autowired
+    private RabbitProperties rabbitProperties;
+
+
+    /**
+     * 连接工厂
+     * @param vHost
+     * @return
+     */
+    public ConnectionFactory connectionFactory(String vHost) {
+        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+        connectionFactory.setHost(rabbitProperties.getHost());
+        connectionFactory.setPort(rabbitProperties.getPort());
+        connectionFactory.setUsername(rabbitProperties.getUsername());
+        connectionFactory.setPassword(rabbitProperties.getPassword());
+        connectionFactory.setPublisherReturns(true);
+        connectionFactory.setVirtualHost(vHost);
+        return connectionFactory;
+    }
+    /**
+     * 发送消息模版
+     * @param connectionFactory
+     * @return
+     */
+    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
+        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+        rabbitTemplate.setMessageConverter(this.messageConverter());
+        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
+        rabbitTemplate.setMandatory(true);
+        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
+        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
+            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
+        });
+
+        return rabbitTemplate;
+    }
+
+    /**
+     * 监听工厂
+     * @param configurer
+     * @param connectionFactory
+     * @return
+     */
+    public SimpleRabbitListenerContainerFactory listenerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
+        //设置手动ack
+        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
+        configurer.configure(listenerContainerFactory, connectionFactory);
+        return listenerContainerFactory;
+    }
+
+    /**
+     * 序列化
+      * @return
+     */
+    public Jackson2JsonMessageConverter messageConverter() {
+        return new Jackson2JsonMessageConverter();
+    }
+
+    /**
+     * 交换机
+     * @param name
+     * @param amqpAdmin
+     * @return
+     */
+    public DirectExchange exchange(String name, AmqpAdmin amqpAdmin){
+        DirectExchange exchange = new DirectExchange(name, true, false);
+        exchange.setAdminsThatShouldDeclare(amqpAdmin);
+        return exchange;
+    }
+
+    /**
+     * 队列
+     * @param name
+     * @param amqpAdmin
+     * @return
+     */
+    public Queue queue(String name, AmqpAdmin amqpAdmin){
+        Queue queue = new Queue(name, true);
+        queue.setAdminsThatShouldDeclare(amqpAdmin);
+        return queue;
+    }
+}

+ 70 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/cuisine/CuisineConsumer.java

@@ -0,0 +1,70 @@
+package me.zhengjie.config.rabbitmq.cuisine;
+
+import lombok.Data;
+import me.zhengjie.config.rabbitmq.RabbitmqConfig;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+public class CuisineConsumer {
+    @Autowired
+    private RabbitmqConfig rabbitmqConfig;
+    public static final String NAME = "schedule_produce";
+    public final static String MODULE_NAME = "returnC";
+
+    public final static String CONNECTION_CON = MODULE_NAME+RabbitmqConfig.CONNECTION;
+    public final static String TEMPLATE_CON = MODULE_NAME+ RabbitmqConfig.TEMPLATE;
+    public final static String LISTENER_CON = MODULE_NAME+RabbitmqConfig.LISTENER;
+    public final static String ADMIN_CON = MODULE_NAME+RabbitmqConfig.ADMIN;
+    public final static String EXCHANGE_CON = MODULE_NAME+RabbitmqConfig.EXCHANGE;
+    public final static String QUEUE_CON = MODULE_NAME+RabbitmqConfig.QUEUE;
+    public final static String BINDING_CON = MODULE_NAME+RabbitmqConfig.BINDING;
+
+    @Bean(name = CONNECTION_CON)
+    public ConnectionFactory connectionFactory(){
+        return rabbitmqConfig.connectionFactory(NAME);
+    }
+
+    @Bean(name = TEMPLATE_CON)
+    public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_CON) ConnectionFactory connectionFactory ) {
+        return rabbitmqConfig.rabbitTemplate(connectionFactory);
+    }
+
+    @Bean(name = LISTENER_CON)
+    public SimpleRabbitListenerContainerFactory listenerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            @Qualifier(CONNECTION_CON) ConnectionFactory connectionFactory) {
+        return rabbitmqConfig.listenerFactory(configurer,connectionFactory);
+    }
+
+    @Bean(ADMIN_CON)
+    AmqpAdmin amqpAdmin(@Qualifier(CONNECTION_CON) ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @Bean(EXCHANGE_CON)
+    public DirectExchange exchange(@Qualifier(ADMIN_CON) AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.exchange(NAME,amqpAdmin);
+    }
+
+    @Bean(QUEUE_CON)
+    Queue queue(@Qualifier(ADMIN_CON)AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.queue(NAME,amqpAdmin);
+    }
+
+    @Bean(BINDING_CON)
+    Binding bindingPersonDirect(@Qualifier(ADMIN_CON)AmqpAdmin amqpAdmin) {
+        Binding binding = BindingBuilder.bind(queue(amqpAdmin)).to(exchange(amqpAdmin)).with(NAME);
+        binding.setAdminsThatShouldDeclare(amqpAdmin);
+        return binding;
+    }
+}

+ 70 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/cuisine/CuisineProduce.java

@@ -0,0 +1,70 @@
+package me.zhengjie.config.rabbitmq.cuisine;
+
+import lombok.Data;
+import me.zhengjie.config.rabbitmq.RabbitmqConfig;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+public class CuisineProduce {
+    @Autowired
+    private RabbitmqConfig rabbitmqConfig;
+    public static final String NAME = "schedule_consume";
+    private final static String MODULE_NAME = "returnC";
+
+    public final static String CONNECTION = MODULE_NAME+RabbitmqConfig.CONNECTION;
+    public final static String TEMPLATE = MODULE_NAME+ RabbitmqConfig.TEMPLATE;
+    public final static String LISTENER = MODULE_NAME+RabbitmqConfig.LISTENER;
+    public final static String ADMIN = MODULE_NAME+RabbitmqConfig.ADMIN;
+    public final static String EXCHANGE = MODULE_NAME+RabbitmqConfig.EXCHANGE;
+    public final static String QUEUE = MODULE_NAME+RabbitmqConfig.QUEUE;
+    public final static String BINDING = MODULE_NAME+RabbitmqConfig.BINDING;
+
+    @Bean(name = CONNECTION)
+    public ConnectionFactory connectionFactory(){
+        return rabbitmqConfig.connectionFactory(NAME);
+    }
+
+    @Bean(name = TEMPLATE)
+    public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION) ConnectionFactory connectionFactory ) {
+        return rabbitmqConfig.rabbitTemplate(connectionFactory);
+    }
+
+    @Bean(name = LISTENER)
+    public SimpleRabbitListenerContainerFactory listenerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            @Qualifier(CONNECTION) ConnectionFactory connectionFactory) {
+        return rabbitmqConfig.listenerFactory(configurer,connectionFactory);
+    }
+
+    @Bean(ADMIN)
+    AmqpAdmin amqpAdmin(@Qualifier(CONNECTION) ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @Bean(EXCHANGE)
+    public DirectExchange exchange(@Qualifier(ADMIN) AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.exchange(NAME,amqpAdmin);
+    }
+
+    @Bean(QUEUE)
+    Queue queue(@Qualifier(ADMIN)AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.queue(NAME,amqpAdmin);
+    }
+
+    @Bean(BINDING)
+    Binding bindingPersonDirect(@Qualifier(ADMIN)AmqpAdmin amqpAdmin) {
+        Binding binding = BindingBuilder.bind(queue(amqpAdmin)).to(exchange(amqpAdmin)).with(NAME);
+        binding.setAdminsThatShouldDeclare(amqpAdmin);
+        return binding;
+    }
+}

+ 70 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/order/OrderConsumer.java

@@ -0,0 +1,70 @@
+package me.zhengjie.config.rabbitmq.order;
+
+import lombok.Data;
+import me.zhengjie.config.rabbitmq.RabbitmqConfig;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+public class OrderConsumer {
+    @Autowired
+    private RabbitmqConfig rabbitmqConfig;
+    public static final String NAME_ORDER = "dish_trade_produce";
+    public final static String MODULE_NAME_ORDER = "returnC";
+
+    public final static String CONNECTION_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.CONNECTION;
+    public final static String TEMPLATE_ORDER_CON = MODULE_NAME_ORDER+ RabbitmqConfig.TEMPLATE;
+    public final static String LISTENER_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.LISTENER;
+    public final static String ADMIN_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.ADMIN;
+    public final static String EXCHANGE_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.EXCHANGE;
+    public final static String QUEUE_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.QUEUE;
+    public final static String BINDING_ORDER_CON = MODULE_NAME_ORDER+RabbitmqConfig.BINDING;
+
+    @Bean(name = CONNECTION_ORDER_CON)
+    public ConnectionFactory connectionFactory(){
+        return rabbitmqConfig.connectionFactory(NAME_ORDER);
+    }
+
+    @Bean(name = TEMPLATE_ORDER_CON)
+    public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_ORDER_CON) ConnectionFactory connectionFactory ) {
+        return rabbitmqConfig.rabbitTemplate(connectionFactory);
+    }
+
+    @Bean(name = LISTENER_ORDER_CON)
+    public SimpleRabbitListenerContainerFactory listenerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            @Qualifier(CONNECTION_ORDER_CON) ConnectionFactory connectionFactory) {
+        return rabbitmqConfig.listenerFactory(configurer,connectionFactory);
+    }
+
+    @Bean(ADMIN_ORDER_CON)
+    AmqpAdmin amqpAdmin(@Qualifier(CONNECTION_ORDER_CON) ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @Bean(EXCHANGE_ORDER_CON)
+    public DirectExchange exchange(@Qualifier(ADMIN_ORDER_CON) AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.exchange(NAME_ORDER,amqpAdmin);
+    }
+
+    @Bean(QUEUE_ORDER_CON)
+    Queue queue(@Qualifier(ADMIN_ORDER_CON)AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.queue(NAME_ORDER,amqpAdmin);
+    }
+
+    @Bean(BINDING_ORDER_CON)
+    Binding bindingPersonDirect(@Qualifier(ADMIN_ORDER_CON)AmqpAdmin amqpAdmin) {
+        Binding binding = BindingBuilder.bind(queue(amqpAdmin)).to(exchange(amqpAdmin)).with(NAME_ORDER);
+        binding.setAdminsThatShouldDeclare(amqpAdmin);
+        return binding;
+    }
+}

+ 70 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/order/OrederProduce.java

@@ -0,0 +1,70 @@
+package me.zhengjie.config.rabbitmq.order;
+
+import lombok.Data;
+import me.zhengjie.config.rabbitmq.RabbitmqConfig;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+public class OrederProduce {
+    @Autowired
+    private RabbitmqConfig rabbitmqConfig;
+    public static final String NAME_ORDER = "dish_trade_produce";
+    public final static String MODULE_NAME_ORDER = "returnC";
+
+    public final static String CONNECTION_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.CONNECTION;
+    public final static String TEMPLATE_ORDER = MODULE_NAME_ORDER+ RabbitmqConfig.TEMPLATE;
+    public final static String LISTENER_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.LISTENER;
+    public final static String ADMIN_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.ADMIN;
+    public final static String EXCHANGE_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.EXCHANGE;
+    public final static String QUEUE_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.QUEUE;
+    public final static String BINDING_ORDER = MODULE_NAME_ORDER+RabbitmqConfig.BINDING;
+
+    @Bean(name = CONNECTION_ORDER)
+    public ConnectionFactory connectionFactory(){
+        return rabbitmqConfig.connectionFactory(NAME_ORDER);
+    }
+
+    @Bean(name = TEMPLATE_ORDER)
+    public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_ORDER) ConnectionFactory connectionFactory ) {
+        return rabbitmqConfig.rabbitTemplate(connectionFactory);
+    }
+
+    @Bean(name = LISTENER_ORDER)
+    public SimpleRabbitListenerContainerFactory listenerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            @Qualifier(CONNECTION_ORDER) ConnectionFactory connectionFactory) {
+        return rabbitmqConfig.listenerFactory(configurer,connectionFactory);
+    }
+
+    @Bean(ADMIN_ORDER)
+    AmqpAdmin amqpAdmin(@Qualifier(CONNECTION_ORDER) ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @Bean(EXCHANGE_ORDER)
+    public DirectExchange exchange(@Qualifier(ADMIN_ORDER) AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.exchange(NAME_ORDER,amqpAdmin);
+    }
+
+    @Bean(QUEUE_ORDER)
+    Queue queue(@Qualifier(ADMIN_ORDER)AmqpAdmin amqpAdmin) {
+        return rabbitmqConfig.queue(NAME_ORDER,amqpAdmin);
+    }
+
+    @Bean(BINDING_ORDER)
+    Binding bindingPersonDirect(@Qualifier(ADMIN_ORDER)AmqpAdmin amqpAdmin) {
+        Binding binding = BindingBuilder.bind(queue(amqpAdmin)).to(exchange(amqpAdmin)).with(NAME_ORDER);
+        binding.setAdminsThatShouldDeclare(amqpAdmin);
+        return binding;
+    }
+}

+ 71 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/test/ListenerTemplate.java

@@ -0,0 +1,71 @@
+package me.zhengjie.config.rabbitmq.test;
+
+import com.alibaba.fastjson.JSONArray;
+import me.zhengjie.config.rabbitmq.cuisine.CuisineConsumer;
+import me.zhengjie.config.rabbitmq.cuisine.CuisineProduce;
+import me.zhengjie.config.rabbitmq.order.OrderConsumer;
+import me.zhengjie.config.rabbitmq.order.OrederProduce;
+import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
+import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
+import me.zhengjie.modules.dm.food.service.DmFoodService;
+import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
+import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Configurable;
+import org.springframework.amqp.core.Message;
+
+import java.sql.Timestamp;
+import java.util.List;
+
+@Configurable
+public class ListenerTemplate {
+    @Autowired
+    DmDayPcService dmDayPcService;
+    @Autowired
+    DmFoodService dmFoodService;
+    @Autowired
+    DmOrderRecordService dmOrderRecordService;
+
+    @RabbitListener(queues = CuisineConsumer.NAME, containerFactory = CuisineConsumer.LISTENER_CON)
+    public void dealDeclareMessage(Message message) {
+        System.out.println("接受到的消息pc"+message);
+        String str = String.valueOf(message);
+        if (str!=null){
+            System.out.println("receive:"+str);
+        }else {
+            System.out.println("receive(null):"+ str);
+        }
+        try {
+            List<DmDayPc> list = JSONArray.parseArray(str, DmDayPc.class);
+            for (DmDayPc dmDayPc : list) {
+                dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
+                dmDayPcService.create(dmDayPc);
+                System.out.println("新增food===" + dmDayPc.getFood());
+                dmFoodService.create(dmDayPc.getFood());
+            }
+        }catch (Exception ex){
+
+        }
+    }
+
+    @RabbitListener(queues = OrderConsumer.NAME_ORDER, containerFactory = OrderConsumer.LISTENER_ORDER_CON)
+    public void dealDeclareOrderMessage(Message message) {
+        System.out.println("接受到的消息order"+message);
+        String str = String.valueOf(message);
+        if (str!=null){
+            System.out.println("receiveOrder:"+str);
+        }else {
+            System.out.println("receiveOrder(null):"+ str);
+        }
+        try {
+            List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str, DmExpenseCalendar.class);
+            for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
+                dmOrderRecordService.createOrderItem(dmExpenseCalendar);
+                dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+            }
+        }catch (Exception ex){
+
+        }
+    }
+}

+ 33 - 0
eladmin-system/src/main/java/me/zhengjie/config/rabbitmq/test/SendTemplate.java

@@ -0,0 +1,33 @@
+package me.zhengjie.config.rabbitmq.test;
+
+import me.zhengjie.config.rabbitmq.cuisine.CuisineConsumer;
+import me.zhengjie.config.rabbitmq.cuisine.CuisineProduce;
+import me.zhengjie.config.rabbitmq.order.OrderConsumer;
+import me.zhengjie.config.rabbitmq.order.OrederProduce;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Configurable;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SendTemplate {
+
+    @Autowired
+    @Qualifier(CuisineProduce.TEMPLATE)
+    private RabbitTemplate rabbitTemplate;
+
+    @Autowired
+    @Qualifier(OrederProduce.TEMPLATE_ORDER)
+    private RabbitTemplate rabbitTemplate1;
+
+    public void sendMessage(Object message){
+        rabbitTemplate.convertAndSend(CuisineProduce.NAME, CuisineProduce.NAME, message);
+        System.out.println("排菜信息获取----消息发送到rabbitmq");
+    }
+
+    public void sendOrederMessage(Object message){
+        rabbitTemplate1.convertAndSend(OrederProduce.NAME_ORDER, OrederProduce.NAME_ORDER, message);
+        System.out.println("消费记录获取----消息发送到rabbitmq");
+    }
+}

+ 38 - 0
eladmin-system/src/main/java/me/zhengjie/config/thread/ThreadPoolConfig.java

@@ -1,6 +1,9 @@
 package me.zhengjie.config.thread;
 
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -48,4 +51,39 @@ public class ThreadPoolConfig {
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         return executor;
     }
+
+//    @Bean("brianThreadPool")
+//    public ThreadPoolTaskExecutor brianThreadPool(){
+//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+//        //核心线程数
+//        executor.setCorePoolSize(corePoolSize);
+//        //最大线程数
+//        executor.setMaxPoolSize(maxPoolSize);
+//        //队列中最大的数
+//        executor.setQueueCapacity(queueCapacity);
+//        //县城名称前缀
+//        executor.setThreadNamePrefix("brianThreadPool_");
+//        //rejectionPolicy:当pool已经达到max的时候,如何处理新任务
+//        //callerRuns:不在新线程中执行任务,而是由调用者所在的线程来执行
+//        //对拒绝task的处理策略
+//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+//        //线程空闲后最大的存活时间
+//        executor.setKeepAliveSeconds(keepAliveSeconds);
+//        //初始化加载
+//        executor.initialize();
+//        return executor;
+//    }
+
+    @Bean("brianThreadPool")
+    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+//        factory.setConcurrentConsumers(10);  //设置线程数
+//        factory.setMaxConcurrentConsumers(10); //最大线程数
+        //核心线程数
+        factory.setConcurrentConsumers(corePoolSize);
+        //最大线程数
+        factory.setMaxConcurrentConsumers(maxPoolSize);
+        configurer.configure(factory, connectionFactory);
+        return factory;
+    }
 }

+ 1 - 9
eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/DmDayPcController.java

@@ -42,7 +42,7 @@ import javax.servlet.http.HttpServletResponse;
 public class DmDayPcController {
 
     private final DmDayPcService dmDayPcService;
-    private final SendMsg sendMsg;
+//    private final SendMsg sendMsg;
 
     @Log("导出数据")
     @ApiOperation("导出数据")
@@ -60,14 +60,6 @@ public class DmDayPcController {
         return new ResponseEntity<>(dmDayPcService.queryAll(criteria,pageable),HttpStatus.OK);
     }
 
-    @GetMapping(value = "/send")
-    @Log("查询sned")
-    @ApiOperation("查询send")
-    public ResponseEntity<Object> send() {
-        sendMsg.send();
-        return new ResponseEntity<>(HttpStatus.OK);
-    }
-
     @PostMapping
     @Log("新增daypc")
     @ApiOperation("新增daypc")

+ 77 - 74
eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/ReceiveMsg.java

@@ -4,27 +4,35 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.*;
+//import com.rabbitmq.client.*;
 import com.rabbitmq.client.Connection;
 import lombok.SneakyThrows;
+import me.zhengjie.config.thread.ThreadPoolConfig;
 import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
 import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
 import me.zhengjie.modules.dm.food.domain.DmFood;
 import me.zhengjie.modules.dm.food.service.DmFoodService;
 import me.zhengjie.utils.ConnectionUtil;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.io.*;
 import java.sql.*;
 import java.text.*;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 @Component
-//@RabbitListener(queues = "schedule_produce")
 public class ReceiveMsg {
 //    private final static String SCHEDULE_PRODUCE = "schedule_produce";
     @Autowired
@@ -35,89 +43,84 @@ public class ReceiveMsg {
     RabbitTemplate rabbitTemplate;
     private static String isEable = "0";
 
-    //每天10点执行
-    @Scheduled(cron = "0 0 10 * * ?")
-    //异步执行,指定线程池(配置类里配置)
-    @Async("threadPoolTaskExecutor1")
-    public void timer(){
-        isEable = "1";
-    }
-
-    //每五分钟执行一次
-    @Scheduled(cron = "0 */5 * * * ?")
-    @Async("threadPoolTaskExecutor2")
-    public void timer2(){
-        if(isEable!=null&&isEable=="1"){
-            receive();
-            isEable = "0";
-        }
-    }
+//    //每天10点执行
+//    @Scheduled(cron = "0 0 10 * * ?")
+//    //异步执行,指定线程池(配置类里配置)
+//    @Async("threadPoolTaskExecutor1")
+//    public void timer(){
+//        isEable = "1";
+//    }
+//
+//    //每五分钟执行一次
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void timer2(){
+//        if(isEable!=null&&isEable=="1"){
+//            receive();
+//            isEable = "0";
+//        }
+//    }
 
     /**
      * 消费者
      */
+//    @RabbitListener(queues = "schedule_produce",containerFactory = "brianThreadPool")
+    public void receive(byte[] msg){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                    //body就是从队列中获取的数据
+                    String str = new String(msg);
 
-    public void receive(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    //body就是从队列中获取的数据
-                    String str = new String(body);
-                    if (str==null){
-                        System.out.println("erro"+str);
-                        System.out.println("erro"+body);
+                    if (str!=null){
+                        System.out.println("receive:"+str);
                     }else {
-                        System.out.println("sucess"+str);
-                        System.out.println("sucess"+body);
+                        System.out.println("receive(null):"+ str);
                     }
-                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
-//                    Map<String,Object> map = null;
-                    for (DmDayPc dmDayPc : list) {
-//                        map = new HashMap<>();
-//                        map.put("food",dmDayPc.getFood());
-                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
-                        dmDayPcService.create(dmDayPc);
-                        System.out.println("新增food==="+dmDayPc.getFood());
-                        dmFoodService.create(dmDayPc.getFood());
-                    }
-//                    Map<String,Object> map = JSONObject.parseObject(str);
-//                    for (Map.Entry<String, Object> entry : map.entrySet()) {
-//                        dmFoodService.create((DmFood) entry);
-//                        System.out.println("key====>" + entry.getKey() + ",value===>" + entry.getValue());
-//                    }
+                    try {
+                        List<DmDayPc> list = JSONArray.parseArray(str, DmDayPc.class);
+                        for (DmDayPc dmDayPc : list) {
+                            dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
+                            dmDayPcService.create(dmDayPc);
+                            System.out.println("新增food===" + dmDayPc.getFood());
+                            dmFoodService.create(dmDayPc.getFood());
+                        }
+                    }catch (Exception ex){
 
-                }
-            };
-            //参数1:接收哪个队列的数据
-            //参数2:消息确认 是否应答,收到消息是否回复
-            //参数3:
-            System.out.println();
-            channel.basicConsume("schedule_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+                    }
+//                }
+//            };
+//            //参数1:接收哪个队列的数据
+//            //参数2:消息确认 是否应答,收到消息是否回复
+//            //参数3:
+//            System.out.println();
+//            channel.basicConsume("schedule_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
     }
 
-//    public List<DmDayPc> getDmDayPc(){
-//        return JSONObject.parseArray(getDmDayPc(),DmDayPc.class);
+//    public SimpleRabbitListenerContainerFactory firstListenerFactory(
+//            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+//            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
+//        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
+//        //设置手动ack
+//        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+//        configurer.configure(listenerContainerFactory, connectionFactory);
+//        return listenerContainerFactory;
 //    }
 
-    public  Object getObjectFromBytes(byte[] objBytes) throws Exception {
-        if (objBytes == null || objBytes.length == 0) {
-            return null;
-        }
-        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
-        ObjectInputStream oi = new ObjectInputStream(bi);
-        return oi.readObject();
-    }
-
-//    @RabbitHandler
-//    public void process(DmDayPc dmDayPc) {
-//        dmDayPcService.create(dmDayPc);
+//    public  Object getObjectFromBytes(byte[] objBytes) throws Exception {
+//        if (objBytes == null || objBytes.length == 0) {
+//            return null;
+//        }
+//        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
+//        ObjectInputStream oi = new ObjectInputStream(bi);
+//        return oi.readObject();
 //    }
 }

+ 64 - 50
eladmin-system/src/main/java/me/zhengjie/modules/dm/daypc/rest/SendMsg.java

@@ -8,85 +8,99 @@ import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
 import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
 import me.zhengjie.modules.dm.daypc.service.dto.DmDayPcDto;
 import me.zhengjie.utils.ConnectionUtil;
+import me.zhengjie.utils.SpringContextHolder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.test.context.junit4.SpringRunner;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+//@SpringBootTest
+//@RunWith(SpringRunner.class)
 @Component
 public class SendMsg {
     @Autowired
-    private AmqpTemplate amqpTemplate;
+    RabbitTemplate rabbitTemplate;
     @Autowired
-    private DmDayPcService dmDayPcService;
+    DmDayPcService dmDayPcService;
     private static String isEable = "0";
 
     //每天10点执行
-    @Scheduled(cron = "0 0 10 * * ?")
-    //异步执行,指定线程池(配置类里配置)
-    @Async("threadPoolTaskExecutor1")
-    public void timer(){
-        isEable = "1";
-    }
+//    @Scheduled(cron = "0 0 10 * * ?")
+//    //异步执行,指定线程池(配置类里配置)
+//    @Async("threadPoolTaskExecutor1")
+//    public void timer(){
+//        isEable = "1";
+//    }
+//
+//    //每五分钟执行一次
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void timer2(){
+//        if(isEable!=null&&isEable=="1"){
+//            send();
+//            isEable = "0";
+//        }
+//    }
 
-    //每五分钟执行一次
-    @Scheduled(cron = "0 */5 * * * ?")
-    @Async("threadPoolTaskExecutor2")
-    public void timer2(){
-        if(isEable!=null&&isEable=="1"){
-            send();
-            isEable = "0";
-        }
-    }
 
+
+//    @Test
     public void send(){
-//        String msg = "Hello,rabbit";
         //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
-            String userString = JSON.toJSONString(dmDayPc);
-            Channel channel = connection.createChannel();
-            String exchangeName = "schedule_produce";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "schedule_produce";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "schedule_produce");// 队列绑定hello路由
-            channel.basicPublish(exchangeName, "schedule_produce", null, userString.getBytes());
-            System.out.println("sned排菜:"+dmDayPc);
-            //            Boy boy=new Boy(15,"tom");
-            //对象转化为字节码 把对象转化为字节码后,把字节码传输过去再转化为对象
-//            byte[] bytes=getBytesFromObject(dmDayPc);
-//            amqpTemplate.convertAndSend("exchange","topic.messages",bytes);
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
 
-//            List<DmDayPcDto> dmDayPc = dmDayPcService.queryAll(null);
+//            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
+        String userString = "sendtest";
 //            String userString = JSON.toJSONString(dmDayPc);
-//            amqpTemplate.convertAndSend(exchangeName,queue1Name,userString);
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
+//            Channel channel = connection.createChannel();
+//            String exchangeName = "schedule_produce";//交换机
+//            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+//            String queue1Name = "schedule_produce";//队列名称
+//            channel.queueDeclare(queue1Name, true, false, false, null);
+//            channel.queueBind(queue1Name, exchangeName, "schedule_produce");// 队列绑定hello路由
+//            channel.basicPublish(exchangeName, "schedule_produce", null, userString.getBytes());
+        /**
+         * 参数:
+         * 1、交换机名称
+         * 2、routingKey
+         * 3、消息内容
+         */
+//            rabbitTemplate.convertAndSend("schedule_produce", "inform.email", userString.getBytes(StandardCharsets.UTF_8));
+//            System.out.println("sned排菜:"+dmDayPc);
+            System.out.println("sned排菜string:"+userString);
+//            channel.close();
+//            connection.close();
+//        } catch (IOException | TimeoutException e) {
+//            e.printStackTrace();
+//        }
     }
 
     //对象转化为字节码
-    public  byte[] getBytesFromObject(Serializable obj) throws Exception {
-        if (obj == null) {
-            return null;
-        }
-        ByteArrayOutputStream bo = new ByteArrayOutputStream();
-        ObjectOutputStream oo = new ObjectOutputStream(bo);
-        oo.writeObject(obj);
-        return bo.toByteArray();
-    }
+//    public  byte[] getBytesFromObject(Serializable obj) throws Exception {
+//        if (obj == null) {
+//            return null;
+//        }
+//        ByteArrayOutputStream bo = new ByteArrayOutputStream();
+//        ObjectOutputStream oo = new ObjectOutputStream(bo);
+//        oo.writeObject(obj);
+//        return bo.toByteArray();
+//    }
 }

+ 52 - 52
eladmin-system/src/main/java/me/zhengjie/modules/dm/order/rest/OrderReceiveMsg.java

@@ -12,6 +12,7 @@ import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
 import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
 import me.zhengjie.modules.dm.order.service.dto.DmOrderItemDto;
 import me.zhengjie.utils.ConnectionUtil;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -30,67 +31,66 @@ public class OrderReceiveMsg {
     private static String isEable = "0";
 
     //每天10点执行
-    @Scheduled(cron = "0 0 10 * * ?")
-    //异步执行,指定线程池(配置类里配置)
-    @Async("threadPoolTaskExecutor1")
-    public void timer(){
-        isEable = "1";
-    }
-
-    //每五分钟执行一次
-    @Scheduled(cron = "0 */5 * * * ?")
-    @Async("threadPoolTaskExecutor2")
-    public void timer2(){
-        if(isEable!=null&&isEable=="1"){
-            receive();
-            isEable = "0";
-        }
-    }
+//    @Scheduled(cron = "0 0 10 * * ?")
+//    //异步执行,指定线程池(配置类里配置)
+//    @Async("threadPoolTaskExecutor1")
+//    public void timer(){
+//        isEable = "1";
+//    }
+//
+//    //每五分钟执行一次
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void timer2(){
+//        if(isEable!=null&&isEable=="1"){
+//            receive();
+//            isEable = "0";
+//        }
+//    }
 
     /**
      * 消费者
      */
-    public void receive(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//    @RabbitListener(queues = "dish_trade_produce",containerFactory = "brianThreadPool")
+    public void receive(byte[] body){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                     //body就是从队列中获取的数据
                     String str = new String(body);
-                    if (str==null){
-                        System.out.println("erroOrder"+str);
-                        System.out.println("erroOrder"+body);
+                    if (str!=null){
+                        System.out.println("receiveOrder:"+str);
                     }else {
-                        System.out.println("sucessOrder"+str);
-                        System.out.println("sucessOreder"+body);
+                        System.out.println("receiveOrder(null):"+ str);
                     }
-                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
-//                    DmOrderRecord dmOrderRecord = (DmOrderRecord) getObjectFromBytes(body);
-//                    dmOrderItemService.create(dmOrderItem);
-//                    dmOrderRecordService.create(dmOrderRecord);
-                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
-//                        dmExpenseCalendar.setCreateDate(new Timestamp(System.currentTimeMillis()));
-                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
-                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+                    try {
+                        List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str, DmExpenseCalendar.class);
+                        for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
+                            dmOrderRecordService.createOrderItem(dmExpenseCalendar);
+                            dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+                        }
+                    }catch (Exception ex){
+
                     }
-                }
-            };
-            channel.basicConsume("dish_trade_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+//                }
+//            }
+//            channel.basicConsume("dish_trade_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
     }
 
-    public  Object getObjectFromBytes(byte[] objBytes) throws Exception {
-        if (objBytes == null || objBytes.length == 0) {
-            return null;
-        }
-        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
-        ObjectInputStream oi = new ObjectInputStream(bi);
-        return oi.readObject();
-    }
+//    public  Object getObjectFromBytes(byte[] objBytes) throws Exception {
+//        if (objBytes == null || objBytes.length == 0) {
+//            return null;
+//        }
+//        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
+//        ObjectInputStream oi = new ObjectInputStream(bi);
+//        return oi.readObject();
+//    }
 }

+ 57 - 57
eladmin-system/src/main/java/me/zhengjie/modules/dm/order/rest/OrderSendMsg.java

@@ -28,64 +28,64 @@ public class OrderSendMsg {
     private static String isEable = "0";
 
     //每天10点执行
-    @Scheduled(cron = "0 0 10 * * ?")
-    //异步执行,指定线程池(配置类里配置)
-    @Async("threadPoolTaskExecutor1")
-    public void timer(){
-        isEable = "1";
-    }
+//    @Scheduled(cron = "0 0 10 * * ?")
+//    //异步执行,指定线程池(配置类里配置)
+//    @Async("threadPoolTaskExecutor1")
+//    public void timer(){
+//        isEable = "1";
+//    }
+//
+//    //每五分钟执行一次
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void timer2(){
+//        if(isEable!=null&&isEable=="1"){
+//            send();
+//            isEable = "0";
+//        }
+//    }
 
-    //每五分钟执行一次
-    @Scheduled(cron = "0 */5 * * * ?")
-    @Async("threadPoolTaskExecutor2")
-    public void timer2(){
-        if(isEable!=null&&isEable=="1"){
-            send();
-            isEable = "0";
-        }
-    }
-
-    public void send(){
-        //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-//            String userString1 = JSON.toJSONString(dmOrderItemService.queryAll(null));
-//            String userString2 = JSON.toJSONString(dmOrderRecordService.selectOrderRecord());
-            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
-            JSONArray jsonArray = new JSONArray();
-            jsonArray.addAll(map);
-            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
-            String userString2 = JSON.toJSONString(list);
-            Channel channel = connection.createChannel();
-            String exchangeName = "dish_trade_produce";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "dish_trade_produce";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
-
-//            byte[] bytes = new byte[2];
-//            bytes = userString1.getBytes();
-//            bytes[1] = (Byte)
-//            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString1.getBytes());
-            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
-//            System.out.println("userString1"+userString1);
-            System.out.println("order消费记录:"+userString2);
-
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
-    }
+//    public void send(){
+//        //获取连接
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+////            String userString1 = JSON.toJSONString(dmOrderItemService.queryAll(null));
+////            String userString2 = JSON.toJSONString(dmOrderRecordService.selectOrderRecord());
+//            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
+//            JSONArray jsonArray = new JSONArray();
+//            jsonArray.addAll(map);
+//            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
+//            String userString2 = JSON.toJSONString(list);
+//            Channel channel = connection.createChannel();
+//            String exchangeName = "dish_trade_produce";//交换机
+//            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+//            String queue1Name = "dish_trade_produce";//队列名称
+//            channel.queueDeclare(queue1Name, true, false, false, null);
+//            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
+//
+////            byte[] bytes = new byte[2];
+////            bytes = userString1.getBytes();
+////            bytes[1] = (Byte)
+////            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString1.getBytes());
+//            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
+////            System.out.println("userString1"+userString1);
+//            System.out.println("order消费记录:"+userString2);
+//
+//            channel.close();
+//            connection.close();
+//        } catch (IOException | TimeoutException e) {
+//            e.printStackTrace();
+//        }
+//    }
 
     //对象转化为字节码
-    public  byte[] getBytesFromObject(Serializable obj) throws Exception {
-        if (obj == null) {
-            return null;
-        }
-        ByteArrayOutputStream bo = new ByteArrayOutputStream();
-        ObjectOutputStream oo = new ObjectOutputStream(bo);
-        oo.writeObject(obj);
-        return bo.toByteArray();
-    }
+//    public  byte[] getBytesFromObject(Serializable obj) throws Exception {
+//        if (obj == null) {
+//            return null;
+//        }
+//        ByteArrayOutputStream bo = new ByteArrayOutputStream();
+//        ObjectOutputStream oo = new ObjectOutputStream(bo);
+//        oo.writeObject(obj);
+//        return bo.toByteArray();
+//    }
 }

+ 122 - 112
eladmin-system/src/main/java/me/zhengjie/modules/quartz/task/DayPcDataTask.java

@@ -1,112 +1,122 @@
-package me.zhengjie.modules.quartz.task;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.rabbitmq.client.*;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
-import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
-import me.zhengjie.modules.dm.food.service.DmFoodService;
-import me.zhengjie.utils.ConnectionUtil;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-@Slf4j
-@RequiredArgsConstructor
-@Component
-public class DayPcDataTask {
-    private final DmFoodService dmFoodService;
-    private final DmDayPcService dmDayPcService;
-    private static String isEable = "0";
-
-    public void runSend1(){
-        isEable = "1";
-    }
-
-    public void runSend2(){
-        if(isEable!=null&&isEable=="1"){
-            send();
-            isEable = "0";
-        }
-    }
-
-    public void runReceive1(){
-        isEable = "1";
-    }
-
-    public void runReceive2(){
-        if(isEable!=null&&isEable=="1"){
-            receive();
-            isEable = "0";
-        }
-    }
-
-    /**
-     * 生产者
-     */
-    public void send(){
-        //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
-            String userString = JSON.toJSONString(dmDayPc);
-            Channel channel = connection.createChannel();
-            String exchangeName = "schedule_consume";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "schedule_consume";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
-            channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
-            System.out.println("sned排菜:"+dmDayPc);
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 消费者
-     */
-    public void receive(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    //body就是从队列中获取的数据
-                    String str = new String(body);
-                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
-                    for (DmDayPc dmDayPc : list) {
-                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
-                        dmDayPcService.create(dmDayPc);
-                        System.out.println("新增food==="+dmDayPc.getFood());
-                        dmFoodService.create(dmDayPc.getFood());
-                    }
-                }
-            };
-            //参数1:接收哪个队列的数据
-            //参数2:消息确认 是否应答,收到消息是否回复
-            //参数3:
-            channel.basicConsume("schedule_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-}
+//package me.zhengjie.modules.quartz.task;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONArray;
+//import com.rabbitmq.client.*;
+//import lombok.RequiredArgsConstructor;
+//import lombok.SneakyThrows;
+//import lombok.extern.slf4j.Slf4j;
+//import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
+//import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
+//import me.zhengjie.modules.dm.food.service.DmFoodService;
+//import me.zhengjie.utils.ConnectionUtil;
+//import org.springframework.amqp.rabbit.core.RabbitTemplate;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.scheduling.annotation.Async;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import java.io.ByteArrayOutputStream;
+//import java.io.IOException;
+//import java.io.ObjectOutputStream;
+//import java.io.Serializable;
+//import java.sql.Timestamp;
+//import java.util.List;
+//import java.util.concurrent.TimeoutException;
+//
+//@Slf4j
+//@RequiredArgsConstructor
+//@Component
+//public class DayPcDataTask {
+//    private final DmFoodService dmFoodService;
+//    private final DmDayPcService dmDayPcService;
+//    private static String isEable = "0";
+////
+////    public void runSend1(){
+////        isEable = "1";
+////    }
+////
+////    public void runSend2(){
+////        if(isEable!=null&&isEable=="1"){
+////            send();
+////            isEable = "0";
+////        }
+////    }
+////
+////    public void runReceive1(){
+////        isEable = "1";
+////    }
+////
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void runReceive2(){
+////        if(isEable!=null&&isEable=="1"){
+//            receive();
+////            isEable = "0";
+////        }
+//    }
+////
+////    /**
+////     * 生产者
+////     */
+////    public void send(){
+////        //获取连接
+////        Connection connection = ConnectionUtil.getConnection();
+////        try {
+////            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
+////            String userString = JSON.toJSONString(dmDayPc);
+////            Channel channel = connection.createChannel();
+////            String exchangeName = "schedule_consume";//交换机
+////            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+////            String queue1Name = "schedule_consume";//队列名称
+////            channel.queueDeclare(queue1Name, true, false, false, null);
+////            channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
+////            channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
+////            System.out.println("sned排菜:"+dmDayPc);
+////            channel.close();
+////            connection.close();
+////        } catch (IOException | TimeoutException e) {
+////            e.printStackTrace();
+////        }
+////    }
+////
+//    /**
+//     * 消费者
+//     */
+//    public void receive(){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                    //body就是从队列中获取的数据
+//                    String str = new String(body);
+//                    if (str==null){
+//                        System.out.println("erropc"+str);
+//                        System.out.println("erropc"+body);
+//                    }else {
+//                        System.out.println("sucesspc"+str);
+//                        System.out.println("sucesspc"+body);
+//                    }
+//                    System.out.println("dingshipc"+str);
+//                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
+//                    for (DmDayPc dmDayPc : list) {
+//                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
+//                        dmDayPcService.create(dmDayPc);
+//                        System.out.println("新增food==="+dmDayPc.getFood());
+//                        dmFoodService.create(dmDayPc.getFood());
+//                    }
+//                }
+//            };
+//            //参数1:接收哪个队列的数据
+//            //参数2:消息确认 是否应答,收到消息是否回复
+//            //参数3:
+//            channel.basicConsume("schedule_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//}

+ 115 - 108
eladmin-system/src/main/java/me/zhengjie/modules/quartz/task/OrderDataTask.java

@@ -1,108 +1,115 @@
-package me.zhengjie.modules.quartz.task;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.rabbitmq.client.*;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
-import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
-import me.zhengjie.utils.ConnectionUtil;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-@Slf4j
-@RequiredArgsConstructor
-@Component
-public class OrderDataTask {
-    private final DmOrderRecordService dmOrderRecordService;
-    private static String isEable = "0";
-
-    public void runSend1(){
-        isEable = "1";
-    }
-
-    public void runSend2(){
-        if(isEable!=null&&isEable=="1"){
-            send();
-            isEable = "0";
-        }
-    }
-
-    public void runReceive1(){
-        isEable = "1";
-    }
-
-    public void runReceive2(){
-        if(isEable!=null&&isEable=="1"){
-            receive();
-            isEable = "0";
-        }
-    }
-
-    /**
-     * 生产者
-     */
-    public void send(){
-        //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
-            JSONArray jsonArray = new JSONArray();
-            jsonArray.addAll(map);
-            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
-            String userString2 = JSON.toJSONString(list);
-            Channel channel = connection.createChannel();
-            String exchangeName = "dish_trade_produce";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "dish_trade_produce";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
-            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
-            System.out.println("消费记录:"+userString2);
-
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 消费者
-     */
-    public void receive(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    //body就是从队列中获取的数据
-                    String str = new String(body);
-                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
-//                    DmOrderRecord dmOrderRecord = (DmOrderRecord) getObjectFromBytes(body);
-//                    dmOrderItemService.create(dmOrderItem);
-//                    dmOrderRecordService.create(dmOrderRecord);
-                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
-//                        dmExpenseCalendar.setCreateDate(new Timestamp(System.currentTimeMillis()));
-                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
-                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
-                    }
-                }
-            };
-            channel.basicConsume("dish_trade_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-}
+//package me.zhengjie.modules.quartz.task;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONArray;
+//import com.rabbitmq.client.*;
+//import lombok.RequiredArgsConstructor;
+//import lombok.SneakyThrows;
+//import lombok.extern.slf4j.Slf4j;
+//import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
+//import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
+//import me.zhengjie.utils.ConnectionUtil;
+//import org.springframework.scheduling.annotation.Async;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import java.io.IOException;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.concurrent.TimeoutException;
+//
+//@Slf4j
+//@RequiredArgsConstructor
+//@Component
+//public class OrderDataTask {
+//    private final DmOrderRecordService dmOrderRecordService;
+//    private static String isEable = "0";
+//
+////    public void runSend1(){
+////        isEable = "1";
+////    }
+////
+////    public void runSend2(){
+////        if(isEable!=null&&isEable=="1"){
+////            send();
+////            isEable = "0";
+////        }
+////    }
+//
+////    public void runReceive1(){
+////        isEable = "1";
+////    }
+//
+//    @Scheduled(cron = "0 */5 * * * ?")
+//    @Async("threadPoolTaskExecutor2")
+//    public void runReceive2(){
+////        if(isEable!=null&&isEable=="1"){
+//            receive();
+////            isEable = "0";
+////        }
+//    }
+//
+//    /**
+//     * 生产者
+//     */
+////    public void send(){
+////        //获取连接
+////        Connection connection = ConnectionUtil.getConnection();
+////        try {
+////            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
+////            JSONArray jsonArray = new JSONArray();
+////            jsonArray.addAll(map);
+////            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
+////            String userString2 = JSON.toJSONString(list);
+////            Channel channel = connection.createChannel();
+////            String exchangeName = "dish_trade_produce";//交换机
+////            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+////            String queue1Name = "dish_trade_produce";//队列名称
+////            channel.queueDeclare(queue1Name, true, false, false, null);
+////            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
+////            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
+////            System.out.println("消费记录:"+userString2);
+////
+////            channel.close();
+////            connection.close();
+////        } catch (IOException | TimeoutException e) {
+////            e.printStackTrace();
+////        }
+////    }
+//
+//    /**
+//     * 消费者
+//     */
+//    public void receive(){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                    //body就是从队列中获取的数据
+//                    String str = new String(body);
+//                    if (str==null){
+//                        System.out.println("erroOrder"+str);
+//                        System.out.println("erroOrder"+body);
+//                    }else {
+//                        System.out.println("sucessOrder"+str);
+//                        System.out.println("sucessOreder"+body);
+//                    }
+//                    System.out.println("dingshiorder"+str);
+////                    List<>
+//                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
+//                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
+//                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
+//                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+//                    }
+//                }
+//            };
+//            channel.basicConsume("dish_trade_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//}

+ 30 - 0
eladmin-system/src/main/java/me/zhengjie/modules/test/TestController.java

@@ -0,0 +1,30 @@
+package me.zhengjie.modules.test;
+
+
+import me.zhengjie.config.rabbitmq.cuisine.CuisineConsumer;
+import me.zhengjie.config.rabbitmq.test.SendTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/api/send")
+public class TestController {
+
+    @Autowired
+    public SendTemplate sendTemplate;
+
+    @GetMapping("/send")
+    public void testSend(){
+        Map<String,Object> map = new HashMap<>();
+        map.put("seq",1);
+        map.put("date","2021-12-9");
+        map.put("shopname","111");
+        sendTemplate.sendMessage(map);
+        System.out.println("消息發送成功"+"success");
+    }
+}

+ 2 - 2
eladmin-system/src/main/java/me/zhengjie/modules/thirdparty/v1/OpenApiController.java

@@ -56,7 +56,7 @@ public class OpenApiController {
     @ApiOperation("新增服务评价")
     public BaseResponse<Object> createApp(@RequestBody QueryPageParams<DmServicePj> resources){
         SecurityUtils.CheckApiAuth(resources);
-        dmServicePjService.createApp(resources.getQuery());
+//        dmServicePjService.createApp(resources.getQuery());
         return new BaseResponse<>(dmServicePjService.createApp(resources.getQuery()));
     }
 
@@ -74,7 +74,7 @@ public class OpenApiController {
     @ApiOperation("新增菜品评价")
     public BaseResponse<Object> createFoodPj(@RequestBody QueryPageParams<DmFoodPj> resources){
         SecurityUtils.CheckApiAuth(resources);
-        dmFoodPjService.createApp(resources.getQuery());
+//        dmFoodPjService.createApp(resources.getQuery());
         return new BaseResponse<>(dmFoodPjService.createApp(resources.getQuery()));
     }
 

+ 16 - 2
eladmin-system/src/main/resources/config/application.yml

@@ -3,6 +3,8 @@ server:
   port: 1537
 
 spring:
+  main:
+    allow-bean-definition-overriding: true # 意思是后来发现的bean会覆盖之前相同名称的bean
   freemarker:
     check-template-location: false
   profiles:
@@ -13,6 +15,17 @@ spring:
     redis:
       repositories:
         enabled: false
+  rabbitmq:
+    host: 10.208.19.5
+    port: 11673
+    username: rabbit
+    password: Rabbit!@#$123
+
+#    host: 127.0.0.1
+#    port: 5672
+#    username: admin
+#    password: admin
+#    virtualHost: /
 
   #配置 Jpa
   jpa:
@@ -36,11 +49,12 @@ spring:
     timeout: 5000ms
 #    timeout: 10000ms
 
-#  activiti:
+
+  activiti:
 #    #设置成true以后,会在项目启动的时候自动创建Activiti表结构,首次数据表建好后建议改成false
 #    database-schema-update: false
 #    #是否自动部署流程文件(当指定文件夹下无资源文件/不需要部署的时候设置为false)
-#    check-process-definitions: true
+    check-process-definitions: false
 #    #保存历史数据得级别,分为none、activity、audit(默认)、full
 #    history-level: full
 #    #修改默认的流程文件存储位置

+ 131 - 128
eladmin-system/src/test/java/me/zhengjie/TestRabbit.java

@@ -1,128 +1,131 @@
-package me.zhengjie;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.rabbitmq.client.*;
-import lombok.SneakyThrows;
-import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
-import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
-import me.zhengjie.modules.dm.food.service.DmFoodService;
-import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
-import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
-import me.zhengjie.utils.ConnectionUtil;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-public class TestRabbit {
-    DmFoodService dmFoodService;
-    DmDayPcService dmDayPcService;
-    DmOrderRecordService dmOrderRecordService;
-
-    @Test
-    public void FoodPj(){
-        //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
-            String userString = JSON.toJSONString(dmDayPc);
-            Channel channel = connection.createChannel();
-            String exchangeName = "schedule_consume";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "schedule_consume";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
-            channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
-            System.out.println("sned排菜:"+dmDayPc);
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void receive(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    //body就是从队列中获取的数据
-                    String str = new String(body);
-                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
-                    for (DmDayPc dmDayPc : list) {
-                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
-                        dmDayPcService.create(dmDayPc);
-                        System.out.println("新增food==="+dmDayPc.getFood());
-                        dmFoodService.create(dmDayPc.getFood());
-                    }
-                }
-            };
-            //参数1:接收哪个队列的数据
-            //参数2:消息确认 是否应答,收到消息是否回复
-            //参数3:
-            channel.basicConsume("schedule_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void send(){
-        //获取连接
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
-            JSONArray jsonArray = new JSONArray();
-            jsonArray.addAll(map);
-            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
-            String userString2 = JSON.toJSONString(list);
-            Channel channel = connection.createChannel();
-            String exchangeName = "dish_trade_produce";//交换机
-            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
-            String queue1Name = "dish_trade_produce";//队列名称
-            channel.queueDeclare(queue1Name, true, false, false, null);
-            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
-            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
-            System.out.println("消费记录:"+userString2);
-
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 消费者
-     */
-    public void receive1(){
-        Connection connection = ConnectionUtil.getConnection();
-        try {
-            Channel channel = connection.createChannel();
-            //通过consumer来处理数据
-            Consumer consumer = new DefaultConsumer(channel){
-                @SneakyThrows
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    //body就是从队列中获取的数据
-                    String str = new String(body);
-                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
-                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
-                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
-                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
-                    }
-                }
-            };
-            channel.basicConsume("dish_trade_produce",true,consumer);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-}
+//package me.zhengjie;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONArray;
+//import com.rabbitmq.client.*;
+//import lombok.SneakyThrows;
+//import me.zhengjie.modules.dm.daypc.domain.DmDayPc;
+//import me.zhengjie.modules.dm.daypc.service.DmDayPcService;
+//import me.zhengjie.modules.dm.food.service.DmFoodService;
+//import me.zhengjie.modules.dm.order.service.DmOrderRecordService;
+//import me.zhengjie.modules.dm.order.service.dto.DmExpenseCalendar;
+//import me.zhengjie.utils.ConnectionUtil;
+//import org.junit.Test;
+//
+//import java.io.IOException;
+//import java.sql.Timestamp;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.concurrent.TimeoutException;
+//
+//public class TestRabbit {
+//    DmFoodService dmFoodService;
+//    DmDayPcService dmDayPcService;
+//    DmOrderRecordService dmOrderRecordService;
+//
+//    @Test
+//    public void FoodPj(){
+//        //获取连接
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            List<DmDayPc> dmDayPc = dmDayPcService.foodRepository();
+//            String userString = JSON.toJSONString(dmDayPc);
+//            Channel channel = connection.createChannel();
+//            String exchangeName = "schedule_consume";//交换机
+//            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+//            String queue1Name = "schedule_consume";//队列名称
+//            channel.queueDeclare(queue1Name, true, false, false, null);
+//            channel.queueBind(queue1Name, exchangeName, "schedule_consume");// 队列绑定hello路由
+//            channel.basicPublish(exchangeName, "schedule_consume", null, userString.getBytes());
+//            System.out.println("sned排菜:"+dmDayPc);
+//            channel.close();
+//            connection.close();
+//        } catch (IOException | TimeoutException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    @Test
+//    public void receive(){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                    //body就是从队列中获取的数据
+//                    String str = new String(body);
+//                    List<DmDayPc>  list = JSONArray.parseArray(str,DmDayPc.class);
+//                    for (DmDayPc dmDayPc : list) {
+//                        dmDayPc.setPcdate(new Timestamp(System.currentTimeMillis()));
+//                        dmDayPcService.create(dmDayPc);
+//                        System.out.println("新增food==="+dmDayPc.getFood());
+//                        dmFoodService.create(dmDayPc.getFood());
+//                    }
+//                }
+//            };
+//            //参数1:接收哪个队列的数据
+//            //参数2:消息确认 是否应答,收到消息是否回复
+//            //参数3:
+//            channel.basicConsume("schedule_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    @Test
+//    public void send(){
+//        //获取连接
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            List<Map<String,Object>> map = dmOrderRecordService.selectOrderRecord();
+//            JSONArray jsonArray = new JSONArray();
+//            jsonArray.addAll(map);
+//            List<DmExpenseCalendar> list = jsonArray.toJavaList(DmExpenseCalendar.class);
+//            String userString2 = JSON.toJSONString(list);
+//            Channel channel = connection.createChannel();
+//            String exchangeName = "dish_trade_produce";//交换机
+//            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true,false, null);
+//            String queue1Name = "dish_trade_produce";//队列名称
+//            channel.queueDeclare(queue1Name, true, false, false, null);
+//            channel.queueBind(queue1Name, exchangeName, "dish_trade_produce");// 队列绑定hello路由
+//            channel.basicPublish(exchangeName, "dish_trade_produce", null, userString2.getBytes());
+//            System.out.println("消费记录:"+userString2);
+//
+//            channel.close();
+//            connection.close();
+//        } catch (IOException | TimeoutException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    /**
+//     * 消费者
+//     */
+//    @Test
+//    public void receive1(){
+//        Connection connection = ConnectionUtil.getConnection();
+//        try {
+//            Channel channel = connection.createChannel();
+//            //通过consumer来处理数据
+//            Consumer consumer = new DefaultConsumer(channel){
+//                @SneakyThrows
+//                @Override
+//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                    //body就是从队列中获取的数据
+//                    String str = new String(body);
+//                    List<DmExpenseCalendar> dmOrderItem = JSONArray.parseArray(str,DmExpenseCalendar.class);
+//                    for (DmExpenseCalendar dmExpenseCalendar : dmOrderItem) {
+//                        dmOrderRecordService.createOrderItem(dmExpenseCalendar);
+//                        dmOrderRecordService.createOrderRecord(dmExpenseCalendar);
+//                    }
+//                }
+//            };
+//            channel.basicConsume("dish_trade_produce",true,consumer);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//}