Переглянути джерело

'新增data-transfer和data-gateway两个服务模块,并实现门禁设备操作控制'

james 6 місяців тому
батько
коміт
aebd5aee06
46 змінених файлів з 1739 додано та 41 видалено
  1. 85 0
      data-gateway/data-gateway-eg-kat/pom.xml
  2. 43 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/ApplicationRun.java
  3. 50 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttBaseConfig.java
  4. 53 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttInConfig.java
  5. 84 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttOutConfig.java
  6. 19 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/udp/UdpBaseConfig.java
  7. 53 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/listener/MqttListener.java
  8. 22 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/MqttStrategy.java
  9. 26 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/SimpleContext.java
  10. 81 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/control/control.java
  11. 54 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/info/Info.java
  12. 22 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/utils/ByteUtil.java
  13. 54 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/utils/UdpUtil.java
  14. 21 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/vo/MqttBaseVO.java
  15. 34 0
      data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/vo/ProductMapVO.java
  16. 47 0
      data-gateway/data-gateway-eg-kat/src/main/resources/application.yml
  17. 108 0
      data-gateway/data-gateway-eg-kat/src/main/resources/doc/index.adoc
  18. 74 0
      data-gateway/data-gateway-eg-kat/src/main/resources/logback.xml
  19. 15 0
      data-gateway/data-gateway-eg-kat/src/main/resources/smart-doc.json
  20. 16 0
      data-gateway/pom.xml
  21. 7 0
      data-transfer/data-transfer-api/src/main/java/com/usky/transfer/RemoteTransferService.java
  22. 5 0
      data-transfer/data-transfer-api/src/main/java/com/usky/transfer/factory/RemoteTransferFallbackFactory.java
  23. 16 0
      data-transfer/data-transfer-biz/pom.xml
  24. 1 1
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/MybatisGenerator.java
  25. 9 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/api/DataTransferControllerApi.java
  26. 21 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/DmpDeviceCommandController.java
  27. 86 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDeviceCommand.java
  28. 16 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/mapper/DmpDeviceCommandMapper.java
  29. 16 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/DmpDeviceCommandService.java
  30. 1 1
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/QueryInfluxdbDataService.java
  31. 50 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttBaseConfig.java
  32. 56 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttInConfig.java
  33. 84 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttOutConfig.java
  34. 60 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/enums/TopListener.java
  35. 20 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpDeviceCommandServiceImpl.java
  36. 45 8
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java
  37. 60 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/listener/MqttListener.java
  38. 21 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/MqttStrategy.java
  39. 26 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/SimpleContext.java
  40. 54 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/control/control.java
  41. 73 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java
  42. 1 6
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/MyConsumer.java
  43. 1 1
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/RocketMQSimpleContext.java
  44. 21 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/MqttBaseVO.java
  45. 21 0
      data-transfer/data-transfer-biz/src/main/resources/mapper/transfer/DmpDeviceCommandMapper.xml
  46. 7 24
      pom.xml

+ 85 - 0
data-gateway/data-gateway-eg-kat/pom.xml

@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>data-gateway</artifactId>
+        <groupId>com.usky</groupId>
+        <version>0.0.1</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>data-gateway-eg-kat</artifactId>
+    <dependencies>
+        <!--MQTT依赖-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+            <optional>true</optional>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.usky</groupId>-->
+<!--            <artifactId>common-cloud-starter</artifactId>-->
+<!--        </dependency>-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-openfeign-core</artifactId>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.usky</groupId>-->
+<!--            <artifactId>usky-common-core</artifactId>-->
+<!--        </dependency>-->
+
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+
+        <!--udp-->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-ip</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>org.springframework.boot</groupId>-->
+<!--            <artifactId>spring-boot-starter-test</artifactId>-->
+<!--            <scope>test</scope>-->
+<!--        </dependency>-->
+        <!-- Pagehelper -->
+<!--        <dependency>-->
+<!--            <groupId>com.github.pagehelper</groupId>-->
+<!--            <artifactId>pagehelper-spring-boot-starter</artifactId>-->
+<!--        </dependency>-->
+
+    </dependencies>
+
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>2.2.6.RELEASE</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 43 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/ApplicationRun.java

@@ -0,0 +1,43 @@
+package com.usky.gateway;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.core.env.Environment;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * 应用启动模块
+ * 
+ */
+
+
+@EnableFeignClients(basePackages = "com.usky")
+@ComponentScan("com.usky")
+@SpringBootApplication
+public class ApplicationRun
+{
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationRun.class);
+
+    public static void main(String[] args) throws UnknownHostException {
+        ConfigurableApplicationContext application = SpringApplication.run(ApplicationRun.class, args);
+        Environment env = application.getEnvironment();
+        String ip = InetAddress.getLocalHost().getHostAddress();
+        String port = env.getProperty("server.port");
+        String path = env.getProperty("server.servlet.context-path");
+        LOGGER.info("\n----------------------------------------------------------\n\t" +
+                "Application is running! Access URLs:\n\t" +
+                "Local: \t\thttp://localhost:" + port + (null==path?"":path) + "/\n\t" +
+                "External: \thttp://" + ip + ":" + port + (null==path?"":path) + "/\n\t" +
+                "Api: \t\thttp://" + ip + ":" + port + (null==path?"":path) + "/swagger-ui/index.html\n\t" +
+                "----------------------------------------------------------");
+    }
+}

