Bläddra i källkod

修改定时任务逻辑可配置租户和mqtt推送数据

fuyuchuan 6 dagar sedan
förälder
incheckning
37bfc2ab29

+ 2 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/RuoYiSystemApplication.java

@@ -11,6 +11,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.core.env.Environment;
+import org.springframework.integration.config.EnableIntegration;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 import java.net.InetAddress;
@@ -27,6 +28,7 @@ import java.net.UnknownHostException;
 @EnableFeignClients(basePackages = "com.usky")
 @MapperScan(value = "com.usky.cdi.mapper")
 @ComponentScan("com.usky")
+@EnableIntegration // 启用Spring Integration
 @SpringBootApplication
 public class RuoYiSystemApplication
 {

+ 32 - 8
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -1,6 +1,7 @@
 package com.usky.cdi.service.config.mqtt;
 
 import com.alibaba.fastjson.JSONObject;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -44,16 +45,18 @@ public class MqttOutConfig {
 
 
     /**
-     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
-     * 注意:这个方法不再由Spring自动创建,而是在需要时手动创建
-     *
-     * @param username MQTT用户名
+     * MQTT消息发送处理器
+     * 注意:这个方法会被Spring自动创建,用于处理mqttOutboundChannel通道上的消息
+     * 
      * @param factory MQTT客户端工厂
      * @return MessageHandler实例
      */
-    public MessageHandler outbound(String username, DefaultMqttPahoClientFactory factory) {
-        // 根据username动态生成client-id,格式:mqttx-username
-        String clientId = "mqttx-" + username;
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound(DefaultMqttPahoClientFactory factory) {
+        // 注意:这里的client-id暂时使用固定值,因为username在启动时还不可用
+        // 实际使用时,会在createMqttConnection方法中重新设置
+        String clientId = "mqttx-" + System.currentTimeMillis();
         MqttPahoMessageHandler messageHandler = 
                 new MqttPahoMessageHandler(clientId, factory);
         //如果设置成true,发送消息时将不会阻塞。
@@ -61,9 +64,30 @@ public class MqttOutConfig {
         messageHandler.setDefaultTopic(DEFAULT_TOPIC);
         return messageHandler;
     }
+    
+    /**
+     * MQTT客户端工厂
+     * 注意:这个方法会被Spring自动创建,用于创建MQTT客户端
+     * 
+     * @return DefaultMqttPahoClientFactory实例
+     */
+    @Bean
+    public DefaultMqttPahoClientFactory mqttClientFactory() {
+        // 创建默认的MqttPahoClientFactory
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        
+        // 设置默认的MqttConnectOptions,确保serverURIs不为null
+        // 实际使用时,会在createMqttConnection方法中重新配置
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
+        options.setKeepAliveInterval(60); // 设置默认的心跳间隔
+        factory.setConnectionOptions(options);
+        
+        return factory;
+    }
 
     // 注意:这个接口需要被Spring扫描到,所以我们保留@MessagingGateway注解
-    // 但是我们移除了其他@Bean注解,以避免Spring自动创建时出现NullPointerException
+    // Spring会自动创建这个接口的实现类
     @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
     public interface MqttGateway {
         /**

+ 34 - 14
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -22,6 +22,8 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
 import org.springframework.stereotype.Service;
 import org.springframework.web.context.ContextLoader;
 
@@ -46,7 +48,11 @@ import java.util.stream.Collectors;
 public class IotDataTransferService {
 
     private MqttOutConfig.MqttGateway mqttGateway;
-
+    
+    // 注入ApplicationContext,确保总是能获取到
+    @Autowired
+    private ApplicationContext context;
+    
     // MQTT连接相关配置
     private static final String MQTT_URL = "ssl://114.80.201.143:8883";
     private static final String MQTT_TOPIC = "iotInfo/+";
@@ -898,25 +904,39 @@ public class IotDataTransferService {
      */
     private void createMqttConnection(String username, String password) {
         try {
-            // 直接创建和配置MQTT客户端,不依赖Spring自动配置
-            // 1. 创建MqttConnectOptions
+            // 使用注入的ApplicationContext获取已有的mqttGateway实例
+            // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
+            if (this.context == null) {
+                throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
+            }
+            
+            // 1. 获取mqttGateway实例
+            this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
+            if (this.mqttGateway == null) {
+                throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
+            }
+            
+            // 2. 获取现有的mqttClientFactory实例
+            DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
+            if (mqttClientFactory == null) {
+                throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
+            }
+            
+            // 3. 创建并配置MqttConnectOptions
             MqttConnectOptions options = new MqttConnectOptions();
             options.setServerURIs(new String[]{MQTT_URL});
             options.setUserName(username);
             options.setPassword(password.toCharArray());
             options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-
-            // 2. 创建DefaultMqttPahoClientFactory
-            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-            factory.setConnectionOptions(options);
-
-            // 3. 由于@MessagingGateway需要Spring容器管理,我们需要确保Spring已经正确初始化了网关
-            // 这里我们不手动创建网关,而是依赖Spring自动创建,但是需要确保MqttOutConfig类不再被Spring自动配置
-            // 所以我们已经移除了MqttOutConfig和MqttInConfig类上的@Configuration注解和@Bean注解
-            log.info("MQTT连接参数配置完成,用户名:{}", username);
+            
+            // 4. 更新mqttClientFactory的连接选项
+            mqttClientFactory.setConnectionOptions(options);
+            
+            log.info("MQTT Gateway初始化成功,用户名:{}", username);
+            log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
         } catch (Exception e) {
-            log.error("创建MQTT连接失败: {}", e.getMessage(), e);
-            throw new RuntimeException("创建MQTT连接失败", e);
+            log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
+            throw new RuntimeException("初始化MQTT连接失败", e);
         }
     }
 

+ 6 - 4
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,14 +26,16 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    // @Scheduled(fixedDelay = 25 * 60 * 1000, initialDelay = 0)
+    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
     // public void scheduledDeviceDataSync() {
-    //     Integer tenantId = 1206;
-    //     Long engineeringId = 3101170019L;
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
     //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
     //
     //     try {
-    //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId);
+    //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);
     //     } catch (Exception e) {
     //         log.error("定时任务执行设备数据同步失败:{}", e.getMessage(), e);
     //     }