yq 3 gadi atpakaļ
vecāks
revīzija
824299e936

+ 7 - 0
mhfire-controller/src/main/resources/application-dev.properties

@@ -67,6 +67,13 @@ spring.jackson.parser.allow-single-quotes=true
 server.compression.enabled=true
 server.compression.mime-types=application/javascript,text/css,application/json,application/xml,text/html,text/xml,text/plain
 
+#mqttt
+mqtt.completionTimeout=5000
+mqtt.keep-alive-interval=60
+mqtt.username=wjzn2021
+mqtt.password=wjzn2021
+mqtt.sub-topics=usky
+mqtt.url=tcp://124.71.175.91:1883
 
 
 # eureka

+ 7 - 0
mhfire-controller/src/main/resources/application-prod.properties

@@ -63,6 +63,13 @@ spring.jackson.parser.allow-single-quotes=true
 server.compression.enabled=true
 server.compression.mime-types=application/javascript,text/css,application/json,application/xml,text/html,text/xml,text/plain
 
+#mqttt
+mqtt.completionTimeout=5000
+mqtt.keep-alive-interval=60
+mqtt.username=usky
+mqtt.password=usky
+mqtt.sub-topics=/usky/ytDP0007/+/+/info
+mqtt.url=tcp://47.98.201.73:1883
 
 # eureka
 #eureka.client.service-url.defaultZone=http://172.31.101.251:8099/eureka/,http://172.31.101.252:8099/eureka/

+ 10 - 0
mhfire-service/pom.xml

@@ -74,6 +74,16 @@
             <artifactId>bcprov-jdk15on</artifactId>
             <version>1.54</version>
         </dependency>
+
+        <!--MQTT依赖-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
     </dependencies>
 
 

+ 20 - 0
mhfire-service/src/main/java/com/bizmatics/mhfire/service/api/OneCardApi.java

@@ -0,0 +1,20 @@
+package com.bizmatics.mhfire.service.api;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 一网通卡api
+ * @author yq
+ * @date 2021/11/4 11:24
+ */
+@Slf4j
+public class OneCardApi {
+    private static final String URL = "http://32.1.7.42";
+
+    /**
+     * 推送地址
+     */
+    private static final String LOGIN_URL = String.format("%s%s",URL,"/mhxfzd/mhapi/bsUser/user/login");
+
+
+}

+ 48 - 0
mhfire-service/src/main/java/com/bizmatics/mhfire/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,48 @@
+package com.bizmatics.mhfire.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 46 - 0
mhfire-service/src/main/java/com/bizmatics/mhfire/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,46 @@
+package com.bizmatics.mhfire.service.config.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        //创建消息适配器 TODO 这里一定要注意,多端部署时id不能重复
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), mqttBaseConfig.getMsgTopic());
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(1);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 74 - 0
mhfire-service/src/main/java/com/bizmatics/mhfire/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,74 @@
+package com.bizmatics.mhfire.service.config.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+    /**
+     * 连接通道
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}