+ 50 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.gateway.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 53 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,53 @@
+package com.usky.gateway.service.config.mqtt;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        String[] tops = mqttBaseConfig.getMsgTopic().split(",");
+        String clientId = "gateway-eg-kat-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 84 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,84 @@
+package com.usky.gateway.service.config.mqtt;
+
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "gateway-eg-kat-mqtt-out-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 19 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/udp/UdpBaseConfig.java

@@ -0,0 +1,19 @@
+package com.usky.gateway.service.config.udp;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "udp", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "udp")
+public class UdpBaseConfig {
+    @Value("${udp.deviceIp}")
+    private String deviceIp;
+
+    @Value("${udp.sendingPort}")
+    private Integer sendingPort;
+}

+ 53 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/listener/MqttListener.java

@@ -0,0 +1,53 @@
+package com.usky.gateway.service.listener;
+
+
+import com.usky.gateway.service.config.mqtt.MqttInConfig;
+import com.usky.gateway.service.mqtt.SimpleContext;
+import com.usky.gateway.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:13
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Slf4j
+@Component
+public class MqttListener {
+
+    public static final String MESSAGE_NAME = "messageInput";
+
+    @Autowired
+    private SimpleContext simpleContext;
+
+    /**
+     * 处理消息-消费者
+     *
+     * @return
+     */
+    @Bean(MESSAGE_NAME)
+    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+    public MessageHandler handler() {
+        return message -> {
+            String payload = message.getPayload().toString();
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic) {
+                String topic = mqttReceivedTopic.toString();
+                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+                mqttBaseVO.setTopic(topic);
+                if (topic.indexOf("control") != -1 ) {
+                    mqttBaseVO.setDescribe("control");
+                    mqttBaseVO.setData(payload);
+                }
+                //统一处理数据
+                simpleContext.getResource(mqttBaseVO);
+            }
+        };
+    }
+}

+ 22 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/MqttStrategy.java

@@ -0,0 +1,22 @@
+package com.usky.gateway.service.mqtt;
+
+
+import com.usky.gateway.service.vo.MqttBaseVO;
+
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO) ;
+
+}

+ 26 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/SimpleContext.java

@@ -0,0 +1,26 @@
+package com.usky.gateway.service.mqtt;
+
+
+import com.usky.gateway.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 81 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/control/control.java

@@ -0,0 +1,81 @@
+package com.usky.gateway.service.mqtt.control;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.gateway.service.config.mqtt.MqttOutConfig;
+import com.usky.gateway.service.config.udp.UdpBaseConfig;
+import com.usky.gateway.service.mqtt.MqttStrategy;
+import com.usky.gateway.service.utils.ByteUtil;
+import com.usky.gateway.service.utils.UdpUtil;
+import com.usky.gateway.service.vo.MqttBaseVO;
+import feign.form.util.CharsetUtil;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+@Service("control")
+@Configuration
+@Data
+public class control implements MqttStrategy {
+    @Value("${udp.deviceIp}")
+    private String deviceIp;
+
+    @Value("${udp.sendingPort}")
+    private Integer sendingPort;
+
+    @Autowired
+    private UdpUtil udpUtil;
+    @Autowired
+    private ByteUtil byteUtil;
+    @Resource
+    private MqttOutConfig.MqttGateway mqttGateway;
+
+    //处理下发命令消息,下发命令控制设备
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+//        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+        JSONObject obj_data = JSONObject.parseObject(mqttBaseVO.getData().toString());
+        Integer commandId = Integer.parseInt(obj_data.get("id").toString());
+        Object params = JSONObject.toJSONString(obj_data.get("params"));
+        JSONObject params_data = JSON.parseObject(params.toString());
+        Integer deviceId = Integer.parseInt(params_data.getString("device_id"));
+
+        //开门   17 40 00 00 E0 F4 4D 0D 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+        byte[] deviceByte = byteUtil.toLH(deviceId);
+        byte[] requestBytes = new byte[] { 0x17, 0x40, 0x00, 0x00, deviceByte[0], deviceByte[1], deviceByte[2], deviceByte[3], 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
+        byteUtil.printBytes(requestBytes);
+        String responseData = udpUtil.sendUDPRequest(this.getDeviceIp(), this.getSendingPort(), requestBytes);
+
+        //推送下发命令响应mqtt
+        JSONObject jsonObject = new JSONObject();
+        byte[] responseByte = responseData.getBytes(Charset.forName("ISO-8859-1"));  //"US-ASCII"
+        byteUtil.printBytes(responseByte);
+
+        if(responseByte[8] == 0x01){
+            jsonObject.put("result","success");
+
+        }else{
+            JSONObject obj1 = new JSONObject();
+            obj1.put("code",-1);
+            obj1.put("message","open failed");
+            jsonObject.put("error",obj1);
+
+        }
+
+        jsonObject.put("timeStamp",System.currentTimeMillis());
+        jsonObject.put("id",commandId);
+
+        String res_topic = mqttBaseVO.getTopic().replace("control","controlResponse");
+        mqttGateway.sendToMqtt(res_topic,jsonObject.toJSONString());
+
+        return null;
+    }
+
+
+}

