Prechádzať zdrojové kódy

将tdengine设备数据源查询接入到设备实时查询last接口和设备历史数据查询history接口,做到兼容influxdb和tdengine两个时序数据库切换

james 1 mesiac pred
rodič
commit
9d83538acb

+ 5 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/pom.xml

@@ -52,6 +52,11 @@
             <artifactId>data-tsdb-proxy-api</artifactId>
             <version>0.0.1</version>
         </dependency>
+        <dependency>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>3.6.3</version>
+        </dependency>
 
     </dependencies>
 

+ 17 - 2
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java

@@ -4,8 +4,10 @@ package com.usky.demo.controller.web;
 import com.usky.common.core.bean.ApiResult;
 import com.usky.demo.domain.*;
 import com.usky.demo.service.QueryInfluxdbDataService;
+import com.usky.demo.service.QueryTdengineDataService;
 import io.swagger.annotations.Api;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
@@ -25,6 +27,11 @@ import java.util.Map;
 public class QueryInfluxdbDataController {
     @Autowired
     private QueryInfluxdbDataService queryInfluxdbDataService;
+    @Autowired
+    private QueryTdengineDataService queryTdengineDataService;
+
+    @Value("${spring.sourcetype}")
+    private String sourcetype;
 
     /**
      * 获取设备实时数据(对内)
@@ -33,7 +40,11 @@ public class QueryInfluxdbDataController {
      */
     @PostMapping("/last")
     public ApiResult<List<LastInnerResultVO>> last(@RequestBody LastInnerQueryVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.last(requestVO));
+        if(sourcetype.equals("taos")){
+            return ApiResult.success(queryTdengineDataService.tdengineLast(requestVO));
+        }else{
+            return ApiResult.success(queryInfluxdbDataService.last(requestVO));
+        }
     }
 
     /**
@@ -43,7 +54,11 @@ public class QueryInfluxdbDataController {
      */
     @PostMapping("/history")
     public ApiResult<List<HistorysInnerResultVO>> history(@RequestBody HistorysInnerRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.history(requestVO));
+        if(sourcetype.equals("taos")) {
+            return ApiResult.success(queryTdengineDataService.tdengineHistory(requestVO));
+        }else {
+            return ApiResult.success(queryInfluxdbDataService.history(requestVO));
+        }
     }
 
     /**

+ 27 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/domain/QueryTdengineData.java

@@ -0,0 +1,27 @@
+package com.usky.demo.domain;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serializable;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author ya
+ * @since 2024-07-29
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+public class QueryTdengineData implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Long id;
+
+    private String tdengineName;
+
+
+}

+ 16 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/mapper/QueryTdengineDataMapper.java

@@ -0,0 +1,16 @@
+package com.usky.demo.mapper;
+
+import com.usky.common.mybatis.core.CrudMapper;
+import com.usky.demo.domain.QueryTdengineData;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author ya
+ * @since 2024-07-29
+ */
+public interface QueryTdengineDataMapper extends CrudMapper<QueryTdengineData> {
+
+}

+ 24 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryTdengineDataService.java

@@ -0,0 +1,24 @@
+package com.usky.demo.service;
+
+import com.usky.common.mybatis.core.CrudService;
+import com.usky.demo.domain.*;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author ya
+ * @since 2024-07-29
+ */
+public interface QueryTdengineDataService extends CrudService<QueryTdengineData> {
+
+
+    // 对内接口  begin
+    List<LastInnerResultVO> tdengineLast(LastInnerQueryVO requestVO);
+    List<HistorysInnerResultVO> tdengineHistory(HistorysInnerRequestVO requestVO);
+    // 对内接口  end
+}

+ 299 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryTdengineDataServiceImpl.java

