Przeglądaj źródła

开发创建超级表以及列、新增超级表列、删除超级表列和查询超级表详情,同时提供Api对接接口;优化批量设备实时数据查询(对外)和批量设备历史数据查询(对外)两个接口

james 2 dni temu
rodzic
commit
0eef712a2b
23 zmienionych plików z 1342 dodań i 313 usunięć
  1. 63 66
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/QueryDeviceDataController.java
  2. 52 25
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java
  3. 34 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/constant/TdsConstants.java
  4. 59 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/DataTypeEnum.java
  5. 29 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/DeviceRealDataVO.java
  6. 30 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/Fields.java
  7. 57 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/FieldsVO.java
  8. 45 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/SuperTableDTO.java
  9. 52 0
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/SuperTableDescribeVO.java
  10. 32 14
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java
  11. 6 0
      data-tsdb-proxy/data-tsdb-proxy-biz/pom.xml
  12. 93 29
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java
  13. 81 1
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java
  14. 60 1
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/mapper/QueryTdengineDataMapper.java
  15. 17 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryTdengineDataService.java
  16. 318 131
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryTdengineDataServiceImpl.java
  17. 20 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/MqttStrategy.java
  18. 16 1
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/MyConsumer.java
  19. 10 45
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/SimpleContext.java
  20. 67 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/influxdb/influxdb.java
  21. 77 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/tdengine/tdengine.java
  22. 13 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/vo/MQDataVO.java
  23. 111 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/resources/mapper/demo/QueryTdengineDataMapper.xml

+ 63 - 66
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/QueryDeviceDataController.java

@@ -7,10 +7,7 @@ import com.usky.common.core.bean.ApiResult;
 import com.usky.common.core.bean.CommonPage;
 import com.usky.common.core.exception.BusinessException;
 import com.usky.demo.RemoteTsdbProxyService;
-import com.usky.demo.domain.HistoryRequestVO;
-import com.usky.demo.domain.HistoryResultVO;
-import com.usky.demo.domain.LastRequestVO;
-import com.usky.demo.domain.LastResultVO;
+import com.usky.demo.domain.*;
 import com.usky.transfer.domain.DmpDevice;
 import com.usky.transfer.domain.DmpDeviceCommand;
 import com.usky.transfer.service.DmpDeviceCommandService;
@@ -60,32 +57,32 @@ public class QueryDeviceDataController {
         return deviceMap;
     }
 
-    /**
-     * 单个设备实时数据查询(对外)
-     * @param productCode
-     * @param deviceId
-     * @return
-     */
-    @GetMapping("/externalLast")
-    public ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                       @RequestParam(value = "deviceId") String deviceId){
-        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
-        if(!deviceMapList.containsKey(deviceId)){
-            throw new BusinessException(deviceId+"无设备信息");
-        }
-
-        String deviceUUId = "";
-
-        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
-            String productCode1 = map.getValue().getProductCode();
-            String deviceId1 = map.getKey();
-            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
-                deviceUUId = map.getValue().getDeviceUuid();
-            }
-        }
-
-        return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(deviceUUId));
-    }
+//    /**
+//     * 单个设备实时数据查询(对外)
+//     * @param productCode
+//     * @param deviceId
+//     * @return
+//     */
+//    @GetMapping("/externalLast")
+//    public ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "productCode") String productCode,
+//                                                       @RequestParam(value = "deviceId") String deviceId){
+//        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+//        if(!deviceMapList.containsKey(deviceId)){
+//            throw new BusinessException(deviceId+"无设备信息");
+//        }
+//
+//        String deviceUUId = "";
+//
+//        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+//            String productCode1 = map.getValue().getProductCode();
+//            String deviceId1 = map.getKey();
+//            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+//                deviceUUId = map.getValue().getDeviceUuid();
+//            }
+//        }
+//
+//        return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(deviceUUId));
+//    }
 
     /**
      * 批量设备实时数据查询(对外)
@@ -93,9 +90,9 @@ public class QueryDeviceDataController {
      * @return
      */
     @PostMapping("/externalLast")
-    public ApiResult<List<LastResultVO>> queryLastDeviceData(@RequestBody ExternalLastRequestVO requestVO){
+    public ApiResult<List<LastInnerResultVO>> queryLastDeviceData(@RequestBody ExternalLastRequestVO requestVO){
         Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
-        LastRequestVO reqVO = new LastRequestVO();
+        LastInnerQueryVO reqVO = new LastInnerQueryVO();
 
         String productCode = requestVO.getProductCode();
         List<String> deviceIds = requestVO.getDeviceId();
@@ -115,41 +112,41 @@ public class QueryDeviceDataController {
                 }
             }
         }
-        reqVO.setDeviceUUId(deviceUUIds);
+        reqVO.setDeviceuuid(deviceUUIds);
 
         return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(reqVO));
     }
 
-    /**
-     * 单个设备历史数据查询(对外)
-     * @param productCode
-     * @param deviceId
-     * @param startTime
-     * @param endTime
-     * @return
-     */
-    @GetMapping("/externalHistory")
-    public ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                             @RequestParam(value = "deviceId") String deviceId,
-                                                             @RequestParam(value = "startTime") String startTime,
-                                                             @RequestParam(value = "endTime") String endTime){
-        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
-        if(!deviceMapList.containsKey(deviceId)){
-            throw new BusinessException(deviceId+"无设备信息");
-        }
-
-        String deviceUUId = "";
-
-        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
-            String productCode1 = map.getValue().getProductCode();
-            String deviceId1 = map.getKey();
-            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
-                deviceUUId = map.getValue().getDeviceUuid();
-            }
-        }
-
-        return ApiResult.success(remoteTsdbProxyService.queryHistoryDeviceData(deviceUUId,startTime,endTime));
-    }
+//    /**
+//     * 单个设备历史数据查询(对外)
+//     * @param productCode
+//     * @param deviceId
+//     * @param startTime
+//     * @param endTime
+//     * @return
+//     */
+//    @GetMapping("/externalHistory")
+//    public ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
+//                                                             @RequestParam(value = "deviceId") String deviceId,
+//                                                             @RequestParam(value = "startTime") String startTime,
+//                                                             @RequestParam(value = "endTime") String endTime){
+//        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+//        if(!deviceMapList.containsKey(deviceId)){
+//            throw new BusinessException(deviceId+"无设备信息");
+//        }
+//
+//        String deviceUUId = "";
+//
+//        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+//            String productCode1 = map.getValue().getProductCode();
+//            String deviceId1 = map.getKey();
+//            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+//                deviceUUId = map.getValue().getDeviceUuid();
+//            }
+//        }
+//
+//        return ApiResult.success(remoteTsdbProxyService.queryHistoryDeviceData(deviceUUId,startTime,endTime));
+//    }
 
     /**
      * 批量设备历史数据查询(对外)
@@ -157,9 +154,9 @@ public class QueryDeviceDataController {
      * @return
      */
     @PostMapping("/externalHistory")
-    public ApiResult<List<HistoryResultVO>> queryHistoryDeviceData(@RequestBody ExternalHistoryRequestVO requestVO){
+    public ApiResult<List<HistorysInnerResultVO>> queryHistoryDeviceData(@RequestBody ExternalHistoryRequestVO requestVO){
         Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
-        HistoryRequestVO reqVO = new HistoryRequestVO();
+        HistorysInnerRequestVO reqVO = new HistorysInnerRequestVO();
 
         String productCode = requestVO.getProductCode();
         List<String> deviceIds = requestVO.getDeviceId();
@@ -179,7 +176,7 @@ public class QueryDeviceDataController {
                 }
             }
         }
-        reqVO.setDeviceUUId(deviceUUIds);
+        reqVO.setDeviceuuid(deviceUUIds);
         reqVO.setStartTime(requestVO.getStartTime());
         reqVO.setEndTime(requestVO.getEndTime());
 

+ 52 - 25
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java