+ 54 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/info/Info.java

@@ -0,0 +1,54 @@
+package com.usky.gateway.service.mqtt.info;
+
+import com.usky.gateway.service.mqtt.MqttStrategy;
+import com.usky.gateway.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author zyj
+ * @date 2022/12/6 15:07
+ */
+@Service("info")
+public class Info implements MqttStrategy {
+//    @Resource
+//    private MyProducer myProducer;
+//    @Autowired
+//    private QueryInfluxdbDataService queryInfluxdbDataService;
+//
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+//        Map<String, Object> tags = new HashMap<>();
+//        Map<String, Object> fields = new HashMap<>();
+//        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+//        String deviceId = map_data.get("device_id").toString();
+//        String productCode = map_data.get("product_code").toString().toLowerCase();
+//        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+//
+//        String tableName = deviceId;
+//
+//        Object tg = JSONObject.toJSONString(map_data.get("tags"));
+//        JSONObject tag = JSON.parseObject(tg.toString());
+//        for (String entry : tag.keySet()){
+//            tags.put(entry.toLowerCase(),tag.get(entry).toString());
+//        }
+//
+//        Object met = JSONObject.toJSONString(map_data.get("metrics"));
+//        JSONObject metrics = JSON.parseObject(met.toString());
+//        for(String entry : metrics.keySet()){
+//            fields.put(entry.toLowerCase(),metrics.get(entry));
+//        }
+//
+//        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+//        deviceDataWriteVO.setDeviceId(deviceId);
+//        deviceDataWriteVO.setProductCode(productCode);
+//        deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
+//        deviceDataWriteVO.setTags(tags);
+//        deviceDataWriteVO.setMetrics(metrics);
+//
+//        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+//
+        return null;
+    }
+
+
+}

+ 22 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/utils/ByteUtil.java

@@ -0,0 +1,22 @@
+package com.usky.gateway.service.utils;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class ByteUtil {
+    public byte[] toLH(int n) {
+        byte[] b = new byte[4];
+        b[0] = (byte) (n & 0xff);
+        b[1] = (byte) (n >> 8 & 0xff);
+        b[2] = (byte) (n >> 16 & 0xff);
+        b[3] = (byte) (n >> 24 & 0xff);
+        return b;
+    }
+
+    // 打印字节数组的十六进制表示
+    public void printBytes(byte[] bytes) {
+        for (byte b : bytes) {
+            System.out.print(Integer.toHexString(b & 0xFF).toUpperCase() + " ");
+        }
+    }
+}

+ 54 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/utils/UdpUtil.java

