Kaynağa Gözat

优化tsdb实时数据(last)和历史数据(history)两个查询接口

james 1 ay önce
ebeveyn
işleme
88900c4765

+ 1 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -109,7 +109,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             long endTimeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
             if((endTimeStamp - startTimeStamp) >= 3){ //请求超时3秒,返回失败
                 rec_map.put("code",-1);
-                rec_map.put("message","下发命令失败");
+                rec_map.put("message","下发命令响应超时");
 
                 dmpDeviceCommandService.lambdaUpdate().set(DmpDeviceCommand::getCommandStatus,2).eq(DmpDeviceCommand::getId,commandId).update();
                 break;

+ 5 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java

@@ -12,6 +12,11 @@ import java.util.List;
 @FeignClient(contextId = "remoteTsdbProxyService", value = "data-tsdb-proxy", fallbackFactory = RemoteTsdbProxyFallbackFactory.class)
 public interface RemoteTsdbProxyService {
 
+    /**
+     * 设备实时数据查询
+     */
+    List<LastInnerResultVO> last(@RequestBody LastInnerQueryVO requestVO);
+
     /**
      * 单个设备实时数据查询
      * @param deviceUUId

+ 3 - 3
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistorysInnerRequestVO.java

@@ -23,12 +23,12 @@ public class HistorysInnerRequestVO implements Serializable {
     private String endTime;
 
     /**
-     * 设备UUId
+     * 设备UUId集合
      */
-    private String  deviceUUId;
+    private List<String>  deviceuuid;
 
     /**
-     * 指标名
+     * 属性集合
      */
     private List<String> metrics;
 }

+ 4 - 9
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistorysInnerResultVO.java

@@ -5,17 +5,12 @@ import lombok.EqualsAndHashCode;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 @Data
 @EqualsAndHashCode(callSuper = false)
 public class HistorysInnerResultVO implements Serializable {
-    private String deviceUUId;
-    private String metric;
-    private List<MetricItemVO> metricItems;
-
-    public HistorysInnerResultVO(String deviceUUId, String metric, List<MetricItemVO> metricItems) {
-        this.deviceUUId = deviceUUId;
-        this.metric = metric;
-        this.metricItems = metricItems;
-    }
+    private String deviceuuid;
+    private Map<String,String> tags;
+    private List<MetricVO> metrics;
 }

+ 3 - 3
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastInnerQueryVO.java

@@ -11,12 +11,12 @@ import java.util.List;
 public class LastInnerQueryVO implements Serializable  {
 
     /**
-     * 设备UUId
+     * 设备UUId集合
      */
-    private String  deviceUUId;
+    private List<String>  deviceuuid;
 
     /**
-     * 指标名
+     * 属性集合
      */
     private List<String> metrics;
 }

+ 5 - 9
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastInnerResultVO.java

@@ -5,17 +5,13 @@ import lombok.EqualsAndHashCode;
 
 import java.io.Serializable;
 import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
 
 @Data
 @EqualsAndHashCode(callSuper = false)
 public class LastInnerResultVO implements Serializable {
-    private String metric;
-    private Long timestamp;
-    private Object value;
-
-    public LastInnerResultVO(String metric, Long timestamp, Object value) {
-        this.metric = metric;
-        this.timestamp = timestamp;
-        this.value = value;
-    }
+    private String deviceuuid;
+    private Map<String,String> tags;
+    private Map<String,Object> metrics;
 }

+ 6 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java

@@ -36,6 +36,12 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
 //                throw new BusinessException(throwable.getMessage());
 //            }
 
+            @Override
+            public List<LastInnerResultVO> last(LastInnerQueryVO requestVO)
+            {
+                throw new BusinessException("设备实时数据查询:" + throwable.getMessage());
+            }
+
             @Override
             public LastResultVO queryLastDeviceData(String deviceUUId)
             {

+ 10 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java

@@ -29,6 +29,16 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
     @Autowired
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
+    /**
+     * 获取设备实时数据(对内)
+     * @param requestVO
+     * @return
+     */
+    @Override
+    public List<LastInnerResultVO> last(LastInnerQueryVO requestVO){
+        return queryInfluxdbDataService.last(requestVO);
+    }
+
     /**
      * 单个设备实时数据查询
      * @param deviceUUId

+ 5 - 15
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java

@@ -26,7 +26,7 @@ public class QueryInfluxdbDataController {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     /**
-     * 获取单个设备多指标实时数据(对内)
+     * 获取设备实时数据(对内)
      * @param requestVO
      * @return
      */
@@ -36,23 +36,13 @@ public class QueryInfluxdbDataController {
     }
 
     /**
-     * 获取单个设备单属性历史数据(对内)
+     * 获取设备历史数据(对内)
      * @param requestVO
      * @return
      */
-    @PostMapping("/historyMetric")
-    public ApiResult<HistorysInnerResultVO> historyMetric(@RequestBody HistoryInnerRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.historyMetric(requestVO));
-    }
-
-    /**
-     * 获取单个设备多指标实时数据(对内)
-     * @param requestVO
-     * @return
-     */
-    @PostMapping("/historyMetrics")
-    public ApiResult<List<HistorysInnerResultVO>> historyMetrics(@RequestBody HistorysInnerRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.historyMetrics(requestVO));
+    @PostMapping("/history")
+    public ApiResult<List<HistorysInnerResultVO>> history(@RequestBody HistorysInnerRequestVO requestVO){
+        return ApiResult.success(queryInfluxdbDataService.history(requestVO));
     }
 
 }