@@ -13,11 +13,11 @@ import java.util.Map;
 @FeignClient(contextId = "remoteTsdbProxyService", value = "data-tsdb-proxy", fallbackFactory = RemoteTsdbProxyFallbackFactory.class)
 public interface RemoteTsdbProxyService {
 
-    /**
-     * 设备实时数据查询
-     */
-    @PostMapping("/last")
-    List<LastInnerResultVO> last(@RequestBody LastInnerQueryVO requestVO);
+//    /**
+//     * 设备实时数据查询
+//     */
+//    @PostMapping("/last")
+//    List<LastInnerResultVO> last(@RequestBody LastInnerQueryVO requestVO);
 
     /**
      * 查询influxdb所有表实时数据
@@ -26,13 +26,13 @@ public interface RemoteTsdbProxyService {
     @PostMapping("getAllDeviceRealTime")
     ApiResult<List<Map<String, Object>>> getAllDeviceRealTime();
 
-    /**
-     * 单个设备实时数据查询
-     * @param deviceUUId
-     * @return
-     */
-    @GetMapping("/last")
-    LastResultVO queryLastDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId);
+//    /**
+//     * 单个设备实时数据查询
+//     * @param deviceUUId
+//     * @return
+//     */
+//    @GetMapping("/last")
+//    LastResultVO queryLastDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId);
 
     /**
      * 批量设备实时数据查询
@@ -40,19 +40,19 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/externalLast")
-    List<LastResultVO> queryLastDeviceData(@RequestBody LastRequestVO requestVO);
+    List<LastInnerResultVO> queryLastDeviceData(@RequestBody LastInnerQueryVO requestVO);
 
-    /**
-     * 单个设备历史数据查询
-     * @param deviceUUId
-     * @param startTime
-     * @param endTime
-     * @return
-     */
-    @GetMapping("/history")
-    HistoryResultVO queryHistoryDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId,
-                                                             @RequestParam(value = "startTime") String startTime,
-                                                             @RequestParam(value = "endTime") String endTime);
+//    /**
+//     * 单个设备历史数据查询
+//     * @param deviceUUId
+//     * @param startTime
+//     * @param endTime
+//     * @return
+//     */
+//    @GetMapping("/history")
+//    HistoryResultVO queryHistoryDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId,
+//                                                             @RequestParam(value = "startTime") String startTime,
+//                                                             @RequestParam(value = "endTime") String endTime);
 
     /**
      * 批量设备历史数据查询
@@ -60,5 +60,32 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/history")
-    List<HistoryResultVO> queryHistoryDeviceData(@RequestBody HistoryRequestVO requestVO);
+    List<HistorysInnerResultVO> queryHistoryDeviceData(@RequestBody HistorysInnerRequestVO requestVO);
+
+
+    /**
+     * 创建超级表和列
+     * @param superTableDTO 超级表信息
+     * @return
+     */
+    @PostMapping("/createSuperTableAndColumn")
+    Void createSuperTableAndColumn(@RequestBody SuperTableDTO superTableDTO);
+
+    /**
+     * 超级表新增字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/addSuperTableColumn")
+    Void addSuperTableColumn(@RequestBody SuperTableDTO superTableDTO) ;
+
+    /**
+     * 超级表删除字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/dropSuperTableColumn")
+    Void dropSuperTableColumn(@RequestBody SuperTableDTO superTableDTO);
 }

+ 34 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/constant/TdsConstants.java

@@ -0,0 +1,34 @@
+package com.usky.demo.constant;
+
+/**
+ * 全局基础常量
+ *
+ * @author mqttsnet
+ * @version v1.0
+ * @date 2022/4/12 11:05 PM
+ * @create [2022/4/12 11:05 PM ] [mqttsnet] [初始创建]
+ */
+public interface TdsConstants {
+
+    String DATA_BASE = "thingdata";
+
+    /**
+     * 时序数据库主键ID
+     */
+    String TS = "ts";
+
+    /**
+     * 时序数据库标签
+     */
+    String TAG = "TAG";
+
+    /**
+     * 事件上报时间
+     */
+    String EVENT_TIME = "event_time";
+
+    /**
+     * TAG ——》设备标签名
+     */
+    String DEVICE_IDENTIFICATION = "device_id";
+}

+ 59 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/DataTypeEnum.java

@@ -0,0 +1,59 @@
+package com.usky.demo.domain;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Getter;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Getter
+@ApiModel(value = "DataTypeEnum", description = "TD数据类型-枚举")
+public enum DataTypeEnum {
+    /**
+     * 时间戳 缺省精度毫秒(格林威治时间开始)
+     */
+    TIMESTAMP("timestamp", false),
+
+    /**
+     * 整形 范围[-2^31+1, 2^31-1] -2^31用作于NULL
+     */
+    INT("int", false),
+
+    /**
+     * 双精度浮点型 有效位数15-16 范围[-1.7E308, 1.7E308]
+     */
+    DOUBLE("double", false),
+
+    /**
+     * 记录包含多字节字符在内的字符串(如中文字符)最大长度4093
+     */
+    VARCHAR("varchar", true),
+
+    /**
+     * 布尔型 {true, false}
+     */
+    BOOL("bool", false);
+
+    private final String dataType;
+    private final boolean quoted;
+
+
+    /**
+     * 判断类型是否一致
+     *
+     * @param otherDataType
+     * @return {@link Boolean} ture|false
+     */
+    public boolean isTypeEqual(String otherDataType) {
+        return Optional.ofNullable(otherDataType)
+                .map(String::trim)
+                .map(this.dataType::equalsIgnoreCase)
+                .orElse(false);
+    }
+
+    DataTypeEnum(String dataType, boolean quoted) {
+        this.dataType = dataType;
+        this.quoted = quoted;
+    }
+
+}

+ 29 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/DeviceRealDataVO.java

@@ -0,0 +1,29 @@
+package com.usky.demo.domain;
+
+import cn.hutool.core.date.DateTime;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+@Data
+public class DeviceRealDataVO implements Serializable {
+    /**
+     * 数据时间
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime ts;
+
+    private String deviceUuid;
+    /**
+     * 插入时间
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime realtime;
+
+
+    private String deviceId;
+
+
+}

+ 30 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/Fields.java

@@ -0,0 +1,30 @@
+package com.usky.demo.domain;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class Fields implements Serializable {
+
+    /**
+     * 字段名称
+     */
+    private String fieldName;
+
+    /**
+     * 字段值
+     */
+    private Object fieldValue;
+
+    /**
+     * 字段数据类型
+     */
+    private DataTypeEnum dataType;
+
+    /**
+     * 字段字节大小
+     */
+    private Integer size;
+
+}

+ 57 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/FieldsVO.java

@@ -0,0 +1,57 @@
+package com.usky.demo.domain;
+
+import cn.hutool.core.bean.BeanUtil;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Data
+public class FieldsVO implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 字段名称
+     */
+    private String fieldName;
+
+    /**
+     * 字段数据类型
+     */
+    private String dataType;
+
+    /**
+     * 字段字节大小
+     */
+    private Integer size;
+
+    public FieldsVO() {
+    }
+
+    public FieldsVO(String fieldName) {
+        this.fieldName = fieldName;
+    }
+
+    public FieldsVO(String fieldName, String dataType) {
+        this.fieldName = fieldName;
+        this.dataType = dataType;
+    }
+
+    public FieldsVO(String fieldName, String dataType, Integer size) {
+        this.fieldName = fieldName;
+        this.dataType = dataType;
+        this.size = size;
+    }
+
+    public static List<Fields> toFieldsList(List<FieldsVO> fieldsVOList) {
+        return fieldsVOList.stream()
+                .map(fieldsVO -> BeanUtil.toBeanIgnoreError(fieldsVO, Fields.class))
+                .collect(Collectors.toList());
+    }
+
+    public static Fields toFields(FieldsVO fieldsVo) {
+        return BeanUtil.toBeanIgnoreError(fieldsVo, Fields.class);
+    }
+}

+ 45 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/SuperTableDTO.java

@@ -0,0 +1,45 @@
+package com.usky.demo.domain;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@Builder
+public class SuperTableDTO implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    /**
+     * 超级表的表结构(业务相关) 第一个字段的数据类型必须为timestamp 字符相关数据类型必须指定大小 字段名称和字段数据类型不能为空
+     */
+    private List<Fields> schemaFields;
+
+    /**
+     * 超级表的标签字段,可以作为子表在超级表里的标识 字符相关数据类型必须指定大小 字段名称和字段数据类型不能为空
+     */
+    private List<Fields> tagsFields;
+
+    /**
+     * 字段信息对象,超级表添加列时使用该属性
+     */
+    private Fields fields;
+
+    /**
+     * 超级表名称
+     */
+    private String superTableName;
+
+    public SuperTableDTO() {
+
+    }
+
+    public SuperTableDTO(List<Fields> schemaFields, List<Fields> tagsFields, Fields fields, String superTableName) {
+        this.schemaFields = schemaFields;
+        this.tagsFields = tagsFields;
+        this.fields = fields;
+        this.superTableName = superTableName;
+    }
+}

+ 52 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/SuperTableDescribeVO.java

