浏览代码

修改定时任务逻辑可配置租户和mqtt推送数据

fuyuchuan 5 天之前
父节点
当前提交
513aba81f2

+ 0 - 3
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttInConfig.java

@@ -14,13 +14,11 @@ import org.springframework.messaging.MessageChannel;
  * @author han
  * @date 2025/03/20 14:30
  */
-@Configuration
 public class MqttInConfig {
     public MqttBaseConfig mqttBaseConfig;
 
     public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
 
-    @Bean(name = CHANNEL_NAME_INPUT)
     public MessageChannel mqttInputChannel() {
         return new DirectChannel();
     }
@@ -31,7 +29,6 @@ public class MqttInConfig {
      *
      * @return
      */
-    @Bean
     public MessageProducer inbound() {
         String msgTopic = mqttBaseConfig.getMsgTopic();
         if (msgTopic == null || msgTopic.trim().isEmpty()) {

+ 13 - 7
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -8,11 +8,13 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.integration.annotation.MessagingGateway;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
 
 import java.util.Map;
 
@@ -20,7 +22,7 @@ import java.util.Map;
  * @author han
  * @date 2025/03/20 14:31
  */
-@Configuration
+@Component
 public class MqttOutConfig {
     public MqttBaseConfig mqttBaseConfig;
 
@@ -40,24 +42,28 @@ public class MqttOutConfig {
         return new DirectChannel();
     }
 
+
     /**
      * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     * 注意:这个方法不再由Spring自动创建,而是在需要时手动创建
      *
-     * @return
+     * @param username MQTT用户名
+     * @param factory MQTT客户端工厂
+     * @return MessageHandler实例
      */
-    @Bean(name = MESSAGE_NAME)
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler outbound() {
+    public MessageHandler outbound(String username, DefaultMqttPahoClientFactory factory) {
         // 根据username动态生成client-id,格式:mqttx-username
-        String clientId = "mqttx-" + mqttBaseConfig.getUsername();
+        String clientId = "mqttx-" + username;
         MqttPahoMessageHandler messageHandler = 
-                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+                new MqttPahoMessageHandler(clientId, factory);
         //如果设置成true,发送消息时将不会阻塞。
         messageHandler.setAsync(true);
         messageHandler.setDefaultTopic(DEFAULT_TOPIC);
         return messageHandler;
     }
 
+    // 注意:这个接口需要被Spring扫描到,所以我们保留@MessagingGateway注解
+    // 但是我们移除了其他@Bean注解,以避免Spring自动创建时出现NullPointerException
     @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
     public interface MqttGateway {
         /**

+ 11 - 36
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -898,47 +898,22 @@ public class IotDataTransferService {
      */
     private void createMqttConnection(String username, String password) {
         try {
-            // 创建MqttBaseConfig实例并设置参数
-            MqttBaseConfig mqttBaseConfig = new MqttBaseConfig();
-            // 手动设置所有MQTT配置参数
-            mqttBaseConfig.setUsername(username);
-            mqttBaseConfig.setPassword(password);
-            mqttBaseConfig.setHostUrl(MQTT_URL);
-            mqttBaseConfig.setMsgTopic(MQTT_TOPIC);
-            mqttBaseConfig.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-            mqttBaseConfig.setCompletionTimeout(COMPLETION_TIMEOUT);
-
-            // 创建MqttOutConfig实例
-            MqttOutConfig mqttOutConfig = new MqttOutConfig();
-            mqttOutConfig.mqttBaseConfig = mqttBaseConfig;
-
-            // 使用Spring的ApplicationContext手动创建Bean
-            ApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
-            if (context == null) {
-                throw new IllegalStateException("ApplicationContext未找到,无法创建MQTT相关Bean");
-            }
-
-            // 注册MqttBaseConfig为Bean
-            ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) context).getBeanFactory();
-            beanFactory.registerSingleton("mqttBaseConfig", mqttBaseConfig);
-
-            // 创建并注册mqttClientFactory
+            // 直接创建和配置MQTT客户端,不依赖Spring自动配置
+            // 1. 创建MqttConnectOptions
             MqttConnectOptions options = new MqttConnectOptions();
-            options.setServerURIs(new String[]{mqttBaseConfig.getHostUrl()});
-            options.setUserName(mqttBaseConfig.getUsername());
-            options.setPassword(mqttBaseConfig.getPassword().toCharArray());
-            options.setKeepAliveInterval(mqttBaseConfig.getKeepAliveInterval());
+            options.setServerURIs(new String[]{MQTT_URL});
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
 
+            // 2. 创建DefaultMqttPahoClientFactory
             DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
             factory.setConnectionOptions(options);
-            beanFactory.registerSingleton("mqttClientFactory", factory);
-
-            // 注册MqttOutConfig为Bean
-            beanFactory.registerSingleton("mqttOutConfig", mqttOutConfig);
 
-            // 获取mqttGateway
-            this.mqttGateway = context.getBean(MqttOutConfig.MqttGateway.class);
-            log.info("MQTT连接创建成功");
+            // 3. 由于@MessagingGateway需要Spring容器管理,我们需要确保Spring已经正确初始化了网关
+            // 这里我们不手动创建网关,而是依赖Spring自动创建,但是需要确保MqttOutConfig类不再被Spring自动配置
+            // 所以我们已经移除了MqttOutConfig和MqttInConfig类上的@Configuration注解和@Bean注解
+            log.info("MQTT连接参数配置完成,用户名:{}", username);
         } catch (Exception e) {
             log.error("创建MQTT连接失败: {}", e.getMessage(), e);
             throw new RuntimeException("创建MQTT连接失败", e);