+ 1 - 2
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryInfluxdbDataService.java

@@ -29,7 +29,6 @@ public interface QueryInfluxdbDataService extends CrudService<QueryInfluxdbData>
 
     // 对内接口  begin
     List<LastInnerResultVO> last(LastInnerQueryVO requestVO);
-    HistorysInnerResultVO historyMetric(HistoryInnerRequestVO requestVO);
-    List<HistorysInnerResultVO> historyMetrics(HistorysInnerRequestVO requestVO);
+    List<HistorysInnerResultVO> history(HistorysInnerRequestVO requestVO);
     // 对内接口  end
 }

+ 121 - 45
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.common.core.exception.BusinessException;
 import com.usky.demo.domain.*;
 import com.usky.demo.mapper.QueryInfluxdbDataMapper;
 import com.usky.demo.service.DmpDeviceService;
@@ -23,10 +24,7 @@ import org.springframework.cache.annotation.Cacheable;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * <p>
@@ -244,66 +242,144 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     @Override
     public List<LastInnerResultVO> last(LastInnerQueryVO requestVO){
         List<LastInnerResultVO> list = new ArrayList<>();
+        List<String> deviceUUIds = requestVO.getDeviceuuid();
         List<String> metrics = requestVO.getMetrics();
-        String tableName = requestVO.getDeviceUUId();
-        for (int i = 0; i < metrics.size(); i++) {
-            String query = "SELECT time,"+metrics.get(i)+" as attributeData FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-            List<Map<String, Object>> list1 = tsdbUtils.fetchRecords(query);
-            if(CollectionUtils.isNotEmpty(list1)){
-                LastInnerResultVO lastInnerResultVO = new LastInnerResultVO(metrics.get(i),Long.valueOf(list1.get(0).get("time").toString()),list1.get(0).get("attributeData"));
-                list.add(lastInnerResultVO);
+        if(CollectionUtils.isNotEmpty(metrics)){
+            String metricStr = "\""+metrics.get(0)+"\"";
+            for (int i = 1; i < metrics.size(); i++) {
+                metricStr=metricStr.concat(",\""+metrics.get(i)+"\"");
             }
-        }
-
-        return list;
-    }
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String tableName = deviceUUIds.get(i);
+                String query = "SELECT time,"+metricStr+" FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+                if(CollectionUtils.isEmpty(metricList)){
+                    throw new BusinessException(tableName+"设备没有对应属性,请添加");
+                }
 
-    @Override
-    public HistorysInnerResultVO historyMetric(HistoryInnerRequestVO requestVO){
-        String startTime = requestVO.getStartTime();
-        String endTime = requestVO.getEndTime();
-        String metric = requestVO.getMetric();
-        String tableName = requestVO.getDeviceUUId();
-
-        String query = "SELECT time,"+metric+" as attributeData FROM \""+tableName+"\" where time >='"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
-        List<Map<String, Object>> list1 = tsdbUtils.fetchRecords(query);
-        List<MetricItemVO> metricItemVOList = new ArrayList<>();
-        if(CollectionUtils.isNotEmpty(list1)){
-            for (int i = 0; i < list1.size(); i++) {
-                MetricItemVO itemVO = new MetricItemVO(Long.valueOf(list1.get(i).get("time").toString()), list1.get(i).get("attributeData"));
-                metricItemVOList.add(itemVO);
+                String tagQuery = "SELECT \"device_id\","+metricStr+" FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+                LastInnerResultVO resultVO = new LastInnerResultVO();
+                resultVO.setDeviceuuid(deviceUUIds.get(i));
+                resultVO.setTags(tag.get(0));
+                resultVO.setMetrics(metricList.get(0));
+                list.add(resultVO);
             }
 
+        }else{
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String tableName = deviceUUIds.get(i);
+                String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+                LastInnerResultVO resultVO = new LastInnerResultVO();
+                resultVO.setDeviceuuid(deviceUUIds.get(i));
+                resultVO.setTags(tag.get(0));
+                resultVO.setMetrics(metricList.get(0));
+                list.add(resultVO);
+            }
         }
-        HistorysInnerResultVO resultVO = new HistorysInnerResultVO(tableName,metric,metricItemVOList);
 
-        return resultVO;
+        return list;
     }
 
     @Override