@@ -0,0 +1,54 @@
+package com.usky.gateway.service.utils;
+
+import feign.form.util.CharsetUtil;
+import org.springframework.stereotype.Component;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+
+
+@Component
+public class UdpUtil {
+
+    /**
+     * 发送UDP请求并打印返回结果
+     *
+     * @param requestBytes 要发送的自定义命令
+     */
+    public static String sendUDPRequest(String host, int port, byte[] requestBytes) {
+        try {
+            // 创建DatagramSocket随机端口
+            DatagramSocket socket = new DatagramSocket();
+
+            // 创建InetAddress对象
+            InetAddress address = InetAddress.getByName(host);
+
+            // 创建DatagramPacket,包含发送的数据和目的地
+            DatagramPacket OutPacket = new DatagramPacket(requestBytes, requestBytes.length, address, port);
+
+            // 发送DatagramPacket
+            socket.send(OutPacket);
+
+            // 创建一个DatagramPacket来接收响应,大小与发送的数据包相同
+            DatagramPacket inPacket = new DatagramPacket(requestBytes, requestBytes.length);
+
+            // 接收响应
+            socket.receive(inPacket);
+
+            // 打印接收到的数据
+            String receivedData = new String(inPacket.getData(), 0, inPacket.getLength(), Charset.forName("ISO-8859-1"));
+            System.out.println("接收到的响应: " + receivedData+ ": ");
+
+            // 关闭socket
+            socket.close();
+
+            return receivedData;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+}

+ 21 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.gateway.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:32
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+      */
+    private Object data;
+}

+ 34 - 0
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/vo/ProductMapVO.java

@@ -0,0 +1,34 @@
+package com.usky.gateway.service.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class ProductMapVO implements Serializable {
+    /**
+     * 产品ID
+     */
+    private Integer productId;
+
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+    /**
+     * 创建人
+     */
+    private String createdBy;
+
+    /**
+     * 租户号
+     */
+    private Integer tenantId;
+
+    /**
+     * 设备类型
+     */
+    private Integer deviceType;
+
+}

+ 47 - 0
data-gateway/data-gateway-eg-kat/src/main/resources/application.yml

@@ -0,0 +1,47 @@
+# Tomcat
+server:
+  port: 21000
+  servlet:
+    context-path: /data-gateway
+spring:
+  application:
+    # 应用名称
+    name: agbox-energy
+  cache:
+    ehcache:
+      config: classpath:ehcache.xml
+      enabled: false
+    redis:
+      enabled: true
+  jackson:
+    date-format: yyyy-MM-dd HH:mm:ss
+    default-property-inclusion: always
+    deserialization:
+      fail-on-unknown-properties: false
+    parser:
+      allow-single-quotes: true
+      allow-unquoted-control-chars: true
+    serialization:
+      fail-on-empty-beans: false
+    time-zone: GMT+8
+  tenant:
+    enable: false
+  servlet:
+    multipart:
+      max-file-size: 10MB
+      max-request-size: 15MB
+
+temp:
+  basedir: C:/Users/pc/Desktop/
+mqtt:
+  completionTimeout: 5000
+  enabled: true
+  keep-alive-interval: 60
+  password: public
+  sub-topics: /502_KAT/+/control
+  url: tcp://172.16.120.165:1883
+  username: admin
+# 和嵌入式udp通信的发送端口和监听端口地址
+udp:
+  deviceIp: 172.16.120.172
+  sendingPort: 60000

+ 108 - 0
data-gateway/data-gateway-eg-kat/src/main/resources/doc/index.adoc

@@ -0,0 +1,108 @@
+= 安防项目
+
+[width="100%",options="header"]
+[stripes=even]
+|====================
+|Version |  Update Time  | Status | Author |  Description
+|v2022-04-21 16:57:08|2022-04-21 16:57:08|auto|@yq|Created by smart-doc
+|====================
+
+
+== &lt;p&gt;参数配置表 前端控制器&lt;/p&gt;
+== &lt;p&gt;部门信息&lt;/p&gt;
+=== 查看部门信息
+*URL:* http:10.23.39.1:8082/sysDept/list
+
+*Type:* POST
+
+*Author:* ya
+
+*Content-Type:* application/json; charset=utf-8
+
+
+
+
+*Body-parameters:*
+
+[width="100%",options="header"]
+[stripes=even]
+|====================
+|Parameter | Type|Description|Required|Since
+|deptId|int64|部门id|false|-
+|parentId|int64|父部门id|false|-
+|ancestors|string|祖级列表|false|-
+|deptName|string|部门名称|false|-
+|orderNum|int32|显示顺序|false|-
+|leader|string|负责人|false|-
+|phone|string|联系电话|false|-
+|email|string|邮箱|false|-
+|status|string|部门状态(0正常 1停用)|false|-
+|delFlag|string|删除标志(0代表存在 2代表删除)|false|-
+|createBy|string|创建者|false|-
+|createTime|string|创建时间|false|-
+|updateBy|string|更新者|false|-
+|updateTime|string|更新时间|false|-
+|bId|int64|建筑id|false|-
+|====================
+
+*Response-fields:*
+
+[width="100%",options="header"]
+[stripes=even]
+|====================
+|Field | Type|Description|Since
+|status|object|No comments found.|-
+|code|string|No comments found.|-
+|msg|string|No comments found.|-
+|data|object|No comments found.|-
+|└─deptId|int64|部门id|-
+|└─parentId|int64|父部门id|-
+|└─ancestors|string|祖级列表|-
+|└─deptName|string|部门名称|-
+|└─orderNum|int32|显示顺序|-
+|└─leader|string|负责人|-
+|└─phone|string|联系电话|-
+|└─email|string|邮箱|-
+|└─status|string|部门状态(0正常 1停用)|-
+|└─delFlag|string|删除标志(0代表存在 2代表删除)|-
+|└─createBy|string|创建者|-
+|└─createTime|string|创建时间|-
+|└─updateBy|string|更新者|-
+|└─updateTime|string|更新时间|-
+|└─bId|int64|建筑id|-
+|exception|string|No comments found.|-
+|====================
+
+*Response-example:*
+----
+{
+	"status": {
+		
+	},
+	"code": "97564",
+	"msg": "wnr5qt",
+	"data": [
+		{
+			"deptId": 540,
+			"parentId": 858,
+			"ancestors": "o5lg60",
+			"deptName": "文.沈",
+			"orderNum": 260,
+			"leader": "ufz93p",
+			"phone": "17852835049",
+			"email": "智渊.徐@yahoo.com",
+			"status": "nu6cnp",
+			"delFlag": "72oiji",
+			"createBy": "5fxr6j",
+			"createTime": "2022-04-21 16:57:10",
+			"updateBy": "4kcs4e",
+			"updateTime": "2022-04-21 16:57:10",
+			"bId": 977
+		}
+	],
+	"exception": "53u6bg"
+}
+----
+
+== &lt;p&gt;用户信息表 前端控制器&lt;/p&gt;
+

+ 74 - 0
data-gateway/data-gateway-eg-kat/src/main/resources/logback.xml

@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration scan="true" scanPeriod="60 seconds" debug="false">
+    <!-- 日志存放路径 -->
+	<property name="log.path" value="/var/log/uskycloud/usky-demo" />
+   <!-- 日志输出格式 -->
+	<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
+
+    <!-- 控制台输出 -->
+	<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>${log.pattern}</pattern>
+		</encoder>
+	</appender>
+
+    <!-- 系统日志输出 -->
+	<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
+	    <file>${log.path}/info.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+			<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
+			<!-- 日志最大的历史 60天 -->
+			<maxHistory>60</maxHistory>
+		</rollingPolicy>
+		<encoder>
+			<pattern>${log.pattern}</pattern>
+		</encoder>
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>INFO</level>
+            <!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+            <!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+	</appender>
+
+    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+	    <file>${log.path}/error.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+            <fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
+			<!-- 日志最大的历史 60天 -->
+			<maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>ERROR</level>
+			<!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+			<!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <!-- 系统模块日志级别控制  -->
+	<logger name="com.usky" level="info" />
+	<!-- Spring日志级别控制  -->
+	<logger name="org.springframework" level="warn" />
+
+	<root level="info">
+		<appender-ref ref="console" />
+	</root>
+	
+	<!--系统操作日志-->
+    <root level="info">
+        <appender-ref ref="file_info" />
+        <appender-ref ref="file_error" />
+    </root>
+</configuration>

+ 15 - 0
data-gateway/data-gateway-eg-kat/src/main/resources/smart-doc.json

@@ -0,0 +1,15 @@
+{
+  "outPath":"./src/main/resources/doc",
+  "serverUrl": "http:10.23.39.1:8082/",
+  "isStrict": false,
+  "coverOld": true,
+  "allInOne": true,
+  "packageFilters": "com.usky.demo.controller.web",
+  "requestExample":"false",
+  "responseExample":"true",
+  "projectName": "安防项目",
+  "appKey": "20211216921084883495813120",
+  "appToken":"36bde2426ad546a5a50311bb747e7e61",
+  "secret": "N@Pd,KXAHki*BW3=zK.XPNykf!=CM79J",
+  "openUrl": "http://101.133.214.75:7700/api"
+}

+ 16 - 0
data-gateway/pom.xml

@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>usky-data</artifactId>
+        <groupId>com.usky</groupId>
+        <version>0.0.1</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>data-gateway</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>data-gateway-eg-kat</module><!--科奥特门禁-->
+  </modules>
+
+</project>

+ 7 - 0
data-transfer/data-transfer-api/src/main/java/com/usky/transfer/RemoteTransferService.java

@@ -8,6 +8,7 @@ import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
+import java.util.Map;
 
 @FeignClient(contextId = "remoteTransferService", value = "data-transfer", fallbackFactory = RemoteTransferFallbackFactory.class)
 public interface RemoteTransferService {
@@ -18,4 +19,10 @@ public interface RemoteTransferService {
      */
     @PostMapping("/sendDeviceData")
     ApiResult<Void> sendDeviceData(@RequestBody DeviceDataWriteVO writeVO);
+
+    /**
+     * 下发单个设备控制命令
+     */
+    @PostMapping("/deviceControl")
+    Map<String,Object> deviceControl(@RequestParam("topic") String topic, @RequestParam("dataStr") String dataStr);
 }

+ 5 - 0
data-transfer/data-transfer-api/src/main/java/com/usky/transfer/factory/RemoteTransferFallbackFactory.java

@@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -34,6 +35,10 @@ public class RemoteTransferFallbackFactory implements FallbackFactory<RemoteTran
             public ApiResult<Void> sendDeviceData(DeviceDataWriteVO writeVO) {
                 throw new BusinessException(throwable.getMessage());
             }
+            @Override
+            public Map<String,Object> deviceControl(String topic, String dataStr) {
+                throw new BusinessException(throwable.getMessage());
+            }
         };
     }
 }