@@ -0,0 +1,52 @@
+package com.usky.demo.domain;
+
+import cn.hutool.core.map.MapUtil;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @program: thinglinks
+ * @description: 超级表结构VO
+ * @packagename: com.mqttsnet.thinglinks.tds.vo.result
+ * @author: ShiHuan Sun
+ * @e-mainl: 13733918655@163.com
+ * @date: 2023-09-17 21:12
+ **/
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@ToString(callSuper = true)
+@Accessors(chain = true)
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@ApiModel(value = "SuperTableDescribeVO", description = "超级表结构VO")
+public class SuperTableDescribeVO implements Serializable {
+
+    /**
+     * 标记
+     */
+    @ApiModelProperty(value = "标记")
+    private String note;
+    /**
+     * 字段名
+     */
+    @ApiModelProperty(value = "字段名")
+    private String field;
+    /**
+     * 字段长度
+     */
+    @ApiModelProperty(value = "字段长度")
+    private Integer length;
+    /**
+     * 字段类型
+     */
+    @ApiModelProperty(value = "字段类型")
+    private String type;
+
+
+}

+ 32 - 14
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java

@@ -35,44 +35,62 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
 //            @Override
 //            public ApiResult<Void> sendDeviceData(DeviceDataWriteVO writeVO) {
 //                throw new BusinessException(throwable.getMessage());
+//            }
+
+//            @Override
+//            public List<LastInnerResultVO> last(LastInnerQueryVO requestVO)
+//            {
+//                throw new BusinessException("设备实时数据查询:" + throwable.getMessage());
 //            }
 
             @Override
-            public List<LastInnerResultVO> last(LastInnerQueryVO requestVO)
+            public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime()
             {
-                throw new BusinessException("设备实时数据查询:" + throwable.getMessage());
+                throw new BusinessException("查询influxdb所有表实时数据:" + throwable.getMessage());
             }
 
+//            @Override
+//            public LastResultVO queryLastDeviceData(String deviceUUId)
+//            {
+//                throw new BusinessException("单个设备实时数据查询:" + throwable.getMessage());
+//            }
+
             @Override
-            public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime()
+            public List<LastInnerResultVO> queryLastDeviceData(LastInnerQueryVO requestVO)
             {
-                throw new BusinessException("查询influxdb所有表实时数据:" + throwable.getMessage());
+                throw new BusinessException("批量设备实时数据查询:" + throwable.getMessage());
             }
 
+//            @Override
+//            public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
+//                                                                     String startTime,
+//                                                                     String endTime)
+//            {
+//                throw new BusinessException("单个设备历史数据查询:" + throwable.getMessage());
+//            }
+
             @Override
-            public LastResultVO queryLastDeviceData(String deviceUUId)
+            public List<HistorysInnerResultVO> queryHistoryDeviceData(HistorysInnerRequestVO requestVO)
             {
-                throw new BusinessException("单个设备实时数据查询:" + throwable.getMessage());
+                throw new BusinessException("批量设备历史数据查询:" + throwable.getMessage());
             }
 
             @Override
-            public List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO)
+            public Void createSuperTableAndColumn(SuperTableDTO superTableDTO)
             {
-                throw new BusinessException("批量设备实时数据查询:" + throwable.getMessage());
+                throw new BusinessException("创建超级表和列:" + throwable.getMessage());
             }
 
             @Override
-            public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
-                                                                     String startTime,
-                                                                     String endTime)
+            public Void addSuperTableColumn(SuperTableDTO superTableDTO)
             {
-                throw new BusinessException("单个设备历史数据查询:" + throwable.getMessage());
+                throw new BusinessException("添加超级表列:" + throwable.getMessage());
             }
 
             @Override
-            public List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO)
+            public Void dropSuperTableColumn(SuperTableDTO superTableDTO)
             {
-                throw new BusinessException("批量设备历史数据查询:" + throwable.getMessage());
+                throw new BusinessException("删除超级表列:" + throwable.getMessage());
             }
         };
     }

+ 6 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/pom.xml

@@ -52,6 +52,12 @@
             <artifactId>data-tsdb-proxy-api</artifactId>
             <version>0.0.1</version>
         </dependency>
+        <!-- MyBatis -->
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.3.0</version>
+        </dependency>
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>

+ 93 - 29
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java

@@ -10,10 +10,12 @@ import com.usky.common.core.util.GlobalUtils;
 import com.usky.demo.RemoteTsdbProxyService;
 import com.usky.demo.domain.*;
 import com.usky.demo.service.QueryInfluxdbDataService;
+import com.usky.demo.service.QueryTdengineDataService;
 import com.usky.demo.service.SysUserService;
 import com.usky.system.RemoteUserService;
 import com.usky.system.domain.SysUserVO;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
 
@@ -26,70 +28,132 @@ import java.util.Optional;
 
 @RestController
 public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
+    @Value("${spring.sourcetype}")
+    private String sourcetype;
 
     @Autowired
     private QueryInfluxdbDataService queryInfluxdbDataService;
+    @Autowired
+    private QueryTdengineDataService queryTdengineDataService;
+
+//    /**
+//     * 获取设备实时数据(对内)
+//     * @param requestVO
+//     * @return
+//     */
+//    @Override
+//    public List<LastInnerResultVO> last(LastInnerQueryVO requestVO){
+//        return queryInfluxdbDataService.last(requestVO);
+//    }
 
     /**
-     * 获取设备实时数据(对内)
-     * @param requestVO
+     * 查询influxdb所有表实时数据
      * @return
      */
     @Override