-    public List<HistorysInnerResultVO> historyMetrics(HistorysInnerRequestVO requestVO){
+    public List<HistorysInnerResultVO> history(HistorysInnerRequestVO requestVO){
         List<HistorysInnerResultVO> list = new ArrayList<>();
         String startTime = requestVO.getStartTime();
         String endTime = requestVO.getEndTime();
+        List<String> deviceUUIds = requestVO.getDeviceuuid();
         List<String> metrics = requestVO.getMetrics();
-        String tableName = requestVO.getDeviceUUId();
-
-        for (int i = 0; i < metrics.size(); i++) {
-            String metric = metrics.get(i);
-            String query = "SELECT time,"+metric+" as attributeData FROM \""+tableName+"\" where time >='"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
-            List<Map<String, Object>> list1 = tsdbUtils.fetchRecords(query);
-            List<MetricItemVO> metricItemVOList = new ArrayList<>();
-            if(CollectionUtils.isNotEmpty(list1)){
-                for (int j = 0; j < list1.size(); j++) {
-                    MetricItemVO itemVO = new MetricItemVO(Long.valueOf(list1.get(j).get("time").toString()), list1.get(j).get("attributeData"));
-                    metricItemVOList.add(itemVO);
+        if(CollectionUtils.isNotEmpty(metrics)){
+            String metricStr = "\""+metrics.get(0)+"\"";
+            for (int i = 1; i < metrics.size(); i++) {
+                metricStr=metricStr.concat(",\""+metrics.get(i)+"\"");
+            }
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                List<MetricVO> metricList = new ArrayList<>();
+                String tableName = deviceUUIds.get(i);
+
+                String fieldQuery = "SELECT time,"+metricStr+" FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
+                List<Map<String,Object>> metircList = tsdbUtils.fetchRecords(fieldQuery);
+
+                for (int j = 0; j < metrics.size(); j++) {
+                    if(!"time".equals(metrics.get(j))){
+                        List<Map<String,Object>> metircItems = new ArrayList<>();
+                        MetricVO metricVO = new MetricVO();
+                        metricVO.setMetric(metrics.get(j));
+                        if(CollectionUtils.isNotEmpty(metircList)){
+                            for (int k = 0; k < metircList.size(); k++) {
+                                String field = metrics.get(j);
+                                Map<String,Object> map = new HashMap<>();
+                                map.put("timestamp",metircList.get(k).get("time"));
+                                if(Objects.isNull(metircList.get(k).get(field)))
+                                    break;
+                                map.put("value",metircList.get(k).get(field));
+                                metircItems.add(map);
+                            }
+                            metricVO.setMetricItems(metircItems);
+                        }
+                        metricList.add(metricVO);
+                    }
+
                 }
 
+                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+                HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
+                resultVO.setDeviceuuid(deviceUUIds.get(i));
+                resultVO.setTags(tag.get(0));
+                resultVO.setMetrics(metricList);
+                list.add(resultVO);
+            }
+
+        }else{
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                List<MetricVO> metricList = new ArrayList<>();
+                String tableName = deviceUUIds.get(i);
+                String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                QueryResult queryResult = tsdbUtils.query(query);
+                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+                if(queryResult.getResults().get(0).getSeries() != null){
+                    List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
+                    if(CollectionUtils.isNotEmpty(fields)){
+                        String fieldQuery = "SELECT *::field FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
+                        List<Map<String,Object>> metircs = tsdbUtils.fetchRecords(fieldQuery);
+
+                        for (int j = 0; j < fields.size(); j++) {
+                            if(!"time".equals(fields.get(j))){
+                                List<Map<String,Object>> metircItems = new ArrayList<>();
+                                MetricVO metricVO = new MetricVO();
+                                metricVO.setMetric(fields.get(j));
+                                if(CollectionUtils.isNotEmpty(metircs)){
+                                    for (int k = 0; k < metircs.size(); k++) {
+                                        String field = fields.get(j);
+                                        Map<String,Object> map = new HashMap<>();
+                                        map.put("timestamp",metircs.get(k).get("time"));
+                                        map.put("value",metircs.get(k).get(field));
+                                        metircItems.add(map);
+                                    }
+
+                                }
+                                metricVO.setMetricItems(metircItems);
+                                metricList.add(metricVO);
+                            }
+
+                        }
+
+                    }
+                }
+                HistorysInnerResultVO resultVO = new HistorysInnerResultVO();
+                resultVO.setDeviceuuid(deviceUUIds.get(i));
+                resultVO.setTags(tag.get(0));
+                resultVO.setMetrics(metricList);
+                list.add(resultVO);
             }
-            HistorysInnerResultVO resultVO = new HistorysInnerResultVO(tableName,metric,metricItemVOList);
-            list.add(resultVO);
         }
 
+
         return list;
     }