+ 16 - 0
data-transfer/data-transfer-biz/pom.xml

@@ -11,11 +11,27 @@
 
     <artifactId>data-transfer-biz</artifactId>
     <dependencies>
+        <!--MQTT依赖-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+            <optional>true</optional>
+        </dependency>
         <dependency>
             <groupId>com.usky</groupId>
             <artifactId>common-cloud-starter</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>

+ 1 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/MybatisGenerator.java

@@ -70,7 +70,7 @@ public class MybatisGenerator {
         // strategy.setTablePrefix("t_"); // 表名前缀
         strategy.setEntityLombokModel(true); //使用lombok
         //修改自己想要生成的表
-        strategy.setInclude("dmp_device_status");  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
+        strategy.setInclude("dmp_device_command");  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
         mpg.setStrategy(strategy);
 
         // 关闭默认 xml 生成,调整生成 至 根目录

+ 9 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/api/DataTransferControllerApi.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 @RestController
@@ -38,4 +39,12 @@ public class DataTransferControllerApi implements RemoteTransferService {
         queryInfluxdbDataService.sendDeviceDataToMQ(writeVO);
         return ApiResult.success();
     }
+
+    /**
+     * 下发单个设备控制命令
+     */
+    @Override
+    public Map<String,Object> deviceControl(String topic,String dataStr){
+        return queryInfluxdbDataService.deviceControl(topic,dataStr);
+    }
 }

+ 21 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/DmpDeviceCommandController.java

@@ -0,0 +1,21 @@
+package com.usky.transfer.controller.web;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.stereotype.Controller;
+
+/**
+ * <p>
+ * 创建下发命令历史记录表 前端控制器
+ * </p>
+ *
+ * @author ya
+ * @since 2024-09-24
+ */
+@Controller
+@RequestMapping("/dmpDeviceCommand")
+public class DmpDeviceCommandController {
+
+}
+

+ 86 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDeviceCommand.java