-    public List<LastInnerResultVO> last(LastInnerQueryVO requestVO){
-        return queryInfluxdbDataService.last(requestVO);
+    public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime(){
+        if(sourcetype.equals("taos")){
+            return ApiResult.success(queryTdengineDataService.getAllDeviceRealTime());
+        }else{
+            return ApiResult.success(queryInfluxdbDataService.getAllDeviceRealTime());
+        }
     }
 
+//    /**
+//     * 单个设备实时数据查询
+//     * @param deviceUUId
+//     * @return
+//     */
+//    @Override
+//    public LastResultVO queryLastDeviceData(String deviceUUId){
+//        return queryInfluxdbDataService.queryLastDeviceData(deviceUUId);
+//    }
+
     /**
-     * 查询influxdb所有表实时数据
+     * 批量设备实时数据查询
+     * @param requestVO
      * @return
      */
     @Override
-    public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime(){
-        return ApiResult.success(queryInfluxdbDataService.getAllDeviceRealTime());
+    public List<LastInnerResultVO> queryLastDeviceData(LastInnerQueryVO requestVO){
+        if(sourcetype.equals("taos")){
+            return queryTdengineDataService.tdengineLast(requestVO);
+        }else{
+            return queryInfluxdbDataService.last(requestVO);
+        }
     }
 
+//    /**
+//     * 单个设备历史数据查询
+//     * @param deviceUUId
+//     * @param startTime
+//     * @param endTime
+//     * @return
+//     */
+//    @Override
+//    public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
+//                                                             String startTime,
+//                                                             String endTime){
+//        return queryInfluxdbDataService.queryHistoryDeviceData(deviceUUId,startTime,endTime);
+//    }
+
     /**
-     * 单个设备实时数据查询
-     * @param deviceUUId
+     * 批量设备历史数据查询
+     * @param requestVO
      * @return
      */
     @Override
-    public LastResultVO queryLastDeviceData(String deviceUUId){
-        return queryInfluxdbDataService.queryLastDeviceData(deviceUUId);
+    public List<HistorysInnerResultVO> queryHistoryDeviceData(HistorysInnerRequestVO requestVO){
+        if(sourcetype.equals("taos")) {
+            return queryTdengineDataService.tdengineHistory(requestVO);
+        }else {
+            return queryInfluxdbDataService.history(requestVO);
+        }
     }
 
+
     /**
-     * 批量设备实时数据查询
-     * @param requestVO
+     * 创建超级表和列
+     * @param superTableDTO 超级表信息
      * @return
      */
     @Override
-    public List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO){
-        return queryInfluxdbDataService.queryLastDeviceData(requestVO);
+    public Void createSuperTableAndColumn(@RequestBody SuperTableDTO superTableDTO){
+        if(sourcetype.equals("taos")){
+            queryTdengineDataService.createSuperTableAndColumn(superTableDTO);
+        }
+
+        return null;
     }
 
     /**
-     * 单个设备历史数据查询
-     * @param deviceUUId
-     * @param startTime
-     * @param endTime
-     * @return
+     * 超级表新增字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
      */
     @Override
-    public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
-                                                             String startTime,
-                                                             String endTime){
-        return queryInfluxdbDataService.queryHistoryDeviceData(deviceUUId,startTime,endTime);
+    public Void addSuperTableColumn(@RequestBody SuperTableDTO superTableDTO) {
+        if(sourcetype.equals("taos")){
+            queryTdengineDataService.addSuperTableColumn(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        }
+
+        return null;
     }
 
     /**
-     * 批量设备历史数据查询
-     * @param requestVO
-     * @return
+     * 超级表删除字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
      */
     @Override
-    public List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO){
-        return queryInfluxdbDataService.queryHistoryDeviceData(requestVO);
+    public Void dropSuperTableColumn(@RequestBody SuperTableDTO superTableDTO) {
+        if(sourcetype.equals("taos")){
+            queryTdengineDataService.dropSuperTableColumn(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        }
+
+        return null;
     }
+
 }

+ 81 - 1
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java

@@ -1,11 +1,16 @@
 package com.usky.demo.controller.web;
 
 
+import cn.hutool.core.util.StrUtil;
 import com.usky.common.core.bean.ApiResult;
+import com.usky.common.core.domain.R;
+import com.usky.demo.constant.TdsConstants;
 import com.usky.demo.domain.*;
 import com.usky.demo.service.QueryInfluxdbDataService;
 import com.usky.demo.service.QueryTdengineDataService;
+import com.usky.demo.service.vo.SuperTableVO;
 import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.*;
@@ -67,7 +72,82 @@ public class QueryInfluxdbDataController {
      */
     @PostMapping("getAllDeviceRealTime")
     public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime(){
-        return ApiResult.success(queryInfluxdbDataService.getAllDeviceRealTime());
+        if(sourcetype.equals("taos")){
+            return ApiResult.success(queryTdengineDataService.getAllDeviceRealTime());
+        }else{
+            return ApiResult.success(queryInfluxdbDataService.getAllDeviceRealTime());
+        }
+    }
+
+    /**
+     * 创建超级表和列
+     * @param superTableDTO 超级表信息
+     * @return
+     */
+    @PostMapping("/createSuperTableAndColumn")
+    public ApiResult<Void> createSuperTableAndColumn(@RequestBody SuperTableDTO superTableDTO){
+        queryTdengineDataService.createSuperTableAndColumn(superTableDTO);
+        return ApiResult.success();
+    }
+
+    /**
+     * 超级表新增字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/addSuperTableColumn")
+    public ApiResult<Void> addSuperTableColumn(@RequestBody SuperTableDTO superTableDTO) {
+        queryTdengineDataService.addSuperTableColumn(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        return ApiResult.success();
+    }
+
+    /**
+     * 超级表删除字段
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/dropSuperTableColumn")
+    public ApiResult<Void> dropSuperTableColumn(@RequestBody SuperTableDTO superTableDTO) {
+        queryTdengineDataService.dropSuperTableColumn(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        return ApiResult.success();
+    }
+
+    /**
+     * 超级表新增标签
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/addSuperTableTag")
+    public ApiResult<Void> addSuperTableTag(@RequestBody SuperTableDTO superTableDTO) {
+        queryTdengineDataService.addSuperTableTag(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        return ApiResult.success();
+    }
+
+    /**
+     * 超级表删除标签
+     *
+     * @param superTableDTO 数据信息
+     * @return 执行结果
+     */
+    @PostMapping("/dropSuperTableTag")
+    public ApiResult<Void> dropSuperTableTag(@RequestBody SuperTableDTO superTableDTO) {
+        queryTdengineDataService.dropSuperTableTag(superTableDTO.getSuperTableName(), superTableDTO.getFields());
+        return ApiResult.success();
+    }
+
+    /**
+     * 查询超级表、子表结构
+     *
+     * @param tableName
+     * @return
+     */
+    @GetMapping("/describeSuperOrSubTable")
+    public ApiResult<List<SuperTableDescribeVO>> describeSuperOrSubTable(@RequestParam(value = "tableName") String tableName) {
+        List<SuperTableDescribeVO> superTableDescribeVOS = queryTdengineDataService.describeSuperOrSubTable( tableName);
+        return ApiResult.success(superTableDescribeVOS);
     }
 
 }

+ 60 - 1
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/mapper/QueryTdengineDataMapper.java

@@ -1,7 +1,14 @@
 package com.usky.demo.mapper;
 
 import com.usky.common.mybatis.core.CrudMapper;
-import com.usky.demo.domain.QueryTdengineData;
+import com.usky.demo.domain.*;
+import com.usky.demo.service.vo.SuperTableVO;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+import java.util.Map;
 
 /**
  * <p>
@@ -11,6 +18,58 @@ import com.usky.demo.domain.QueryTdengineData;
  * @author ya
  * @since 2024-07-29
  */
+@Repository
 public interface QueryTdengineDataMapper extends CrudMapper<QueryTdengineData> {
 
+    /**
+     * 查询所有设备实时数据
+     *
+     * @return
+     * */
+    List<DeviceRealDataVO> getAllDeviceRealTime();
+
+    /**
+     * 创建超级表及字段
+     *
+     * @param superTableDTO
+     */
+    void createSuperTableAndColumn(SuperTableDTO superTableDTO);
+    /**
+     * 新增字段
+     *
+     * @param superTableName
+     * @param fields
+     */
+    void addSuperTableColumn(@Param("superTableName") String superTableName, @Param("fields") Fields fields);
+
+    /**
+     * 删除字段
+     *
+     * @param superTableName
+     * @param fields
+     */
+    void dropSuperTableColumn(@Param("superTableName") String superTableName, @Param("fields") Fields fields);
+
+    /**
+     * 查询表结构
+     *
+     * @param tableName
+     */
+    List<SuperTableDescribeVO> describeSuperOrSubTable(@Param("tableName") String tableName);
+
+    /**
+     * 添加标签
+     *
+     * @param superTableName
+     * @param fields
+     */
+    void addSuperTableTag(@Param("superTableName") String superTableName, @Param("fields") Fields fields);
+
+    /**
+     * 删除标签
+     *
+     * @param superTableName
+     * @param fields
+     */
+    void dropSuperTableTag(@Param("superTableName") String superTableName, @Param("fields") Fields fields);
 }

+ 17 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryTdengineDataService.java

@@ -2,6 +2,10 @@ package com.usky.demo.service;
 
 import com.usky.common.mybatis.core.CrudService;
 import com.usky.demo.domain.*;
+import com.usky.demo.service.vo.SuperTableVO;
+import org.apache.ibatis.annotations.Param;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.List;
 import java.util.Map;
@@ -21,4 +25,17 @@ public interface QueryTdengineDataService extends CrudService<QueryTdengineData>
     List<LastInnerResultVO> tdengineLast(LastInnerQueryVO requestVO);
     List<HistorysInnerResultVO> tdengineHistory(HistorysInnerRequestVO requestVO);
     // 对内接口  end
+    List<Map<String, Object>> getAllDeviceRealTime();
+
+    void createSuperTableAndColumn(SuperTableDTO superTableDTO);
+
+    void addSuperTableColumn(String superTableName,Fields fields);
+
+    void dropSuperTableColumn(String superTableName, Fields fields);
+
+    void addSuperTableTag(String superTableName, Fields fields);
+
+    void dropSuperTableTag(String superTableName, Fields fields);
+
+    List<SuperTableDescribeVO> describeSuperOrSubTable(String tableName);
 }

+ 318 - 131
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryTdengineDataServiceImpl.java

@@ -44,6 +44,8 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
     private TsdbUtils tsdbUtils;
     @Autowired
     private DataSource dataSource;
+    @Autowired
+    private QueryTdengineDataMapper queryTdengineDataMapper;
 
     @Override
     public List<LastInnerResultVO> tdengineLast(LastInnerQueryVO requestVO){
@@ -55,54 +57,118 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
             Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
 
-            // 将查询数据整理到各个超级表的数据集合中
-            List<SuperTableVO> supertablelist = new ArrayList<>();
-
+//            // 将查询数据整理到各个超级表的数据集合中
+//            List<SuperTableVO> supertablelist = new ArrayList<>();
+//
+//            for (int i = 0; i < deviceUUIds.size(); i++) {
+//                String superTablename = deviceUUIds.get(i);
+//                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
+//                    SuperTableVO superTableVO = new SuperTableVO();
+//                    superTableVO.setSuperTableName(superTablename);
+//                    List<String> tableNames = new ArrayList<>();
+//                    tableNames.add(deviceUUIds.get(i));
+//                    superTableVO.setTableNames(tableNames);
+//                    supertablelist.add(superTableVO);
+//                }else{
+//                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
+//                    List<String> tableNames = superTableVO.getTableNames();
+//                    tableNames.add(deviceUUIds.get(i));
+//                    superTableVO.setTableNames(tableNames);
+//                }
+//            }
+//
+//            // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
+//            for (int i = 0; i < supertablelist.size(); i++) {
+//                String superTablename = supertablelist.get(i).getSuperTableName();
+//
+//                String sql = "describe " + superTablename;
+//                ResultSet resultSet = statement.executeQuery(sql);
+//                List<Map<String, Object>> descList = new ArrayList<>();
+//                while (resultSet.next()) {
+//                    Map<String, Object> map = new HashMap<>();
+//                    map.put("field", resultSet.getString("field"));
+//                    map.put("note", resultSet.getString("note"));
+//                    descList.add(map);
+//                }
+//                resultSet.close();
+//
+//                String tagStr = "";
+//                List<String> taglist = new ArrayList<>();
+//                List<String> fieldKeys = new ArrayList<>();
+//                for(Map<String, Object> map: descList){
+//                    String note = map.get("note").toString();
+//                    if("TAG".equals(note)){
+//                        tagStr = tagStr.concat(map.get("field").toString()+",");
+//                        taglist.add(map.get("field").toString());
+//                    }else{
+//                        fieldKeys.add(map.get("field").toString());
+//                    }
+//
+//                }
+//
+//                // 过滤有效字段
+//                List<String> filterValidFields = new ArrayList<>();
+//                if(CollectionUtils.isNotEmpty(metrics)){
+//                    filterValidFields = metrics.stream().filter(item -> fieldKeys.contains(item)).collect(Collectors.toList());
+//                }else{
+//                    filterValidFields = fieldKeys;
+//                }
+//                String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
+//                if(CollectionUtils.isEmpty(filterValidFields)){
+//                    throw new BusinessException(superTablename+"没有可用属性,请添加");
+//                }
+//
+//                String tableNames = "";
+//                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
+//                    if(j == 0){
+//                        tableNames = "'"+supertablelist.get(i).getTableNames().get(j)+"'";
+//                    }else{
+//                        tableNames = tableNames.concat(",'"+supertablelist.get(i).getTableNames().get(j)+"'");
+//                    }
+//                }
+//
+//                String query = "SELECT tbname,ts,"+tagStr+filterValidFieldStr+" FROM "+superTablename+" where tbname in ("+tableNames+") group by tbname having ts = last(ts)";
+//                ResultSet metricList = statement.executeQuery(query);
+//                while (metricList.next()) {
+//                    LastInnerResultVO resultVO = new LastInnerResultVO();
+//                    resultVO.setDeviceuuid(metricList.getString("tbname"));
+//                    Map<String, String> tag = new HashMap<String,String>();
+//                    for(int k=0;k<taglist.size();k++){
+//                        tag.put(taglist.get(k),metricList.getString(taglist.get(k)));
+//                    }
+//                    resultVO.setTags(tag);
+//
+//                    Map<String, Object> field = new HashMap<String,Object>();
+//                    field.put("time",metricList.getString("ts"));
+//                    for(int k =0;k<filterValidFields.size();k++){
+//                        field.put(filterValidFields.get(k),metricList.getString(filterValidFields.get(k)));
+//                    }
+//                    resultVO.setMetrics(field);
+//                    list.add(resultVO);
+//                }
+//                metricList.close();
+//            }
+
+
+            // 遍历设备表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
             for (int i = 0; i < deviceUUIds.size(); i++) {
-                String superTablename = deviceUUIds.get(i).substring(0,deviceUUIds.get(i).lastIndexOf("_"));
-                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
-                    SuperTableVO superTableVO = new SuperTableVO();
-                    superTableVO.setSuperTableName(superTablename);
-                    List<String> tableNames = new ArrayList<>();
-                    tableNames.add(deviceUUIds.get(i));
-                    superTableVO.setTableNames(tableNames);
-                    supertablelist.add(superTableVO);
-                }else{
-                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
-                    List<String> tableNames = superTableVO.getTableNames();
-                    tableNames.add(deviceUUIds.get(i));
-                    superTableVO.setTableNames(tableNames);
-                }
-            }
+                String tablename = "_"+deviceUUIds.get(i);
 
-            // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
-            for (int i = 0; i < supertablelist.size(); i++) {
-                String superTablename = supertablelist.get(i).getSuperTableName();
-
-                String sql = "describe " + superTablename;
+                String sql = "describe " + tablename;
                 ResultSet resultSet = statement.executeQuery(sql);
-                List<Map<String, Object>> descList = new ArrayList<>();
-                while (resultSet.next()) {
-                    Map<String, Object> map = new HashMap<>();
-                    map.put("field", resultSet.getString("field"));
-                    map.put("note", resultSet.getString("note"));
-                    descList.add(map);
-                }
-                resultSet.close();
-
                 String tagStr = "";
                 List<String> taglist = new ArrayList<>();
                 List<String> fieldKeys = new ArrayList<>();
-                for(Map<String, Object> map: descList){
-                    String note = map.get("note").toString();
+                while (resultSet.next()) {
+                    String note = resultSet.getString("note");
                     if("TAG".equals(note)){
-                        tagStr = tagStr.concat(map.get("field").toString()+",");
-                        taglist.add(map.get("field").toString());
+                        tagStr = tagStr.concat(resultSet.getString("field")+",");
+                        taglist.add(resultSet.getString("field"));
                     }else{
-                        fieldKeys.add(map.get("field").toString());
+                        fieldKeys.add(resultSet.getString("field"));
                     }
-
                 }
+                resultSet.close();
 
                 // 过滤有效字段
                 List<String> filterValidFields = new ArrayList<>();
@@ -113,23 +179,14 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
                 }
                 String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
                 if(CollectionUtils.isEmpty(filterValidFields)){
-                    throw new BusinessException(superTablename+"没有可用属性,请添加");
-                }
-
-                String tableNames = "";
-                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
-                    if(j == 0){
-                        tableNames = "'"+supertablelist.get(i).getTableNames().get(j)+"'";
-                    }else{
-                        tableNames = tableNames.concat(",'"+supertablelist.get(i).getTableNames().get(j)+"'");
-                    }
+                    throw new BusinessException(tablename+"没有可用属性,请添加");
                 }
 
-                String query = "SELECT tbname,ts,"+tagStr+filterValidFieldStr+" FROM "+superTablename+" where tbname in ("+tableNames+") group by tbname having ts = last(ts)";
+                String query = "SELECT ts,"+tagStr+filterValidFieldStr+" FROM "+tablename+" order by ts desc limit 1";
                 ResultSet metricList = statement.executeQuery(query);
                 while (metricList.next()) {
                     LastInnerResultVO resultVO = new LastInnerResultVO();
-                    resultVO.setDeviceuuid(metricList.getString("tbname"));
+                    resultVO.setDeviceuuid(deviceUUIds.get(i));
                     Map<String, String> tag = new HashMap<String,String>();
                     for(int k=0;k<taglist.size();k++){
                         tag.put(taglist.get(k),metricList.getString(taglist.get(k)));
@@ -146,6 +203,7 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
                 }
                 metricList.close();
             }
+
             statement.close();
             connection.close();
 
@@ -170,54 +228,140 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
             Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
 
-            // 将查询数据整理到各个超级表的数据集合中
-            List<SuperTableVO> supertablelist = new ArrayList<>();
+//            // 将查询数据整理到各个超级表的数据集合中
+//            List<SuperTableVO> supertablelist = new ArrayList<>();
+//
+//            for (int i = 0; i < deviceUUIds.size(); i++) {
+//                String superTablename = deviceUUIds.get(i).substring(0,deviceUUIds.get(i).lastIndexOf("_"));
+//                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
+//                    SuperTableVO superTableVO = new SuperTableVO();
+//                    superTableVO.setSuperTableName(superTablename);
+//                    List<String> tableNames = new ArrayList<>();
+//                    tableNames.add(deviceUUIds.get(i));
+//                    superTableVO.setTableNames(tableNames);
+//                    supertablelist.add(superTableVO);
+//                }else{
+//                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
+//                    List<String> tableNames = superTableVO.getTableNames();
+//                    tableNames.add(deviceUUIds.get(i));
+//                    superTableVO.setTableNames(tableNames);
+//                }
+//            }
+//
+//            // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
+//            for (int i = 0; i < supertablelist.size(); i++) {
+//                String superTablename = supertablelist.get(i).getSuperTableName();
+//
+//                String sql = "describe " + superTablename;
+//                ResultSet resultSet = statement.executeQuery(sql);
+//                List<Map<String, Object>> descList = new ArrayList<>();
+//                while (resultSet.next()) {
+//                    Map<String, Object> map = new HashMap<>();
+//                    map.put("field", resultSet.getString("field"));
+//                    map.put("note", resultSet.getString("note"));
+//                    descList.add(map);
+//                }
+//                resultSet.close();
+//
+//                String tagStr = "";
+//                List<String> taglist = new ArrayList<>();
+//                List<String> fieldKeys = new ArrayList<>();
+//                for(Map<String, Object> map: descList){
+//                    String note = map.get("note").toString();
+//                    if("TAG".equals(note)){
+//                        tagStr = tagStr.concat(map.get("field").toString()+",");
+//                        taglist.add(map.get("field").toString());
+//                    }else{
+//                        fieldKeys.add(map.get("field").toString());
+//                    }
+//
+//                }
+//
+//                // 过滤有效字段
+//                List<String> filterValidFields = new ArrayList<>();
+//                if(CollectionUtils.isNotEmpty(metrics)){
+//                    filterValidFields = metrics.stream().filter(item -> fieldKeys.contains(item)).collect(Collectors.toList());
+//                }else{
+//                    filterValidFields = fieldKeys;
+//                }
+//                String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
+//                if(CollectionUtils.isEmpty(filterValidFields)){
+//                    throw new BusinessException(superTablename+"没有可用属性,请添加");
+//                }
+//
+//                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
+//                    String tableName = supertablelist.get(i).getTableNames().get(j);
+//                    HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
+//                    resultVO.setDeviceuuid(tableName);
+//
+//                    String query = "SELECT ts,"+tagStr+filterValidFieldStr+" FROM "+tableName+" where ts >= '"+startTime+"' and ts <= '"+endTime+"'";
+//                    ResultSet resultSet1 = statement.executeQuery(query);
+//                    int count = resultSet1.getMetaData().getColumnCount();
+//                    List<Map<String, Object>> resultList = new ArrayList<>();
+//                    while (resultSet1.next()) {
+//                        Map<String, Object> map = new HashMap<>();
+//                        for (int k = 1; k <= count; k++) {
+//                            String key = resultSet1.getMetaData().getColumnName(k);
+//                            Object value = resultSet1.getString(k);
+//                            map.put(key, value);
+//                        }
+//                        resultList.add(map);
+//                    }
+//                    resultSet1.close();
+//
+//                    if(CollectionUtils.isNotEmpty(resultList)){
+//                        Map<String, String> tag = new HashMap<>();
+//                        for (int k = 0; k < taglist.size(); k++) {
+//                            tag.put(taglist.get(k), resultList.get(0).get(taglist.get(k)).toString());
+//                        }
+//                        resultVO.setTags(tag);
+//
+//                        List<MetricVO> metricList = new ArrayList<>();
+//                        for (int h = 0; h < filterValidFields.size(); h++) {
+//                            if(!"ts".equals(filterValidFields.get(h))){
+//                                MetricVO metricVO = new MetricVO();
+//                                metricVO.setMetric(filterValidFields.get(h));
+//                                List<Map<String,Object>> metircItems = new ArrayList<>();
+//                                for (int k = 0; k < resultList.size(); k++) {
+//                                    String field = filterValidFields.get(h);
+//                                    Map<String,Object> map = new HashMap<>();
+//                                    map.put("timestamp",resultList.get(k).get("ts"));
+//                                    map.put("value",resultList.get(k).get(field));
+//                                    metircItems.add(map);
+//                                }
+//                                metricVO.setMetricItems(metircItems);
+//                                metricList.add(metricVO);
+//                            }
+//
+//                        }
+//                        resultVO.setMetrics(metricList);
+//                        list.add(resultVO);
+//                    }
+//
+//                }
+//
+//            }
 
-            for (int i = 0; i < deviceUUIds.size(); i++) {
-                String superTablename = deviceUUIds.get(i).substring(0,deviceUUIds.get(i).lastIndexOf("_"));
-                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
-                    SuperTableVO superTableVO = new SuperTableVO();
-                    superTableVO.setSuperTableName(superTablename);
-                    List<String> tableNames = new ArrayList<>();
-                    tableNames.add(deviceUUIds.get(i));
-                    superTableVO.setTableNames(tableNames);
-                    supertablelist.add(superTableVO);
-                }else{
-                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
-                    List<String> tableNames = superTableVO.getTableNames();
-                    tableNames.add(deviceUUIds.get(i));
-                    superTableVO.setTableNames(tableNames);
-                }
-            }
 
             // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
-            for (int i = 0; i < supertablelist.size(); i++) {
-                String superTablename = supertablelist.get(i).getSuperTableName();
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String tablename = "_"+deviceUUIds.get(i);
 
-                String sql = "describe " + superTablename;
+                String sql = "describe " + tablename;
                 ResultSet resultSet = statement.executeQuery(sql);
-                List<Map<String, Object>> descList = new ArrayList<>();
-                while (resultSet.next()) {
-                    Map<String, Object> map = new HashMap<>();
-                    map.put("field", resultSet.getString("field"));
-                    map.put("note", resultSet.getString("note"));
-                    descList.add(map);
-                }
-                resultSet.close();
-
                 String tagStr = "";
                 List<String> taglist = new ArrayList<>();
                 List<String> fieldKeys = new ArrayList<>();
-                for(Map<String, Object> map: descList){
-                    String note = map.get("note").toString();
+                while (resultSet.next()) {
+                    String note = resultSet.getString("note");
                     if("TAG".equals(note)){
-                        tagStr = tagStr.concat(map.get("field").toString()+",");
-                        taglist.add(map.get("field").toString());
+                        tagStr = tagStr.concat(resultSet.getString("field")+",");
+                        taglist.add(resultSet.getString("field"));
                     }else{
-                        fieldKeys.add(map.get("field").toString());
+                        fieldKeys.add(resultSet.getString("field"));
                     }
-
                 }
+                resultSet.close();
 
                 // 过滤有效字段
                 List<String> filterValidFields = new ArrayList<>();
@@ -228,61 +372,58 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
                 }
                 String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
                 if(CollectionUtils.isEmpty(filterValidFields)){
-                    throw new BusinessException(superTablename+"没有可用属性,请添加");
+                    throw new BusinessException(tablename+"没有可用属性,请添加");
                 }
 
-                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
-                    String tableName = supertablelist.get(i).getTableNames().get(j);
-                    HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
-                    resultVO.setDeviceuuid(tableName);
-
-                    String query = "SELECT ts,"+tagStr+filterValidFieldStr+" FROM "+tableName+" where ts >= '"+startTime+"' and ts <= '"+endTime+"'";
-                    ResultSet resultSet1 = statement.executeQuery(query);
-                    int count = resultSet1.getMetaData().getColumnCount();
-                    List<Map<String, Object>> resultList = new ArrayList<>();
-                    while (resultSet1.next()) {
-                        Map<String, Object> map = new HashMap<>();
-                        for (int k = 1; k <= count; k++) {
-                            String key = resultSet1.getMetaData().getColumnName(k);
-                            Object value = resultSet1.getString(k);
-                            map.put(key, value);
-                        }
-                        resultList.add(map);
+                HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
+                resultVO.setDeviceuuid(deviceUUIds.get(i));
+
+                String query = "SELECT ts,"+tagStr+filterValidFieldStr+" FROM "+tablename+" where ts >= '"+startTime+"' and ts <= '"+endTime+"'";
+                ResultSet resultSet1 = statement.executeQuery(query);
+                int count = resultSet1.getMetaData().getColumnCount();
+                List<Map<String, Object>> resultList = new ArrayList<>();
+                while (resultSet1.next()) {
+                    Map<String, Object> map = new HashMap<>();
+                    for (int k = 1; k <= count; k++) {
+                        String key = resultSet1.getMetaData().getColumnName(k);
+                        Object value = resultSet1.getString(k);
+                        map.put(key, value);
                     }
-                    resultSet1.close();
+                    resultList.add(map);
+                }
+                resultSet1.close();
 
-                    if(CollectionUtils.isNotEmpty(resultList)){
-                        Map<String, String> tag = new HashMap<>();
-                        for (int k = 0; k < taglist.size(); k++) {
-                            tag.put(taglist.get(k), resultList.get(0).get(taglist.get(k)).toString());
-                        }
-                        resultVO.setTags(tag);
-
-                        List<MetricVO> metricList = new ArrayList<>();
-                        for (int h = 0; h < filterValidFields.size(); h++) {
-                            if(!"ts".equals(filterValidFields.get(h))){
-                                MetricVO metricVO = new MetricVO();
-                                metricVO.setMetric(filterValidFields.get(h));
-                                List<Map<String,Object>> metircItems = new ArrayList<>();
-                                for (int k = 0; k < resultList.size(); k++) {
-                                    String field = filterValidFields.get(h);
-                                    Map<String,Object> map = new HashMap<>();
-                                    map.put("timestamp",resultList.get(k).get("ts"));
-                                    map.put("value",resultList.get(k).get(field));
-                                    metircItems.add(map);
-                                }
-                                metricVO.setMetricItems(metircItems);
-                                metricList.add(metricVO);
-                            }
+                if(CollectionUtils.isNotEmpty(resultList)){
+                    Map<String, String> tag = new HashMap<>();
+                    for (int k = 0; k < taglist.size(); k++) {
+                        tag.put(taglist.get(k), resultList.get(0).get(taglist.get(k)).toString());
+                    }
+                    resultVO.setTags(tag);
 
+                    List<MetricVO> metricList = new ArrayList<>();
+                    for (int h = 0; h < filterValidFields.size(); h++) {
+                        if(!"ts".equals(filterValidFields.get(h))){
+                            MetricVO metricVO = new MetricVO();
+                            metricVO.setMetric(filterValidFields.get(h));
+                            List<Map<String,Object>> metircItems = new ArrayList<>();
+                            for (int k = 0; k < resultList.size(); k++) {
+                                String field = filterValidFields.get(h);
+                                Map<String,Object> map = new HashMap<>();
+                                map.put("timestamp",resultList.get(k).get("ts"));
+                                map.put("value",resultList.get(k).get(field));
+                                metircItems.add(map);
+                            }
+                            metricVO.setMetricItems(metircItems);
+                            metricList.add(metricVO);
                         }
-                        resultVO.setMetrics(metricList);
-                        list.add(resultVO);
-                    }
 
+                    }
+                    resultVO.setMetrics(metricList);
+                    list.add(resultVO);
                 }
 
             }
+
             statement.close();
             connection.close();
 
@@ -295,5 +436,51 @@ public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdeng
 
     }
 
+    @Override
+    public List<Map<String, Object>> getAllDeviceRealTime(){
+        List<DeviceRealDataVO> realData =  queryTdengineDataMapper.getAllDeviceRealTime();
+        return realData.stream().map(item -> {
+            Map<String, Object> map = new HashMap<>();
+            map.put("device_uuid", item.getDeviceUuid());
+            map.put("device_id", item.getDeviceId());
+            map.put("ts", item.getTs());
+            map.put("realtime", item.getRealtime());
+            return map;
+        }).collect(Collectors.toList());
+    }
+
+    @Override
+    public void createSuperTableAndColumn(SuperTableDTO superTableDTO){
+        queryTdengineDataMapper.createSuperTableAndColumn(superTableDTO);
+    }
+
+    @Override
+    public void addSuperTableColumn(String superTableName, Fields fields) {
+        queryTdengineDataMapper.addSuperTableColumn(superTableName, fields);
+    }
 
+    @Override
+    public void dropSuperTableColumn(String superTableName, Fields fields) {
+        queryTdengineDataMapper.dropSuperTableColumn(superTableName, fields);
+    }
+
+    @Override
+    public List<SuperTableDescribeVO> describeSuperOrSubTable(String tableName) {
+        try {
+            return queryTdengineDataMapper.describeSuperOrSubTable(tableName);
+        } catch (Exception e) {
+            log.warn("Error describing super or sub table. Database: {}, Table: {}, Error: {}"+tableName+", "+e.getMessage());
+            return Collections.emptyList();
+        }
+    }
+
+    @Override
+    public void addSuperTableTag(String superTableName, Fields fields) {
+        queryTdengineDataMapper.addSuperTableTag(superTableName, fields);
+    }
+
+    @Override
+    public void dropSuperTableTag(String superTableName, Fields fields) {
+        queryTdengineDataMapper.dropSuperTableTag(superTableName, fields);
+    }
 }

+ 20 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/MqttStrategy.java

@@ -0,0 +1,20 @@
+package com.usky.demo.service.rocketmq;
+
+
+import com.usky.demo.service.vo.MQDataVO;
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqBaseVO
+     * @return
+     */
+    String disposeMessage(MQDataVO mqBaseVO);
+}

+ 16 - 1
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/MyConsumer.java

@@ -3,12 +3,15 @@ package com.usky.demo.service.rocketmq;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
 import com.usky.demo.service.rocketmq.SimpleContext;
+import com.usky.demo.service.vo.MQDataVO;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.util.Map;
 
 @Slf4j
@@ -18,9 +21,21 @@ public class MyConsumer implements RocketMQListener<String> {
     @Autowired
     private SimpleContext simpleContext;
 
+    @Value("${spring.sourcetype}")
+    private String sourcetype;
+
     @Override
     public void onMessage(String message){
         System.out.println("DirectReceiver消费者收到消息: " + message);
-        simpleContext.disposeMessageToDB(message);
+
+        MQDataVO mqDataVO = new MQDataVO();
+        mqDataVO.setData(message);
+        if(sourcetype.equals("taos")){
+            mqDataVO.setDescribe("tdengine");
+        }else{
+            mqDataVO.setDescribe("influx");
+        }
+
+        simpleContext.getResource(mqDataVO);
     }
 }

+ 10 - 45
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/SimpleContext.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.usky.common.core.util.JsonUtils;
 import com.usky.demo.service.utils.TsdbUtils;
+import com.usky.demo.service.vo.MQDataVO;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 import org.springframework.stereotype.Service;
@@ -11,58 +12,22 @@ import org.springframework.stereotype.Service;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 中间处理消息转发
  */
 @Service
-@Repository
 public class SimpleContext {
     @Autowired
-    private TsdbUtils influxDBUtils;
-
-    /**
-     * 设备数据存influxdb时序数据库
-     * @param
-     */
-    public void disposeMessageToDB(String message){
-        Map<String, String> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map<String, String> realtags = new HashMap<>();
-        Map<String, Object> realfields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(message,Map.class);
-        System.out.println("test11111");
-        String deviceUUId = map_data.get("deviceUUId").toString();
-        String productCode = map_data.get("productCode").toString().toLowerCase();
-        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
-        String tableName = deviceUUId;
-        System.out.println("test22222");
-
-        Object tg = JSONObject.toJSONString(map_data.get("tags"));
-        JSONObject tag = JSON.parseObject(tg.toString());
-        for (String entry : tag.keySet()){
-            tags.put(entry.toLowerCase(),tag.get(entry).toString());
-        }
-        System.out.println("test33333"+tg.toString());
-
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-            realfields.put(entry.toLowerCase(),Float.valueOf(metrics.get(entry).toString()));
-        }
-        System.out.println("test44444"+met.toString());
-
-        influxDBUtils.insertOne1(tableName,tags,fields,timestamp);
-
-        //创建设备实时数据表,用于保存每个设备最新一条数据记录
-        String realtimeTableName = "datarealtime";
-        long realtimestamp = 1744164000001L;
-        realtags.put("device_uuid",deviceUUId);
-        realfields.put("realtime", (map_data.get("timestamp").toString()));
-        realfields.put("device_uuid",deviceUUId);
-        realfields.put("device_id",tags.get("device_id"));
-        influxDBUtils.insertOne1(realtimeTableName,realtags,realfields,realtimestamp);
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MQDataVO mqDataVO) {
+        return strategyMap.get(mqDataVO.getDescribe()).disposeMessage(mqDataVO);
     }
 }
 

