فهرست منبع

优化数据推送定时任务代码

fuyuchuan 1 روز پیش
والد
کامیت
224abc4047

+ 27 - 72
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -9,6 +9,7 @@ import com.usky.cdi.mapper.DmpDeviceMapper;
 import com.usky.cdi.mapper.DmpProductMapper;
 import com.usky.cdi.service.config.mqtt.MqttOutConfig;
 import com.usky.cdi.service.enums.EnvMonitorMqttTopic;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
 import com.usky.cdi.service.util.DeviceDataQuery;
 import com.usky.cdi.service.util.SnowflakeIdGenerator;
 import com.usky.cdi.service.vo.IotDataTransferVO;
@@ -17,17 +18,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.dsl.IntegrationFlow;
 import org.springframework.integration.dsl.IntegrationFlows;
-import org.springframework.integration.endpoint.EventDrivenConsumer;
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-import org.springframework.messaging.SubscribableChannel;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
@@ -50,6 +49,9 @@ public class IotDataTransferService {
 
     private MqttOutConfig.MqttGateway mqttGateway;
 
+    @Autowired
+    private MqttConnectionTool mqttConnectionTool;
+
     // 注入ApplicationContext,确保总是能获取到
     @Autowired
     private ApplicationContext context;
@@ -61,7 +63,7 @@ public class IotDataTransferService {
     private static final int COMPLETION_TIMEOUT = 5000;
 
     // 存储每个任务的MQTT客户端工厂和网关
-    private final Map<String, MqttOutConfig.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
+    private final Map<String, MqttConnectionTool.MqttGateway> mqttGatewayMap = new ConcurrentHashMap<>();
     private final Map<String, DefaultMqttPahoClientFactory> mqttClientFactoryMap = new ConcurrentHashMap<>();
 
     private SnowflakeIdGenerator idGenerator;
@@ -768,6 +770,7 @@ public class IotDataTransferService {
      * @param password MQTT密码
      */
     public void synchronizeDeviceData(Integer tenantId, Long engineeringId, String username, String password) {
+        log.info("用户名:{},密码:{}", username, password);
         // 参数校验
         if (engineeringId == null || username == null || password == null) {
             log.error("工程ID、MQTT用户名或密码不能为空");
@@ -914,75 +917,21 @@ public class IotDataTransferService {
      * @param username MQTT用户名
      * @param password MQTT密码
      */
-    public void createMqttConnection(String username, String password) {
+    public synchronized void createMqttConnection(String username, String password) {
+        log.info("手动创建/刷新 MQTT 连接(含动态 clientId),用户名:{},密码:{}", username, password);
         try {
-            // 使用注入的ApplicationContext获取或创建MQTT连接
-            if (this.context == null) {
-                throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
+            // 检查MqttConnectionTool是否已注入
+            if (this.mqttConnectionTool == null) {
+                throw new IllegalStateException("MqttConnectionTool未注入,无法获取MQTT Gateway");
             }
 
-            // 检查是否已经为该用户创建了MQTT连接
-            if (!mqttGatewayMap.containsKey(username)) {
-                synchronized (this) {
-                    // 双重检查锁定
-                    if (!mqttGatewayMap.containsKey(username)) {
-                        // 1. 创建新的MQTT客户端工厂
-                        DefaultMqttPahoClientFactory mqttClientFactory = new DefaultMqttPahoClientFactory();
-
-                        // 2. 创建并配置MqttConnectOptions
-                        MqttConnectOptions options = new MqttConnectOptions();
-                        options.setServerURIs(new String[]{MQTT_URL});
-                        options.setUserName(username);
-                        options.setPassword(password.toCharArray());
-                        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-
-                        // 3. 设置连接选项
-                        mqttClientFactory.setConnectionOptions(options);
-
-                        // 4. 创建唯一的客户端ID
-                        String clientId = "mqttx-" + username;
-
-                        // 5. 创建MQTT消息处理器
-                        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory);
-                        messageHandler.setAsync(true);
-                        messageHandler.setDefaultTopic("testTopic");
-
-                        // 6. 获取消息通道
-                        MessageChannel mqttOutboundChannel = context.getBean("mqttOutboundChannel", MessageChannel.class);
-
-                        // 7. 注册消息处理器到通道
-                        IntegrationFlow flow = IntegrationFlows.from(mqttOutboundChannel)
-                                .handle(messageHandler)
-                                .get();
-
-                        // 8. 注册IntegrationFlow
-                        ((GenericApplicationContext) context).registerBean("mqttFlow-" + username, IntegrationFlow.class, () -> flow);
-                        ((GenericApplicationContext) context).refresh();
+            // 使用MqttConnectionTool创建或刷新MQTT连接
+            MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(username, password);
 
-                        // 9. 获取MQTT Gateway实例
-                        MqttOutConfig.MqttGateway mqttGateway = context.getBean(MqttOutConfig.MqttGateway.class);
-
-                        // 10. 存储到映射中
-                        mqttGatewayMap.put(username, mqttGateway);
-                        mqttClientFactoryMap.put(username, mqttClientFactory);
-
-                        log.info("MQTT连接创建成功,用户名:{},客户端ID:{}", username, clientId);
-                    }
-                }
-            } else {
-                // 连接已存在,更新密码
-                DefaultMqttPahoClientFactory clientFactory = mqttClientFactoryMap.get(username);
-                if (clientFactory != null) {
-                    MqttConnectOptions options = new MqttConnectOptions();
-                    options.setServerURIs(new String[]{MQTT_URL});
-                    options.setUserName(username);
-                    options.setPassword(password.toCharArray());
-                    options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-                    clientFactory.setConnectionOptions(options);
-                    log.info("MQTT连接密码已更新,用户名:{}", username);
-                }
-            }
-        } catch (Exception e) {
+            // 存储到映射中
+            mqttGatewayMap.put(username, gateway);
+            log.info("MQTT连接创建/刷新成功,用户名:{}", username);
+            } catch (Exception e) {
             log.error("初始化MQTT连接失败: {}", e.getMessage(), e);
             throw new RuntimeException("初始化MQTT连接失败", e);
         }
@@ -994,7 +943,13 @@ public class IotDataTransferService {
      * @return 是否初始化
      */
     private boolean validateMqttGateway(String username) {
-        if (username == null || !mqttGatewayMap.containsKey(username) || mqttGatewayMap.get(username) == null) {
+        if (username == null) {
+            log.warn("MQTT Gateway未初始化,无法发送消息,用户名:null");
+            return false;
+        }
+        // 一次性获取网关实例,避免竞态条件
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
+        if (gateway == null) {
             log.warn("MQTT Gateway未初始化,无法发送消息,用户名:{}", username);
             return false;
         }
@@ -1027,7 +982,7 @@ public class IotDataTransferService {
         String json = JSON.toJSONString(vo);
         String topic = topicEnum.getTopic();
         // 不再记录每条数据的详情,只记录发送操作
-        MqttOutConfig.MqttGateway gateway = mqttGatewayMap.get(username);
+        MqttConnectionTool.MqttGateway gateway = mqttGatewayMap.get(username);
         if (gateway != null) {
             gateway.sendToMqtt(topic, json);
         } else {

+ 230 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/mqtt/MqttConnectionTool.java

@@ -0,0 +1,230 @@
+package com.usky.cdi.service.mqtt;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.endpoint.EventDrivenConsumer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @author fyc
+ * @email yuchuan.fu@chinausky.com
+ * @date 2025/12/22
+ * 动态 MQTT 连接工具类
+ * 用法:注入后调用 connectOrRefresh(...) 即可
+ *
+ */
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MqttConnectionTool {
+
+    private final GenericApplicationContext context;
+
+    /* 默认 topic,可外部再 set */
+    private String defaultTopic = "testTopic";
+
+    /* 默认 keep-alive,可外部再 set */
+    private int keepAlive = 60;
+
+    private static final String MQTT_URL = "ssl://114.80.201.143:8883";
+
+    /**
+     * 存储每个用户名对应的连接信息
+     */
+    private final Map<String, ConnectionInfo> connectionMap = new ConcurrentHashMap<>();
+
+    /**
+     * 连接信息内部类
+     */
+    private static class ConnectionInfo {
+        private final String handlerBeanName;
+        private final String consumerBeanName;
+        private final String factoryBeanName;
+        private final String gatewayBeanName;
+
+        public ConnectionInfo(String username) {
+            this.handlerBeanName = "mqttHandler_" + username;
+            this.consumerBeanName = "mqttConsumer_" + username;
+            this.factoryBeanName = "mqttFactory_" + username;
+            this.gatewayBeanName = "mqttGateway_" + username;
+        }
+    }
+
+    /**
+     * 一键创建/刷新连接
+     *
+     * @param username  用户名
+     * @param password  密码
+     * @return 可直接发消息的 MqttGateway
+     */
+    public synchronized MqttGateway connectOrRefresh(String username, String password) {
+        Assert.notNull(username, "username cannot be null");
+        Assert.notNull(password, "password cannot be null");
+
+        String clientId = "mqttx-" + username;
+        try {
+            /* 1. 获取或创建连接信息 */
+            ConnectionInfo connectionInfo = connectionMap.computeIfAbsent(username, ConnectionInfo::new);
+
+            /* 2. 创建或更新专属工厂 */
+            DefaultMqttPahoClientFactory factory;
+            if (context.containsBean(connectionInfo.factoryBeanName)) {
+                factory = context.getBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class);
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                log.info("已更新 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            } else {
+                factory = new DefaultMqttPahoClientFactory();
+                factory.setConnectionOptions(buildOptions(username, password, MQTT_URL));
+                context.registerBean(connectionInfo.factoryBeanName, DefaultMqttPahoClientFactory.class, () -> factory);
+                log.info("已创建 MQTT 客户端工厂 -> {}", connectionInfo.factoryBeanName);
+            }
+
+            /* 3. 移除旧的 Handler 和 Consumer */
+            removeOldConnection(connectionInfo);
+
+            /* 4. 创建新的 Handler */
+            MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, factory);
+            handler.setAsync(true);
+            handler.setDefaultTopic(defaultTopic);
+            handler.afterPropertiesSet();
+
+            /* 5. 注册新的 Handler */
+            context.registerBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class, () -> handler);
+
+            /* 6. 创建并注册新的专属网关 */
+            // 创建一个简单的Gateway实现,直接使用Handler发送消息
+            MqttGateway gateway = new MqttGateway() {
+                @Override
+                public void sendToMqtt(String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, defaultTopic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+
+                @Override
+                public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                                        @Header(MqttHeaders.QOS) int qos, String payload) {
+                    try {
+                        handler.handleMessage(org.springframework.messaging.support.MessageBuilder
+                                .withPayload(payload)
+                                .setHeader(MqttHeaders.TOPIC, topic)
+                                .setHeader(MqttHeaders.QOS, qos)
+                                .build());
+                    } catch (Exception e) {
+                        log.error("发送MQTT消息失败: {}", e.getMessage(), e);
+                        throw new RuntimeException("发送MQTT消息失败", e);
+                    }
+                }
+            };
+
+            log.info("MQTT 连接刷新完成 -> {} / {}", username, clientId);
+            return gateway;
+        } catch (Exception e) {
+            log.error("MQTT 连接失败 -> {}", clientId, e);
+            throw new RuntimeException("MQTT 连接失败", e);
+        }
+    }
+
+    /* ---------- 私有辅助 ---------- */
+
+    private org.eclipse.paho.client.mqttv3.MqttConnectOptions
+    buildOptions(String u, String p, String url) {
+        org.eclipse.paho.client.mqttv3.MqttConnectOptions opt = 
+                new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
+        opt.setServerURIs(new String[]{url});
+        opt.setUserName(u);
+        if (p != null) opt.setPassword(p.toCharArray());
+        opt.setKeepAliveInterval(keepAlive);
+        return opt;
+    }
+
+    /**
+     * 移除旧的连接实例
+     * @param connectionInfo 连接信息
+     */
+    private void removeOldConnection(ConnectionInfo connectionInfo) {
+        // 移除旧的 Handler
+        if (context.containsBeanDefinition(connectionInfo.handlerBeanName)) {
+            try {
+                MqttPahoMessageHandler oldHandler = context.getBean(connectionInfo.handlerBeanName, MqttPahoMessageHandler.class);
+                oldHandler.stop();
+            } catch (Exception e) {
+                log.warn("停止旧的MQTT处理器时出错: {}", e.getMessage(), e);
+            }
+            context.removeBeanDefinition(connectionInfo.handlerBeanName);
+            log.info("已移除旧的 MQTT 处理器 -> {}", connectionInfo.handlerBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Handler
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.handlerBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.handlerBeanName);
+        }
+
+        // 移除旧的 Factory
+        if (context.containsBeanDefinition(connectionInfo.factoryBeanName)) {
+            context.removeBeanDefinition(connectionInfo.factoryBeanName);
+            log.info("已移除旧的 MQTT 工厂 -> {}", connectionInfo.factoryBeanName);
+        }
+        
+        // 从单例缓存中移除旧的 Factory
+        if (context.getDefaultListableBeanFactory().containsSingleton(connectionInfo.factoryBeanName)) {
+            context.getDefaultListableBeanFactory().destroySingleton(connectionInfo.factoryBeanName);
+        }
+    }
+
+    /* ---------- 对外可调用的 setter ---------- */
+
+    public MqttConnectionTool defaultTopic(String topic) {
+        this.defaultTopic = topic;
+        return this;
+    }
+
+    public MqttConnectionTool keepAlive(int keepAlive) {
+        this.keepAlive = keepAlive;
+        return this;
+    }
+
+    /* ---------- 复用原来的 Gateway 接口 ---------- */
+    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+    public interface MqttGateway {
+        void sendToMqtt(String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
+                        @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 6 - 6
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,13 +26,13 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
+    // @Scheduled(fixedDelay = 26 * 60 * 1000, initialDelay = 0)
     // public void scheduledDeviceDataSync() {
-    //     Integer tenantId = 1208;
-    //     Long engineeringId = 3101130019L;
-    //     String username = "3101130019";
-    //     String password = "ptrEQZK2";
-    //     log.info("开始执行设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
+    //     Integer tenantId = 1205;
+    //     Long engineeringId = 3101070011L;
+    //     String username = "3101070011";
+    //     String password = "5RqhJ7VG";
+    //     log.info("开始执行桃浦象屿人防设备数据同步定时任务,租户ID:{},工程ID:{}", tenantId, engineeringId);
     //
     //     try {
     //         iotDataTransferService.synchronizeDeviceData(tenantId, engineeringId, username, password);