@@ -0,0 +1,86 @@
+package com.usky.transfer.domain;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import java.time.LocalDateTime;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * <p>
+ * 创建下发命令历史记录表
+ * </p>
+ *
+ * @author ya
+ * @since 2024-09-24
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+public class DmpDeviceCommand implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键id
+     */
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+    /**
+     * 设备编码
+     */
+    private String deviceId;
+
+    /**
+     * 下发命令内容
+     */
+    private String commandContent;
+
+    /**
+     * 下发命令响应内容
+     */
+    private String commandResponse;
+
+    /**
+     * 命令状态;0 命令执行中,1 命令成功,2 命令失败
+     */
+    private Integer commandStatus;
+
+    /**
+     * 创建人
+     */
+    private String createdBy;
+
+    /**
+     * 创建时间
+     */
+    private LocalDateTime createdTime;
+
+    /**
+     * 更新人
+     */
+    private String updatedBy;
+
+    /**
+     * 更新时间
+     */
+    private LocalDateTime updatedTime;
+
+    /**
+     * 组织结构ID
+     */
+    private Integer deptId;
+
+    /**
+     * 租户ID
+     */
+    private Integer tenantId;
+
+
+}

+ 16 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/mapper/DmpDeviceCommandMapper.java

@@ -0,0 +1,16 @@
+package com.usky.transfer.mapper;
+
+import com.usky.transfer.domain.DmpDeviceCommand;
+import com.usky.common.mybatis.core.CrudMapper;
+
+/**
+ * <p>
+ * 创建下发命令历史记录表 Mapper 接口
+ * </p>
+ *
+ * @author ya
+ * @since 2024-09-24
+ */
+public interface DmpDeviceCommandMapper extends CrudMapper<DmpDeviceCommand> {
+
+}

+ 16 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/DmpDeviceCommandService.java

@@ -0,0 +1,16 @@
+package com.usky.transfer.service;
+
+import com.usky.transfer.domain.DmpDeviceCommand;
+import com.usky.common.mybatis.core.CrudService;
+
+/**
+ * <p>
+ * 创建下发命令历史记录表 服务类
+ * </p>
+ *
+ * @author ya
+ * @since 2024-09-24
+ */
+public interface DmpDeviceCommandService extends CrudService<DmpDeviceCommand> {
+
+}

+ 1 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/QueryInfluxdbDataService.java

@@ -18,6 +18,6 @@ public interface QueryInfluxdbDataService extends CrudService<QueryInfluxdbData>
 
     Map<String,Object> sendDeviceDataToMQ(DeviceDataWriteVO writeVO);
 
-
+    Map<String,Object> deviceControl(String topic,String dataStr);
 
 }

+ 50 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.transfer.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 56 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,56 @@
+package com.usky.transfer.service.config.mqtt;
+
+import com.usky.transfer.service.enums.TopListener;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        String[] tops = mqttBaseConfig.getMsgTopic().split(",");
+        String clientId = "h-transfer-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 84 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,84 @@
+package com.usky.transfer.service.config.mqtt;
+
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-transfer-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 60 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/enums/TopListener.java

@@ -0,0 +1,60 @@
+package com.usky.transfer.service.enums;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author ZYJ
+ * @date 2024/9/23 15:11
+ */
+public enum TopListener {
+
+    /**
+     * 对接设备info信息和下发命令响应信息
+     */
+    DEVICE_INFO("deviceInfo","/+/+/info",1),
+    DEVICE_CONTROLRESPONSE("deviceControlResponse","/+/+/controlResponse",1);
+
+
+
+
+    private String name;
+    private String code;
+    //0发送队列,1监听队列2都是
+    private Integer type;
+
+    TopListener(String name, String code, Integer type){
+        this.name = name;
+        this.code = code;
+        this.type = type;
+    }
+
+    public static TopListener parse(String code){
+        TopListener topListener = null;
+        for (TopListener t:TopListener.values()) {
+            if (t.getCode().equals(code)){
+                topListener = t;
+                break;
+            }
+        }
+        return topListener;
+    }
+    public static List<TopListener> parse(Integer type){
+        List<TopListener> listeners = new ArrayList<>();
+        for (TopListener t:TopListener.values()) {
+            if (t.getType().equals(type)){
+                listeners.add(t);
+            }
+        }
+        return listeners;
+    }
+    public String getCode(){
+        return code;
+    }
+    public String getName(){
+        return name;
+    }
+    public Integer getType(){
+        return type;
+    }
+}

+ 20 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpDeviceCommandServiceImpl.java

@@ -0,0 +1,20 @@
+package com.usky.transfer.service.impl;
+
+import com.usky.transfer.domain.DmpDeviceCommand;
+import com.usky.transfer.mapper.DmpDeviceCommandMapper;
+import com.usky.transfer.service.DmpDeviceCommandService;
+import com.usky.common.mybatis.core.AbstractCrudService;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ * 创建下发命令历史记录表 服务实现类
+ * </p>
+ *
+ * @author ya
+ * @since 2024-09-24
+ */
+@Service
+public class DmpDeviceCommandServiceImpl extends AbstractCrudService<DmpDeviceCommandMapper, DmpDeviceCommand> implements DmpDeviceCommandService {
+
+}

+ 45 - 8
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -1,19 +1,19 @@
 package com.usky.transfer.service.impl;
 
 import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.sun.media.jfxmedia.logging.Logger;
 import com.usky.common.core.util.UUIDUtils;
 import com.usky.common.security.utils.SecurityUtils;
 import com.usky.transfer.domain.*;
 import com.usky.transfer.mapper.QueryInfluxdbDataMapper;