@@ -0,0 +1,299 @@
+package com.usky.demo.service.impl;
+
+import com.baomidou.dynamic.datasource.annotation.DS;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.common.core.exception.BusinessException;
+import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.demo.domain.*;
+import com.usky.demo.mapper.QueryInfluxdbDataMapper;
+import com.usky.demo.mapper.QueryTdengineDataMapper;
+import com.usky.demo.service.DmpDeviceService;
+import com.usky.demo.service.DmpProductService;
+import com.usky.demo.service.QueryInfluxdbDataService;
+import com.usky.demo.service.QueryTdengineDataService;
+import com.usky.demo.service.utils.TsdbUtils;
+import com.usky.demo.service.vo.DeviceMapVO;
+import com.usky.demo.service.vo.ProductMapVO;
+import com.usky.demo.service.vo.SuperTableVO;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.dto.QueryResult;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author ya
+ * @since 2024-07-29
+ */
+@Service
+@DS("taos")
+public class QueryTdengineDataServiceImpl extends AbstractCrudService<QueryTdengineDataMapper, QueryTdengineData> implements QueryTdengineDataService {
+    @Autowired
+    private TsdbUtils tsdbUtils;
+    @Autowired
+    private DataSource dataSource;
+
+    @Override
+    public List<LastInnerResultVO> tdengineLast(LastInnerQueryVO requestVO){
+        List<LastInnerResultVO> list = new ArrayList<>();
+        List<String> deviceUUIds = requestVO.getDeviceuuid();
+        List<String> metrics = requestVO.getMetrics();
+
+        try {
+            Connection connection = dataSource.getConnection();
+            Statement statement = connection.createStatement();
+
+            // 将查询数据整理到各个超级表的数据集合中
+            List<SuperTableVO> supertablelist = new ArrayList<>();
+
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String superTablename = deviceUUIds.get(i).substring(0,deviceUUIds.get(i).lastIndexOf("_"));
+                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
+                    SuperTableVO superTableVO = new SuperTableVO();
+                    superTableVO.setSuperTableName(superTablename);
+                    List<String> tableNames = new ArrayList<>();
+                    tableNames.add(deviceUUIds.get(i));
+                    superTableVO.setTableNames(tableNames);
+                    supertablelist.add(superTableVO);
+                }else{
+                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
+                    List<String> tableNames = superTableVO.getTableNames();
+                    tableNames.add(deviceUUIds.get(i));
+                    superTableVO.setTableNames(tableNames);
+                }
+            }
+
+            // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
+            for (int i = 0; i < supertablelist.size(); i++) {
+                String superTablename = supertablelist.get(i).getSuperTableName();
+
+                String sql = "describe " + superTablename;
+                ResultSet resultSet = statement.executeQuery(sql);
+                List<Map<String, Object>> descList = new ArrayList<>();
+                while (resultSet.next()) {
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("field", resultSet.getString("field"));
+                    map.put("note", resultSet.getString("note"));
+                    descList.add(map);
+                }
+                resultSet.close();
+
+                String tagStr = "";
+                List<String> taglist = new ArrayList<>();
+                List<String> fieldKeys = new ArrayList<>();
+                for(Map<String, Object> map: descList){
+                    String note = map.get("note").toString();
+                    if("TAG".equals(note)){
+                        tagStr = tagStr.concat(map.get("field").toString()+",");
+                        taglist.add(map.get("field").toString());
+                    }else{
+                        fieldKeys.add(map.get("field").toString());
+                    }
+
+                }
+
+                // 过滤有效字段
+                List<String> filterValidFields = new ArrayList<>();
+                if(CollectionUtils.isNotEmpty(metrics)){
+                    filterValidFields = metrics.stream().filter(item -> fieldKeys.contains(item)).collect(Collectors.toList());
+                }else{
+                    filterValidFields = fieldKeys;
+                }
+                String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
+                if(CollectionUtils.isEmpty(filterValidFields)){
+                    throw new BusinessException(superTablename+"没有可用属性,请添加");
+                }
+
+                String tableNames = "";
+                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
+                    if(j == 0){
+                        tableNames = "'"+supertablelist.get(i).getTableNames().get(j)+"'";
+                    }else{
+                        tableNames = tableNames.concat(",'"+supertablelist.get(i).getTableNames().get(j)+"'");
+                    }
+                }
+
+                String query = "SELECT tbname,ts,"+tagStr+filterValidFieldStr+" FROM "+superTablename+" where tbname in ("+tableNames+") group by tbname having ts = last(ts)";
+                ResultSet metricList = statement.executeQuery(query);
+                while (metricList.next()) {
+                    LastInnerResultVO resultVO = new LastInnerResultVO();
+                    resultVO.setDeviceuuid(metricList.getString("tbname"));
+                    Map<String, String> tag = new HashMap<String,String>();
+                    for(int k=0;k<taglist.size();k++){
+                        tag.put(taglist.get(k),metricList.getString(taglist.get(k)));
+                    }
+                    resultVO.setTags(tag);
+
+                    Map<String, Object> field = new HashMap<String,Object>();
+                    field.put("time",metricList.getString("ts"));
+                    for(int k =0;k<filterValidFields.size();k++){
+                        field.put(filterValidFields.get(k),metricList.getString(filterValidFields.get(k)));
+                    }
+                    resultVO.setMetrics(field);
+                    list.add(resultVO);
+                }
+                metricList.close();
+            }
+            statement.close();
+            connection.close();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+
+
+        return list;
+    }
+
+    @Override
+    public List<HistorysInnerResultVO> tdengineHistory(HistorysInnerRequestVO requestVO){
+        List<HistorysInnerResultVO> list = new ArrayList<>();
+        String startTime = requestVO.getStartTime();
+        String endTime = requestVO.getEndTime();
+        List<String> deviceUUIds = requestVO.getDeviceuuid();
+        List<String> metrics = requestVO.getMetrics();
+
+        try {
+            Connection connection = dataSource.getConnection();
+            Statement statement = connection.createStatement();
+
+            // 将查询数据整理到各个超级表的数据集合中
+            List<SuperTableVO> supertablelist = new ArrayList<>();
+
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String superTablename = deviceUUIds.get(i).substring(0,deviceUUIds.get(i).lastIndexOf("_"));
+                if(supertablelist.stream().noneMatch(item -> item.getSuperTableName().equals(superTablename))){
+                    SuperTableVO superTableVO = new SuperTableVO();
+                    superTableVO.setSuperTableName(superTablename);
+                    List<String> tableNames = new ArrayList<>();
+                    tableNames.add(deviceUUIds.get(i));
+                    superTableVO.setTableNames(tableNames);
+                    supertablelist.add(superTableVO);
+                }else{
+                    SuperTableVO superTableVO = supertablelist.stream().filter(item -> item.getSuperTableName().equals(superTablename)).findFirst().get();
+                    List<String> tableNames = superTableVO.getTableNames();
+                    tableNames.add(deviceUUIds.get(i));
+                    superTableVO.setTableNames(tableNames);
+                }
+            }
+
+            // 遍历超级表集合,自动忽略表中不存在的字段,用有效字段构建查询,避免错误
+            for (int i = 0; i < supertablelist.size(); i++) {
+                String superTablename = supertablelist.get(i).getSuperTableName();
+
+                String sql = "describe " + superTablename;
+                ResultSet resultSet = statement.executeQuery(sql);
+                List<Map<String, Object>> descList = new ArrayList<>();
+                while (resultSet.next()) {
+                    Map<String, Object> map = new HashMap<>();
+                    map.put("field", resultSet.getString("field"));
+                    map.put("note", resultSet.getString("note"));
+                    descList.add(map);
+                }
+                resultSet.close();
+
+                String tagStr = "";
+                List<String> taglist = new ArrayList<>();
+                List<String> fieldKeys = new ArrayList<>();
+                for(Map<String, Object> map: descList){
+                    String note = map.get("note").toString();
+                    if("TAG".equals(note)){
+                        tagStr = tagStr.concat(map.get("field").toString()+",");
+                        taglist.add(map.get("field").toString());
+                    }else{
+                        fieldKeys.add(map.get("field").toString());
+                    }
+
+                }
+
+                // 过滤有效字段
+                List<String> filterValidFields = new ArrayList<>();
+                if(CollectionUtils.isNotEmpty(metrics)){
+                    filterValidFields = metrics.stream().filter(item -> fieldKeys.contains(item)).collect(Collectors.toList());
+                }else{
+                    filterValidFields = fieldKeys;
+                }
+                String filterValidFieldStr = filterValidFields.stream().collect(Collectors.joining(","));
+                if(CollectionUtils.isEmpty(filterValidFields)){
+                    throw new BusinessException(superTablename+"没有可用属性,请添加");
+                }
+
+                for (int j = 0; j < supertablelist.get(i).getTableNames().size(); j++) {
+                    String tableName = supertablelist.get(i).getTableNames().get(j);
+                    HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
+                    resultVO.setDeviceuuid(tableName);
+
+                    String query = "SELECT ts,"+tagStr+filterValidFieldStr+" FROM "+tableName+" where ts >= '"+startTime+"' and ts <= '"+endTime+"'";
+                    ResultSet resultSet1 = statement.executeQuery(query);
+                    int count = resultSet1.getMetaData().getColumnCount();
+                    List<Map<String, Object>> resultList = new ArrayList<>();
+                    while (resultSet1.next()) {
+                        Map<String, Object> map = new HashMap<>();
+                        for (int k = 1; k <= count; k++) {
+                            String key = resultSet1.getMetaData().getColumnName(k);
+                            Object value = resultSet1.getString(k);
+                            map.put(key, value);
+                        }
+                        resultList.add(map);
+                    }
+                    resultSet1.close();
+
+                    if(CollectionUtils.isNotEmpty(resultList)){
+                        Map<String, String> tag = new HashMap<>();
+                        for (int k = 0; k < taglist.size(); k++) {
+                            tag.put(taglist.get(k), resultList.get(0).get(taglist.get(k)).toString());
+                        }
+                        resultVO.setTags(tag);
+
+                        List<MetricVO> metricList = new ArrayList<>();
+                        for (int h = 0; h < filterValidFields.size(); h++) {
+                            if(!"ts".equals(filterValidFields.get(h))){
+                                MetricVO metricVO = new MetricVO();
+                                metricVO.setMetric(filterValidFields.get(h));
+                                List<Map<String,Object>> metircItems = new ArrayList<>();
+                                for (int k = 0; k < resultList.size(); k++) {
+                                    String field = filterValidFields.get(h);
+                                    Map<String,Object> map = new HashMap<>();
+                                    map.put("timestamp",resultList.get(k).get("ts"));
+                                    map.put("value",resultList.get(k).get(field));
+                                    metircItems.add(map);
+                                }
+                                metricVO.setMetricItems(metircItems);
+                                metricList.add(metricVO);
+                            }
+
+                        }
+                        resultVO.setMetrics(metricList);
+                        list.add(resultVO);
+                    }
+
+                }
+
+            }
+            statement.close();
+            connection.close();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+
+        return list;
+
+    }
+
+
+}

+ 21 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/vo/SuperTableVO.java

@@ -0,0 +1,21 @@
+package com.usky.demo.service.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class SuperTableVO implements Serializable {
+    // 超级表名
+    private String superTableName;
+
+    // 子表集合
+    private List<String> tableNames;
+
+    // 子表tag集合
+    private List<String> tags;
+
+    // 子表field集合
+    private List<String> fields;
+}