+ 67 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/influxdb/influxdb.java

@@ -0,0 +1,67 @@
+package com.usky.demo.service.rocketmq.influxdb;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.demo.service.rocketmq.MqttStrategy;
+import com.usky.demo.service.utils.TsdbUtils;
+import com.usky.demo.service.vo.MQDataVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+@Service("influx")
+public class influxdb implements MqttStrategy {
+    @Autowired
+    private TsdbUtils influxDBUtils;
+
+    /**
+     * 设备数据存influxdb时序数据库
+     * @param
+     */
+    @Override
+    public String disposeMessage(MQDataVO mqBaseVO) {
+        Map<String, String> tags = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
+        Map<String, String> realtags = new HashMap<>();
+        Map<String, Object> realfields = new HashMap<>();
+        Map map_data = JsonUtils.fromJson(mqBaseVO.getData().toString(),Map.class);
+        System.out.println("test11111");
+        String deviceUUId = map_data.get("deviceUUId").toString();
+        String productCode = map_data.get("productCode").toString().toLowerCase();
+        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+        String tableName = deviceUUId;
+        System.out.println("test22222");
+
+        Object tg = JSONObject.toJSONString(map_data.get("tags"));
+        JSONObject tag = JSON.parseObject(tg.toString());
+        for (String entry : tag.keySet()){
+            tags.put(entry.toLowerCase(),tag.get(entry).toString());
+        }
+        System.out.println("test33333"+tg.toString());
+
+        Object met = JSONObject.toJSONString(map_data.get("metrics"));
+        JSONObject metrics = JSON.parseObject(met.toString());
+        for(String entry : metrics.keySet()){
+            fields.put(entry.toLowerCase(),metrics.get(entry));
+            realfields.put(entry.toLowerCase(),Float.valueOf(metrics.get(entry).toString()));
+        }
+        System.out.println("test44444"+met.toString());
+
+        influxDBUtils.insertOne1(tableName,tags,fields,timestamp);
+
+        //创建设备实时数据表,用于保存每个设备最新一条数据记录
+        String realtimeTableName = "datarealtime";
+        long realtimestamp = 1744164000001L;
+        realtags.put("device_uuid",deviceUUId);
+        realfields.put("realtime", (map_data.get("timestamp").toString()));
+        realfields.put("device_uuid",deviceUUId);
+        realfields.put("device_id",tags.get("device_id"));
+        influxDBUtils.insertOne1(realtimeTableName,realtags,realfields,realtimestamp);
+        return "";
+    }
+}