-import com.usky.transfer.service.DmpDeviceService;
-import com.usky.transfer.service.DmpDeviceStatusService;
-import com.usky.transfer.service.DmpProductService;
-import com.usky.transfer.service.QueryInfluxdbDataService;
+import com.usky.transfer.service.*;
 import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.transfer.service.config.mqtt.MqttOutConfig;
 import com.usky.transfer.service.rocketmq.MyProducer;
 import com.usky.transfer.service.utils.TsdbUtils;
 import com.usky.transfer.service.vo.DeviceMapVO;
@@ -26,10 +26,7 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * <p>
@@ -50,11 +47,51 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     private DmpDeviceService dmpDeviceService;
     @Autowired
     private DmpDeviceStatusService dmpDeviceStatusService;
+    @Autowired
+    private DmpDeviceCommandService dmpDeviceCommandService;
 
     @Resource
     private MyProducer myProducer;
+    @Resource
+    private MqttOutConfig.MqttGateway mqttGateway;
+
+    @Override
+    public Map<String,Object> deviceControl(String topic,String dataStr){
+        Map<String,Object> rec_map = new HashMap<>();
+        String[] str = topic.split("/");
+
+        //存储下发设备控制命令到数据库表中
+        DmpDeviceCommand command = new DmpDeviceCommand();
+        command.setProductCode(str[1]);
+        command.setDeviceId(str[2]);
+        command.setCommandContent(dataStr);
+        command.setCreatedBy(SecurityUtils.getUsername());
+        command.setCreatedTime(LocalDateTime.now());
+//        if (Objects.nonNull(SecurityUtils.getLoginUser().getSysUser().getDeptId())){
+//            command.setDeptId(SecurityUtils.getLoginUser().getSysUser().getDeptId().intValue());
+//        }
+
+        command.setTenantId(SecurityUtils.getTenantId());
+        dmpDeviceCommandService.save(command);
+        int commandId = command.getId();
+
+        JSONObject dataJson = JSONObject.parseObject(dataStr);
+        dataJson.put("id",commandId);
+
+        command.setCommandContent(dataJson.toJSONString());
+        dmpDeviceCommandService.updateById(command);
+        //推送下发设备控制mqtt
+        if(StringUtils.isNotBlank(dataStr)){
+            mqttGateway.sendToMqtt(topic,dataJson.toJSONString());
+        }
 
 
+        rec_map.put("code",200);
+        rec_map.put("message","操作成功");
+
+        return rec_map;
+    }
+
     @Override
     public Map<String,Object> sendDeviceDataToMQ(DeviceDataWriteVO writeVO){
         Map<String,Object> rec_map = new HashMap<>();

+ 60 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/listener/MqttListener.java

@@ -0,0 +1,60 @@
+package com.usky.transfer.service.listener;
+
+
+import com.usky.transfer.service.config.mqtt.MqttInConfig;
+import com.usky.transfer.service.enums.TopListener;
+import com.usky.transfer.service.mqtt.SimpleContext;
+import com.usky.transfer.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:13
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Slf4j
+@Component
+public class MqttListener {
+
+    public static final String MESSAGE_NAME = "messageInput";
+
+    @Autowired
+    private SimpleContext simpleContext;
+
+    /**
+     * 处理消息-消费者
+     *
+     * @return
+     */
+    @Bean(MESSAGE_NAME)
+    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+    public MessageHandler handler() {
+        return message -> {
+            String payload = message.getPayload().toString();
+            //进行接口推送
+//            String[] infoCode = TopListener.DEVICE_INFO.getCode().split("/");
+//            String[] controlCode = TopListener.DEVICE_CONTROLRESPONSE.getCode().split("/");
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic) {
+                String topic = mqttReceivedTopic.toString();
+                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+                mqttBaseVO.setTopic(topic);
+                if (topic.indexOf("info") != -1 ) {
+                    mqttBaseVO.setDescribe("info");
+                    mqttBaseVO.setData(payload);
+                }else if(topic.indexOf("controlResponse") != -1 ) {
+                    mqttBaseVO.setDescribe("controlResponse");
+                    mqttBaseVO.setData(payload);
+                }
+                //统一处理数据
+                simpleContext.getResource(mqttBaseVO);
+            }
+        };
+    }
+}

+ 21 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/MqttStrategy.java

@@ -0,0 +1,21 @@
+package com.usky.transfer.service.mqtt;
+
+
+import com.usky.transfer.service.vo.MqttBaseVO;
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO);
+
+}

+ 26 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/SimpleContext.java

@@ -0,0 +1,26 @@
+package com.usky.transfer.service.mqtt;
+
+
+import com.usky.transfer.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 54 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/control/control.java

