Ver código fonte

修改查询influxdb数据超时时间为60s,同时开发查询influxdb所有表实时数据接口并提供RPC调用

james 1 semana atrás
pai
commit
67745fbe02

+ 8 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java

@@ -8,6 +8,7 @@ import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
+import java.util.Map;
 
 @FeignClient(contextId = "remoteTsdbProxyService", value = "data-tsdb-proxy", fallbackFactory = RemoteTsdbProxyFallbackFactory.class)
 public interface RemoteTsdbProxyService {
@@ -18,6 +19,13 @@ public interface RemoteTsdbProxyService {
     @PostMapping("/last")
     List<LastInnerResultVO> last(@RequestBody LastInnerQueryVO requestVO);
 
+    /**
+     * 查询influxdb所有表实时数据
+     * @return
+     */
+    @PostMapping("getAllDeviceRealTime")
+    List<Map<String, Object>> getAllDeviceRealTime();
+
     /**
      * 单个设备实时数据查询
      * @param deviceUUId

+ 7 - 0
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java

@@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -42,6 +43,12 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
                 throw new BusinessException("设备实时数据查询:" + throwable.getMessage());
             }
 
+            @Override
+            public List<Map<String, Object>> getAllDeviceRealTime()
+            {
+                throw new BusinessException("查询influxdb所有表实时数据:" + throwable.getMessage());
+            }
+
             @Override
             public LastResultVO queryLastDeviceData(String deviceUUId)
             {

+ 10 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 @RestController
@@ -39,6 +40,15 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
         return queryInfluxdbDataService.last(requestVO);
     }
 
+    /**
+     * 查询influxdb所有表实时数据
+     * @return
+     */
+    @Override
+    public List<Map<String, Object>> getAllDeviceRealTime(){
+        return queryInfluxdbDataService.getAllDeviceRealTime();
+    }
+
     /**
      * 单个设备实时数据查询
      * @param deviceUUId

+ 10 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java

@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * <p>
@@ -45,5 +46,14 @@ public class QueryInfluxdbDataController {
         return ApiResult.success(queryInfluxdbDataService.history(requestVO));
     }
 
+    /**
+     * 查询influxdb所有表实时数据
+     * @return
+     */
+    @PostMapping("getAllDeviceRealTime")
+    public ApiResult<List<Map<String, Object>>> getAllDeviceRealTime(){
+        return ApiResult.success(queryInfluxdbDataService.getAllDeviceRealTime());
+    }
+
 }
 

+ 1 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryInfluxdbDataService.java

@@ -31,4 +31,5 @@ public interface QueryInfluxdbDataService extends CrudService<QueryInfluxdbData>
     List<LastInnerResultVO> last(LastInnerQueryVO requestVO);
     List<HistorysInnerResultVO> history(HistorysInnerRequestVO requestVO);
     // 对内接口  end
+    List<Map<String, Object>> getAllDeviceRealTime();
 }

+ 6 - 1
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/config/TsdbConfig.java

@@ -1,12 +1,15 @@
 package com.usky.demo.service.config;
 
 import com.usky.common.core.util.StringUtils;
+import okhttp3.OkHttpClient;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.concurrent.TimeUnit;
+
 @Configuration
 public class TsdbConfig {
 
@@ -30,7 +33,9 @@ public class TsdbConfig {
         if (StringUtils.isEmpty(userName)) {
             influxDB = InfluxDBFactory.connect(influxDBUrl);
         } else {
-            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
+            OkHttpClient.Builder client = new OkHttpClient.Builder();
+            client.readTimeout(60, TimeUnit.SECONDS);
+            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password,client);
         }
         try {
 

+ 9 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -239,6 +239,15 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
         return list;
     }
 
+    @Override
+    public List<Map<String, Object>> getAllDeviceRealTime(){
+        // 构建查询语句,获取所有 measurement 的最新数据
+        String queryStr = "SELECT * FROM /.*/ ORDER BY time DESC LIMIT 1";
+        List<Map<String, Object>> result = tsdbUtils.fetchRealTimeRecords(queryStr);
+
+        return result;
+    }
+
     @Override
     public List<LastInnerResultVO> last(LastInnerQueryVO requestVO){
         List<LastInnerResultVO> list = new ArrayList<>();

+ 55 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/utils/TsdbUtils.java

@@ -199,6 +199,61 @@ public class TsdbUtils extends InfluxDbUtils {
         return results;
     }
 
+    /**
+     * 查询,返回Map集合
+     *
+     * @param query 完整的查询语句
+     * @return
+     */
+    public List<Map<String, Object>> fetchRealTimeRecords(String query) {
+        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+        List<Map<String, Object>> results = new ArrayList<>();
+        QueryResult queryResult = influxDB.query(new Query(query, database));
+        if(queryResult.getResults().get(0).getSeries() != null){
+            queryResult.getResults().forEach(result -> {
+                result.getSeries().forEach(serial -> {
+                    List<String> columns = serial.getColumns();
+                    String deviceuuid = serial.getName();
+                    int fieldSize = columns.size();
+                    serial.getValues().forEach(value -> {
+                        Map<String, Object> obj = new HashMap<String,Object>();
+                        obj.put("deviceuuid", deviceuuid);
+                        for (int i = 0; i < fieldSize; i++) {
+                            if(columns.get(i).equals("time")){
+                                if(!value.get(i).equals("1970-01-01T00:00:00Z")){
+                                    Date time = null;
+                                    try {
+                                        time = simpleDateFormat1.parse(value.get(i).toString());
+                                    } catch (ParseException e) {
+                                        e.printStackTrace();
+                                    }
+                                    //String dateStr = simpleDateFormat.format(time);
+                                    obj.put(columns.get(i), time.getTime());
+                                }
+
+                            } else if(columns.get(i).equals("timestamp")){
+                                Date time = null;
+                                try {
+                                    time = simpleDateFormat1.parse(value.get(i).toString());
+                                } catch (ParseException e) {
+                                    e.printStackTrace();
+                                }
+                                //String dateStr = simpleDateFormat.format(time);
+                                obj.put(columns.get(i), time.getTime());
+                            }else{
+                                obj.put(columns.get(i), value.get(i));
+                            }
+                        }
+                        results.add(obj);
+                    });
+                });
+            });
+        }
+
+        return results;
+    }
+
     /**
      * 查询
      *