+ 77 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/tdengine/tdengine.java

@@ -0,0 +1,77 @@
+package com.usky.demo.service.rocketmq.tdengine;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.dynamic.datasource.annotation.DS;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.demo.service.rocketmq.MqttStrategy;
+import com.usky.demo.service.vo.MQDataVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.util.HashMap;
+import java.util.Map;
+
+@Service("tdengine")
+@DS("taos")
+public class tdengine implements MqttStrategy {
+    @Autowired
+    private DataSource dataSource;
+
+    @Override
+    public String disposeMessage(MQDataVO mqBaseVO) {
+        try {
+            Connection conn = dataSource.getConnection();
+            Statement stmt = conn.createStatement();
+
+            Map<String, String> tags = new HashMap<>();
+            Map<String, Object> fields = new HashMap<>();
+            Map<String, String> realtags = new HashMap<>();
+            Map<String, Object> realfields = new HashMap<>();
+            Map map_data = JsonUtils.fromJson(mqBaseVO.getData().toString(),Map.class);
+            System.out.println("test11111");
+            String deviceUUId = map_data.get("deviceUUId").toString();
+            String productCode = map_data.get("productCode").toString().toLowerCase();
+            Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+            String tableName = "_"+deviceUUId;
+            String STableName = "sup_"+productCode;
+
+            StringBuilder tagValue = new StringBuilder();
+            Object tg = JSONObject.toJSONString(map_data.get("tags"));
+            JSONObject tag = JSON.parseObject(tg.toString());
+            for (String entry : tag.keySet()){
+                tags.put(entry.toLowerCase(),tag.get(entry).toString());
+                tagValue.append("'"+tag.get(entry).toString()+"',");
+            }
+            System.out.println("test33333"+tg.toString());
+
+            Object met = JSONObject.toJSONString(map_data.get("metrics"));
+            StringBuilder fieldName = new StringBuilder();
+            StringBuilder fieldValue = new StringBuilder();
+            JSONObject metrics = JSON.parseObject(met.toString());
+            for(String entry : metrics.keySet()){
+                fields.put(entry.toLowerCase(),metrics.get(entry));
+                realfields.put(entry.toLowerCase(),Float.valueOf(metrics.get(entry).toString()));
+                fieldName.append(","+entry.toLowerCase());
+                fieldValue.append(","+metrics.get(entry).toString());
+            }
+            System.out.println("test44444"+met.toString());
+            String sql = "insert into "+tableName+" using "+STableName+ " tags("+tagValue.toString().substring(0,tagValue.length()-1)+") (ts"+fieldName.toString()+") values ("+timestamp+fieldValue.toString()+")";
+            stmt.execute(sql);
+
+            //创建设备实时数据表,用于保存每个设备最新一条数据记录
+            long realtimestamp = 1756828800000L;
+            String realsql = "insert into datarealtime values("+ realtimestamp + " , '"+ deviceUUId + "', " + timestamp + " , '" + tags.get("device_id").toString() + "' )";
+            stmt.execute(realsql);
+
+            stmt.close();
+            conn.close();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return "";
+    }
+}

+ 13 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/vo/MQDataVO.java

@@ -0,0 +1,13 @@
+package com.usky.demo.service.vo;
+
+import lombok.Data;
+
+@Data
+public class MQDataVO {
+
+    private String describe;
+    /**
+     * 数据内容
+     */
+    private Object data;
+}

+ 111 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/resources/mapper/demo/QueryTdengineDataMapper.xml

@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.usky.demo.mapper.QueryTdengineDataMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.usky.demo.domain.QueryTdengineData">
+        <result column="id" property="id" />
+        <result column="tdengine_name" property="tdengineName" />
+    </resultMap>
+
+    <update id="createSuperTableAndColumn"
+            parameterType="com.usky.demo.service.vo.SuperTableVO">
+        create stable if not exists ${superTableName}
+        <foreach item="item" collection="schemaFields" separator=","
+                 open="(" close=")" index="">
+            <if test="item.fieldName != null || item.fieldName != ''">
+                ${item.fieldName}
+            </if>
+            <if test="item.dataType != null">
+                ${item.dataType}
+                <!-- 当dataType.quoted为true时添加size -->
+                <if test="item.dataType.quoted == true and item.size != null">
+                    (${item.size})
+                </if>
+            </if>
+        </foreach>
+        tags
+        <!-- tdEngine不支持动态tags里的数据类型,只能使用choose标签比对 -->
+        <foreach item="item" collection="tagsFields" separator=","
+                 open="(" close=")" index="">
+            <if test="item.fieldName != null || item.fieldName != ''">
+                ${item.fieldName}
+            </if>
+            <if test="item.dataType != null">
+                ${item.dataType}
+                <!-- 当dataType.quoted为true时添加size -->
+                <if test="item.dataType.quoted == true and item.size != null">
+                    (${item.size})
+                </if>
+            </if>
+        </foreach>
+    </update>
+
+    <update id="addSuperTableColumn">
+        ALTER
+        STABLE
+        ${superTableName}
+        ADD
+        COLUMN
+        <if test="fields.fieldName != null || fields.fieldName != ''">
+            ${fields.fieldName}
+        </if>
+        <if test="fields.dataType != null || fields.dataType != ''">
+            ${fields.dataType}
+            <!-- 当dataType.quoted为true时添加size -->
+            <if test="fields.dataType.quoted == true and fields.size != null">
+                (${fields.size})
+            </if>
+        </if>
+    </update>
+
+    <update id="addSuperTableTag">
+        ALTER
+        STABLE
+        ${superTableName}
+        ADD
+        TAG
+        <if test="fields.fieldName != null || fields.fieldName != ''">
+            ${fields.fieldName}
+        </if>
+        <if test="fields.dataType != null || fields.dataType != ''">
+            ${fields.dataType}
+            <!-- 当dataType.quoted为true时添加size -->
+            <if test="fields.dataType.quoted == true and fields.size != null">
+                (${fields.size})
+            </if>
+        </if>
+    </update>
+
+    <delete id="dropSuperTableColumn">
+        ALTER
+        STABLE
+        ${superTableName}
+        DROP
+        COLUMN
+        <if test="fields.fieldName != null || fields.fieldName != ''">
+            ${fields.fieldName}
+        </if>
+    </delete>
+
+    <delete id="dropSuperTableTag">
+        ALTER
+        STABLE
+        ${superTableName}
+        DROP
+        TAG
+        <if test="fields.fieldName != null || fields.fieldName != ''">
+            ${fields.fieldName}
+        </if>
+    </delete>
+
+    <select id="describeSuperOrSubTable" resultType="com.usky.demo.domain.SuperTableDescribeVO">
+        DESCRIBE ${tableName};
+    </select>
+
+    <select id="getAllDeviceRealTime" resultType="com.usky.demo.domain.DeviceRealDataVO">
+        select ts,device_uuid,realtime,device_id from datarealtime;
+    </select>
+
+
+</mapper>