@@ -0,0 +1,54 @@
+package com.usky.transfer.service.mqtt.control;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.common.security.utils.SecurityUtils;
+import com.usky.transfer.domain.DeviceDataWriteVO;
+import com.usky.transfer.domain.DmpDeviceCommand;
+import com.usky.transfer.service.DmpDeviceCommandService;
+import com.usky.transfer.service.QueryInfluxdbDataService;
+import com.usky.transfer.service.mqtt.MqttStrategy;
+import com.usky.transfer.service.rocketmq.MyProducer;
+import com.usky.transfer.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+
+@Service("controlResponse")
+public class control implements MqttStrategy {
+    @Autowired
+    private DmpDeviceCommandService dmpDeviceCommandService;
+
+    //处理下发命令响应消息
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+
+        //存储下发设备控制命令到数据库表中
+        DmpDeviceCommand command = new DmpDeviceCommand();
+
+        String recData = mqttBaseVO.getData().toString();
+        JSONObject dataJson = JSONObject.parseObject(recData);
+        Integer commandId = Integer.valueOf(dataJson.get("id").toString());
+        command.setId(commandId);
+        command.setCommandResponse(recData);
+        if(recData.contains("error")){
+            command.setCommandStatus(2);
+        }else{
+            command.setCommandStatus(1);
+        }
+
+        command.setUpdatedBy(SecurityUtils.getUsername());
+        command.setUpdatedTime(LocalDateTime.now());
+
+        dmpDeviceCommandService.updateById(command);
+
+        return null;
+    }
+
+
+}

+ 73 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -0,0 +1,73 @@
+package com.usky.transfer.service.mqtt.info;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.common.core.exception.BusinessException;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.transfer.domain.*;
+import com.usky.transfer.service.*;
+import com.usky.transfer.service.mqtt.MqttStrategy;
+import com.usky.transfer.service.rocketmq.MyProducer;
+import com.usky.transfer.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author zyj
+ * @date 2022/12/6 15:07
+ */
+@Service("info")
+public class Info implements MqttStrategy {
+    @Resource
+    private MyProducer myProducer;
+    @Autowired
+    private QueryInfluxdbDataService queryInfluxdbDataService;
+
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+        Map<String, Object> tags = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
+        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+        String deviceId = map_data.get("device_id").toString();
+        String productCode = map_data.get("product_code").toString().toLowerCase();
+        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+
+        String tableName = deviceId;
+
+        Object tg = JSONObject.toJSONString(map_data.get("tags"));
+        JSONObject tag = JSON.parseObject(tg.toString());
+        for (String entry : tag.keySet()){
+            tags.put(entry.toLowerCase(),tag.get(entry).toString());
+        }
+
+        Object met = JSONObject.toJSONString(map_data.get("metrics"));
+        JSONObject metrics = JSON.parseObject(met.toString());
+        for(String entry : metrics.keySet()){
+            fields.put(entry.toLowerCase(),metrics.get(entry));
+        }
+
+        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+        deviceDataWriteVO.setDeviceId(deviceId);
+        deviceDataWriteVO.setProductCode(productCode);
+        deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        deviceDataWriteVO.setTags(tags);
+        deviceDataWriteVO.setMetrics(metrics);
+
+        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+
+        return null;
+    }
+
+
+}

+ 1 - 6
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/MyConsumer.java

@@ -1,22 +1,17 @@
 package com.usky.transfer.service.rocketmq;
 
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
-import com.usky.transfer.service.rocketmq.SimpleContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Map;
-
 @Slf4j
 @Component
 @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
 public class MyConsumer implements RocketMQListener<String> {
     @Autowired
-    private SimpleContext simpleContext;
+    private RocketMQSimpleContext simpleContext;
 
     @Override
     public void onMessage(String message){

+ 1 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/SimpleContext.java → data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/RocketMQSimpleContext.java

@@ -22,7 +22,7 @@ import java.util.*;
  */
 @Service
 @Repository
-public class SimpleContext {
+public class RocketMQSimpleContext {
     @Resource
     private MyProducer myProducer;
     @Autowired

+ 21 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.transfer.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:32
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+      */
+    private Object data;
+}

+ 21 - 0
data-transfer/data-transfer-biz/src/main/resources/mapper/transfer/DmpDeviceCommandMapper.xml

@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.usky.transfer.mapper.DmpDeviceCommandMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.usky.transfer.domain.DmpDeviceCommand">
+        <id column="id" property="id" />
+        <result column="product_code" property="productCode" />
+        <result column="device_id" property="deviceId" />
+        <result column="command_content" property="commandContent" />
+        <result column="command_response" property="commandResponse" />
+        <result column="command_status" property="commandStatus" />
+        <result column="created_by" property="createdBy" />
+        <result column="created_time" property="createdTime" />
+        <result column="updated_by" property="updatedBy" />
+        <result column="updated_time" property="updatedTime" />
+        <result column="dept_id" property="deptId" />
+        <result column="tenant_id" property="tenantId" />
+    </resultMap>
+
+</mapper>

+ 7 - 24
pom.xml

@@ -20,21 +20,10 @@
   
   
   </parent>
-          
-  
-  
+
   <modelVersion>4.0.0</modelVersion>
-          
-  
-  
   <artifactId>usky-data</artifactId>
-          
-  
-  
   <packaging>pom</packaging>
-          
-  
-  
   <description>usky-data</description>
           
   
@@ -47,28 +36,22 @@
 
     <module>data-tsdb-proxy</module>
 
+    <module>data-transfer</module>
+
     <module>data-protocol-http</module>
 
+    <module>data-gateway</module>
+
   </modules>
           
   
   
   <dependencies>
-                    
-    
-    
+
+
     <dependency>
-                              
-      
-      
       <groupId>org.projectlombok</groupId>
-                              
-      
-      
       <artifactId>lombok</artifactId>
-                          
-    
-    
     </dependency>