Pārlūkot izejas kodu

1、优化data-tsdb服务设备消息消费逻辑,创建datarealtime表,time 相当于表的主键,当一条数据的time和tags完全相同时候,新数据会替换掉旧数据,保证每个设备最新一条数据存储;
2、优化获取设备实时数据接口,更换为查询datarealtime表数据,调整代码逻辑,修改接口文档,测试接口;

james 1 nedēļu atpakaļ
vecāks
revīzija
5dd9ccc218

+ 71 - 30
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -242,7 +242,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     @Override
     public List<Map<String, Object>> getAllDeviceRealTime(){
         // 构建查询语句,获取所有 measurement 的最新数据
-        String queryStr = "SELECT * FROM /.*/ ORDER BY time DESC LIMIT 1";
+        String queryStr = "SELECT *::field FROM datarealtime tz('Asia/Shanghai')";
 
         return tsdbUtils.fetchRealTimeRecords(queryStr);
     }
@@ -252,45 +252,86 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
         List<LastInnerResultVO> list = new ArrayList<>();
         List<String> deviceUUIds = requestVO.getDeviceuuid();
         List<String> metrics = requestVO.getMetrics();
+//        if(CollectionUtils.isNotEmpty(metrics)){
+//            String metricStr = "\""+metrics.get(0)+"\"";
+//            for (int i = 1; i < metrics.size(); i++) {
+//                metricStr=metricStr.concat(",\""+metrics.get(i)+"\"");
+//            }
+//            for (int i = 0; i < deviceUUIds.size(); i++) {
+//                String tableName = deviceUUIds.get(i);
+//                LastInnerResultVO resultVO = new LastInnerResultVO();
+//                resultVO.setDeviceuuid(deviceUUIds.get(i));
+//
+//                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+//                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+//                if(CollectionUtils.isNotEmpty(tag)){
+//                    resultVO.setTags(tag.get(0));
+//                    String query = "SELECT time,"+metricStr+" FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+//                    List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+//                    if(CollectionUtils.isEmpty(metricList)){
+//                        throw new BusinessException(tableName+"设备没有对应属性,请添加");
+//                    }
+//                    resultVO.setMetrics(metricList.get(0));
+//                }
+//
+//                list.add(resultVO);
+//            }
+//
+//        }else{
+//            for (int i = 0; i < deviceUUIds.size(); i++) {
+//                String tableName = deviceUUIds.get(i);
+//                LastInnerResultVO resultVO = new LastInnerResultVO();
+//                resultVO.setDeviceuuid(deviceUUIds.get(i));
+//                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+//                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+//                if(CollectionUtils.isNotEmpty(tag)){
+//                    resultVO.setTags(tag.get(0));
+//                    String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+//                    List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+//                    resultVO.setMetrics(metricList.get(0));
+//                }
+//                list.add(resultVO);
+//            }
+//        }
+
+        String deviceUUId = "device_uuid='"+deviceUUIds.get(0)+"'";
+        for (int i = 1; i < deviceUUIds.size(); i++) {
+            deviceUUId=deviceUUId.concat(" or device_uuid='"+deviceUUIds.get(i)+"'");
+        }
         if(CollectionUtils.isNotEmpty(metrics)){
             String metricStr = "\""+metrics.get(0)+"\"";
             for (int i = 1; i < metrics.size(); i++) {
                 metricStr=metricStr.concat(",\""+metrics.get(i)+"\"");
             }
-            for (int i = 0; i < deviceUUIds.size(); i++) {
-                String tableName = deviceUUIds.get(i);
-                LastInnerResultVO resultVO = new LastInnerResultVO();
-                resultVO.setDeviceuuid(deviceUUIds.get(i));
-
-                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
-                if(CollectionUtils.isNotEmpty(tag)){
-                    resultVO.setTags(tag.get(0));
-                    String query = "SELECT time,"+metricStr+" FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-                    List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
-                    if(CollectionUtils.isEmpty(metricList)){
-                        throw new BusinessException(tableName+"设备没有对应属性,请添加");
-                    }
-                    resultVO.setMetrics(metricList.get(0));
+            String query = "SELECT \"realtime\",\"device_uuid\",\"device_id\","+metricStr+" FROM \"datarealtime\" where ("+deviceUUId+") tz('Asia/Shanghai')";
+            List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+            if(CollectionUtils.isNotEmpty(metricList)){
+                for (int i = 0; i < metricList.size(); i++) {
+                    LastInnerResultVO resultVO = new LastInnerResultVO();
+                    resultVO.setDeviceuuid(metricList.get(i).get("device_uuid").toString());
+                    Map<String, String> tag = new HashMap<String,String>();
+                    tag.put("device_id",metricList.get(i).get("device_id").toString());
+                    resultVO.setTags(tag);
+                    resultVO.setMetrics(metricList.get(i));
+
+                    list.add(resultVO);
                 }
-
-                list.add(resultVO);
             }
 
         }else{
-            for (int i = 0; i < deviceUUIds.size(); i++) {
-                String tableName = deviceUUIds.get(i);
-                LastInnerResultVO resultVO = new LastInnerResultVO();
-                resultVO.setDeviceuuid(deviceUUIds.get(i));
-                String tagQuery = "SELECT \"device_id\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
-                if(CollectionUtils.isNotEmpty(tag)){
-                    resultVO.setTags(tag.get(0));
-                    String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-                    List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
-                    resultVO.setMetrics(metricList.get(0));
+            String query = "SELECT *::field FROM \"datarealtime\" where ("+deviceUUId+") tz('Asia/Shanghai')";
+            List<Map<String, Object>> metricList = tsdbUtils.fetchRecords(query);
+            if(CollectionUtils.isNotEmpty(metricList)){
+                for (int i = 0; i < metricList.size(); i++) {
+                    LastInnerResultVO resultVO = new LastInnerResultVO();
+                    resultVO.setDeviceuuid(metricList.get(i).get("device_uuid").toString());
+                    Map<String, String> tag = new HashMap<String,String>();
+                    tag.put("device_id",metricList.get(i).get("device_id").toString());
+                    resultVO.setTags(tag);
+                    resultVO.setMetrics(metricList.get(i));
+
+                    list.add(resultVO);
                 }
-                list.add(resultVO);
             }
         }
 

+ 14 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/SimpleContext.java

@@ -8,6 +8,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 import org.springframework.stereotype.Service;
 
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.*;
 
 /**
@@ -26,6 +28,8 @@ public class SimpleContext {
     public void disposeMessageToDB(String message){
         Map<String, String> tags = new HashMap<>();
         Map<String, Object> fields = new HashMap<>();
+        Map<String, String> realtags = new HashMap<>();
+        Map<String, Object> realfields = new HashMap<>();
         Map map_data = JsonUtils.fromJson(message,Map.class);
         System.out.println("test11111");
         String deviceUUId = map_data.get("deviceUUId").toString();
@@ -45,10 +49,20 @@ public class SimpleContext {
         JSONObject metrics = JSON.parseObject(met.toString());
         for(String entry : metrics.keySet()){
             fields.put(entry.toLowerCase(),metrics.get(entry));
+            realfields.put(entry.toLowerCase(),Float.valueOf(metrics.get(entry).toString()));
         }
         System.out.println("test44444"+met.toString());
 
         influxDBUtils.insertOne1(tableName,tags,fields,timestamp);
+
+        //创建设备实时数据表,用于保存每个设备最新一条数据记录
+        String realtimeTableName = "datarealtime";
+        long realtimestamp = 1744164000001L;
+        realtags.put("device_uuid",deviceUUId);
+        realfields.put("realtime", (map_data.get("timestamp").toString()));
+        realfields.put("device_uuid",deviceUUId);
+        realfields.put("device_id",tags.get("device_id"));
+        influxDBUtils.insertOne1(realtimeTableName,realtags,realfields,realtimestamp);
     }
 }