Переглянути джерело

'新增设备数据写入和设备查看查询相关接口'

james 5 місяців тому
батько
коміт
35ecb53c5f
28 змінених файлів з 577 додано та 433 видалено
  1. 2 2
      data-transfer/data-transfer-api/src/main/java/com/usky/transfer/domain/DeviceDataInfoVO.java
  2. 3 3
      data-transfer/data-transfer-api/src/main/java/com/usky/transfer/domain/DeviceDataWriteVO.java
  3. 5 0
      data-transfer/data-transfer-biz/pom.xml
  4. 206 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/QueryDeviceDataController.java
  5. 10 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDevice.java
  6. 3 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/DmpDeviceCommandService.java
  7. 19 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpDeviceCommandServiceImpl.java
  8. 21 8
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java
  9. 2 2
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java
  10. 13 13
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/MyConsumer.java
  11. 80 80
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/RocketMQSimpleContext.java
  12. 29 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalHistoryRequestVO.java
  13. 19 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalHistoryResultVO.java
  14. 19 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalLastRequestVO.java
  15. 20 0
      data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalLastResultVO.java
  16. 6 17
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java
  17. 3 8
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistoryRequestVO.java
  18. 2 6
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistoryResultVO.java
  19. 2 7
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastRequestVO.java
  20. 2 6
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastResultVO.java
  21. 0 5
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/MetricVO.java
  22. 9 9
      data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java
  23. 10 24
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java
  24. 10 62
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java
  25. 2 12
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryInfluxdbDataService.java
  26. 50 166
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryInfluxdbDataServiceImpl.java
  27. 3 3
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/SimpleContext.java
  28. 27 0
      data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/utils/TsdbUtils.java

+ 2 - 2
data-transfer/data-transfer-api/src/main/java/com/usky/transfer/domain/DeviceDataInfoVO.java

@@ -27,12 +27,12 @@ public class DeviceDataInfoVO implements Serializable {
     /**
      * 设备编号
      */
-    private String device_id;
+    private String deviceUUId;
 
     /**
      * 产品编码
      */
-    private String product_code;
+    private String productCode;
 
     /**
      * 数据上报时间

+ 3 - 3
data-transfer/data-transfer-api/src/main/java/com/usky/transfer/domain/DeviceDataWriteVO.java

@@ -20,9 +20,9 @@ public class DeviceDataWriteVO implements Serializable {
     private String productCode;
 
     /**
-     * 设备Id
+     * 设备UUId
      */
-    private String deviceId;
+    private String deviceUUId;
 
     /**
      * 数据上报时间
@@ -32,7 +32,7 @@ public class DeviceDataWriteVO implements Serializable {
     /**
      * 标签集合
      */
-    private Map<String,Object> tags;
+    private Map<String,String> tags;
 
     /**
      * 属性值集合

+ 5 - 0
data-transfer/data-transfer-biz/pom.xml

@@ -68,6 +68,11 @@
             <artifactId>data-transfer-api</artifactId>
             <version>0.0.1</version>
         </dependency>
+        <dependency>
+            <groupId>com.usky</groupId>
+            <artifactId>data-tsdb-proxy-api</artifactId>
+            <version>0.0.1</version>
+        </dependency>
 
     </dependencies>
 

+ 206 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/controller/web/QueryDeviceDataController.java

@@ -0,0 +1,206 @@
+package com.usky.transfer.controller.web;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.common.core.bean.ApiResult;
+import com.usky.common.core.bean.CommonPage;
+import com.usky.common.core.exception.BusinessException;
+import com.usky.demo.RemoteTsdbProxyService;
+import com.usky.demo.domain.HistoryRequestVO;
+import com.usky.demo.domain.HistoryResultVO;
+import com.usky.demo.domain.LastRequestVO;
+import com.usky.demo.domain.LastResultVO;
+import com.usky.transfer.domain.DmpDevice;
+import com.usky.transfer.domain.DmpDeviceCommand;
+import com.usky.transfer.service.DmpDeviceCommandService;
+import com.usky.transfer.service.DmpDeviceService;
+import com.usky.transfer.service.vo.DeviceMapVO;
+import com.usky.transfer.service.vo.ExternalHistoryRequestVO;
+import com.usky.transfer.service.vo.ExternalLastRequestVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RestController
+@RequestMapping("dataQuery")
+public class QueryDeviceDataController {
+    @Autowired
+    private RemoteTsdbProxyService remoteTsdbProxyService;
+    @Autowired
+    private DmpDeviceService dmpDeviceService;
+    @Autowired
+    private DmpDeviceCommandService dmpDeviceCommandService;
+
+    @Cacheable(cacheNames = "externalDeviceList",sync = true)
+    public Map<String, DeviceMapVO> getExternalDeviceMap(){
+        Map<String,DeviceMapVO> deviceMap = new HashMap<>();
+        LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
+        queryWrapper.eq(DmpDevice::getDeleteFlag,0)
+                .orderByDesc(DmpDevice::getId);
+        List<DmpDevice> list = dmpDeviceService.list(queryWrapper);
+        if(CollectionUtils.isNotEmpty(list)){
+            for (int i = 0; i < list.size(); i++) {
+                String deviceId = list.get(i).getDeviceId();
+                DeviceMapVO mapVO = new DeviceMapVO();
+                mapVO.setProductCode(list.get(i).getProductCode());
+                mapVO.setDeviceId(list.get(i).getDeviceId());
+                mapVO.setDeviceUuid(list.get(i).getDeviceUuid());
+                mapVO.setDeviceStatus(list.get(i).getServiceStatus());
+
+                deviceMap.put(deviceId,mapVO);
+            }
+        }
+
+        return deviceMap;
+    }
+
+    /**
+     * 单个设备实时数据查询(对外)
+     * @param productCode
+     * @param deviceId
+     * @return
+     */
+    @GetMapping("/externalLast")
+    public ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "productCode") String productCode,
+                                                       @RequestParam(value = "deviceId") String deviceId){
+        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+        if(!deviceMapList.containsKey(deviceId)){
+            throw new BusinessException(deviceId+"无设备信息");
+        }
+
+        String deviceUUId = "";
+
+        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+            String productCode1 = map.getValue().getProductCode();
+            String deviceId1 = map.getKey();
+            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+                deviceUUId = map.getValue().getDeviceUuid();
+            }
+        }
+
+        return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(deviceUUId));
+    }
+
+    /**
+     * 批量设备实时数据查询(对外)
+     * @param requestVO
+     * @return
+     */
+    @PostMapping("/externalLast")
+    public ApiResult<List<LastResultVO>> queryLastDeviceData(@RequestBody ExternalLastRequestVO requestVO){
+        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+        LastRequestVO reqVO = new LastRequestVO();
+
+        String productCode = requestVO.getProductCode();
+        List<String> deviceIds = requestVO.getDeviceId();
+        List<String> deviceUUIds = new ArrayList<>();
+        for (int i = 0; i < deviceIds.size(); i++) {
+            String deviceId = deviceIds.get(i);
+            if(!deviceMapList.containsKey(deviceId)){
+                throw new BusinessException(deviceId+"无设备信息");
+            }
+
+            for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+                String productCode1 = map.getValue().getProductCode();
+                String deviceId1 = map.getKey();
+                if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+                    deviceUUIds.add(map.getValue().getDeviceUuid());
+
+                }
+            }
+        }
+        reqVO.setDeviceUUId(deviceUUIds);
+
+        return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(reqVO));
+    }
+
+    /**
+     * 单个设备历史数据查询(对外)
+     * @param productCode
+     * @param deviceId
+     * @param startTime
+     * @param endTime
+     * @return
+     */
+    @GetMapping("/externalHistory")
+    public ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
+                                                             @RequestParam(value = "deviceId") String deviceId,
+                                                             @RequestParam(value = "startTime") String startTime,
+                                                             @RequestParam(value = "endTime") String endTime){
+        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+        if(!deviceMapList.containsKey(deviceId)){
+            throw new BusinessException(deviceId+"无设备信息");
+        }
+
+        String deviceUUId = "";
+
+        for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+            String productCode1 = map.getValue().getProductCode();
+            String deviceId1 = map.getKey();
+            if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+                deviceUUId = map.getValue().getDeviceUuid();
+            }
+        }
+
+        return ApiResult.success(remoteTsdbProxyService.queryHistoryDeviceData(deviceUUId,startTime,endTime));
+    }
+
+    /**
+     * 批量设备历史数据查询(对外)
+     * @param requestVO
+     * @return
+     */
+    @PostMapping("/externalHistory")
+    public ApiResult<List<HistoryResultVO>> queryHistoryDeviceData(@RequestBody ExternalHistoryRequestVO requestVO){
+        Map<String, DeviceMapVO> deviceMapList = this.getExternalDeviceMap();
+        HistoryRequestVO reqVO = new HistoryRequestVO();
+
+        String productCode = requestVO.getProductCode();
+        List<String> deviceIds = requestVO.getDeviceId();
+        List<String> deviceUUIds = new ArrayList<>();
+        for (int i = 0; i < deviceIds.size(); i++) {
+            String deviceId = deviceIds.get(i);
+            if(!deviceMapList.containsKey(deviceId)){
+                throw new BusinessException(deviceId+"无设备信息");
+            }
+
+            for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+                String productCode1 = map.getValue().getProductCode();
+                String deviceId1 = map.getKey();
+                if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+                    deviceUUIds.add(map.getValue().getDeviceUuid());
+
+                }
+            }
+        }
+        reqVO.setDeviceUUId(deviceUUIds);
+        reqVO.setStartTime(requestVO.getStartTime());
+        reqVO.setEndTime(requestVO.getEndTime());
+
+        return ApiResult.success(remoteTsdbProxyService.queryHistoryDeviceData(reqVO));
+    }
+
+    /**
+     * 指令记录
+     * @param commandStatus
+     * @param startTime
+     * @param endTime
+     * @param pageNum
+     * @param pageSize
+     * @return
+     */
+    @GetMapping("deviceCommandRecord")
+    public ApiResult<CommonPage<DmpDeviceCommand>> deviceCommandRecord(@RequestParam(value = "commandStatus",required = false) Integer commandStatus,
+                                                                       @RequestParam(value = "startTime",required = false) String startTime,
+                                                                       @RequestParam(value = "endTime",required = false) String endTime,
+                                                                       @RequestParam(value = "pageNum", required = false, defaultValue = "1") Integer pageNum,
+                                                                       @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize){
+        return ApiResult.success(dmpDeviceCommandService.deviceCommandRecord(commandStatus,startTime,endTime,pageNum,pageSize));
+    }
+}

+ 10 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDevice.java

@@ -127,5 +127,15 @@ public class DmpDevice implements Serializable {
      */
     private String deviceUuid;
 
+    /**
+     * 经度
+     */
+    private String longitude;
+
+    /**
+     * 纬度
+     */
+    private String latitude;
+
 
 }

+ 3 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/DmpDeviceCommandService.java

@@ -1,7 +1,9 @@
 package com.usky.transfer.service;
 
+import com.usky.common.core.bean.CommonPage;
 import com.usky.transfer.domain.DmpDeviceCommand;
 import com.usky.common.mybatis.core.CrudService;
+import org.springframework.web.bind.annotation.RequestParam;
 
 /**
  * <p>
@@ -13,4 +15,5 @@ import com.usky.common.mybatis.core.CrudService;
  */
 public interface DmpDeviceCommandService extends CrudService<DmpDeviceCommand> {
 
+    CommonPage<DmpDeviceCommand> deviceCommandRecord(Integer commandStatus,String startTime,String endTime,Integer pageNum,Integer pageSize);
 }

+ 19 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpDeviceCommandServiceImpl.java

@@ -1,5 +1,12 @@
 package com.usky.transfer.service.impl;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.usky.common.core.bean.CommonPage;
+import com.usky.common.security.utils.SecurityUtils;
 import com.usky.transfer.domain.DmpDeviceCommand;
 import com.usky.transfer.mapper.DmpDeviceCommandMapper;
 import com.usky.transfer.service.DmpDeviceCommandService;
@@ -17,4 +24,16 @@ import org.springframework.stereotype.Service;
 @Service
 public class DmpDeviceCommandServiceImpl extends AbstractCrudService<DmpDeviceCommandMapper, DmpDeviceCommand> implements DmpDeviceCommandService {
 
+    @Override
+    public CommonPage<DmpDeviceCommand> deviceCommandRecord(Integer commandStatus, String startTime, String endTime, Integer pageNum, Integer pageSize){
+        IPage<DmpDeviceCommand> page = new Page<>(pageNum,pageSize);
+        LambdaQueryWrapper<DmpDeviceCommand> queryWrapper = Wrappers.lambdaQuery();
+        queryWrapper.eq(commandStatus != null,DmpDeviceCommand::getCommandStatus,commandStatus)
+                .between(StringUtils.isNotBlank(startTime)&&StringUtils.isNotBlank(endTime),DmpDeviceCommand::getCreatedTime,startTime,endTime)
+                .eq(DmpDeviceCommand::getTenantId, SecurityUtils.getTenantId())
+                .orderByDesc(DmpDeviceCommand::getId);
+        page = this.page(page,queryWrapper);
+
+        return new CommonPage<>(page.getRecords(),page.getTotal(),pageSize,pageNum);
+    }
 }

+ 21 - 8
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -95,25 +95,26 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
         Map<String,Object> rec_map = new HashMap<>();
         DeviceDataInfoVO dataInfo = new DeviceDataInfoVO();
         Map<String,Object> metrics = writeVO.getMetrics();
-        Map<String,Object> tags = writeVO.getTags();
+        Map<String,String> tags = writeVO.getTags();
         if(metrics.size() > 0){
             Map<String,Object> mp = new HashMap<>();
+            Map<String,Object> mp_tag = new HashMap<>();
             for(Map.Entry<String,Object> map:metrics.entrySet()){
                 mp.put(map.getKey(),map.getValue());
             }
             dataInfo.setMetrics(mp);
             if(tags != null && tags.size() > 0){
-                mp.clear();
-                for(Map.Entry<String,Object> map:tags.entrySet()){
-                    mp.put(map.getKey(),map.getValue().toString());
+                for(Map.Entry<String,String> map:tags.entrySet()){
+                    mp_tag.put(map.getKey(),map.getValue());
                 }
-                dataInfo.setTags(mp);
+                dataInfo.setTags(mp_tag);
             }else{
                 dataInfo.setTags(new HashMap<>());
             }
 
             String productCode = writeVO.getProductCode();
-            String deviceId = writeVO.getDeviceId();
+            String deviceUUId = writeVO.getDeviceUUId();
+            String deviceId = tags.get("deviceId");
 
             //判断上报数据对应产品是否注册,如未注册则为非法
             Map<String,ProductMapVO> productMapList = getProductMap();
@@ -130,6 +131,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     return rec_map;
                 }
             }
+
             //判断上报数据设备是否已注册,未注册自动注册
             Map<String,DeviceMapVO> deviceMapList = getDeviceMap();
             if(!deviceMapList.containsKey(deviceId)){
@@ -147,6 +149,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
                 dmpDeviceService.save(dmpDeviceInfo);
 
+                deviceUUId = dmpDeviceInfo.getDeviceUuid();
 
                 DmpDeviceStatus dmpDeviceStatus = new DmpDeviceStatus();
                 dmpDeviceStatus.setDeviceId(dmpDeviceInfo.getDeviceId());
@@ -157,8 +160,18 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceStatusService.save(dmpDeviceStatus);
             }
 
-            dataInfo.setProduct_code(productCode);
-            dataInfo.setDevice_id(deviceId);
+            if(StringUtils.isBlank(deviceUUId)){
+                for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
+                    String productCode1 = map.getValue().getProductCode();
+                    String deviceId1 = map.getKey();
+                    if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
+                        deviceUUId = map.getValue().getDeviceUuid();
+                    }
+                }
+            }
+
+            dataInfo.setProductCode(productCode);
+            dataInfo.setDeviceUUId(deviceUUId);
             dataInfo.setTimestamp(writeVO.getTimestamp());
 
             myProducer.sendMessage("data-tsdb", JSONArray.toJSON(dataInfo).toString());

+ 2 - 2
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -36,7 +36,7 @@ public class Info implements MqttStrategy {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        Map<String, Object> tags = new HashMap<>();
+        Map<String, String> tags = new HashMap<>();
         Map<String, Object> fields = new HashMap<>();
         Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
         String deviceId = map_data.get("device_id").toString();
@@ -58,7 +58,7 @@ public class Info implements MqttStrategy {
         }
 
         DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceId(deviceId);
+        deviceDataWriteVO.setDeviceUUId(deviceId);
         deviceDataWriteVO.setProductCode(productCode);
         deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
         deviceDataWriteVO.setTags(tags);

+ 13 - 13
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/MyConsumer.java

@@ -6,16 +6,16 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-@Slf4j
-@Component
-@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
-public class MyConsumer implements RocketMQListener<String> {
-    @Autowired
-    private RocketMQSimpleContext simpleContext;
-
-    @Override
-    public void onMessage(String message){
-        System.out.println("DirectReceiver消费者收到消息: " + message);
-        simpleContext.disposeMessageToDB(message);
-    }
-}
+//@Slf4j
+//@Component
+//@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
+//public class MyConsumer implements RocketMQListener<String> {
+//    @Autowired
+//    private RocketMQSimpleContext simpleContext;
+//
+//    @Override
+//    public void onMessage(String message){
+//        System.out.println("DirectReceiver消费者收到消息: " + message);
+//        simpleContext.disposeMessageToDB(message);
+//    }
+//}

+ 80 - 80
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/rocketmq/RocketMQSimpleContext.java

@@ -1,80 +1,80 @@
-package com.usky.transfer.service.rocketmq;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.nacos.shaded.com.google.gson.JsonArray;
-import com.usky.common.core.util.JsonUtils;
-import com.usky.transfer.domain.DeviceDataInfoVO;
-import com.usky.transfer.domain.DeviceDataWriteVO;
-import com.usky.transfer.service.QueryInfluxdbDataService;
-import com.usky.transfer.service.utils.TsdbUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.*;
-
-/**
- * 中间处理消息转发
- */
-@Service
-@Repository
-public class RocketMQSimpleContext {
-    @Resource
-    private MyProducer myProducer;
-    @Autowired
-    private QueryInfluxdbDataService queryInfluxdbDataService;
-
-    /**
-     * 设备数据推送到MQ队列
-     * @param
-     */
-    public void disposeMessageToDB(String message){
-        Map<String, Object> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(message,Map.class);
-        String deviceId = map_data.get("device_id").toString();
-        String productCode = map_data.get("product_code").toString().toLowerCase();
-        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
-
-        String tableName = deviceId;
-
-        Object tg = JSONObject.toJSONString(map_data.get("tags"));
-        JSONObject tag = JSON.parseObject(tg.toString());
-        for (String entry : tag.keySet()){
-            tags.put(entry.toLowerCase(),tag.get(entry).toString());
-        }
-
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-        }
-
-        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceId(deviceId);
-        deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
-        deviceDataWriteVO.setTags(tags);
-        deviceDataWriteVO.setMetrics(metrics);
-
-        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
-    }
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+//package com.usky.transfer.service.rocketmq;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.nacos.shaded.com.google.gson.JsonArray;
+//import com.usky.common.core.util.JsonUtils;
+//import com.usky.transfer.domain.DeviceDataInfoVO;
+//import com.usky.transfer.domain.DeviceDataWriteVO;
+//import com.usky.transfer.service.QueryInfluxdbDataService;
+//import com.usky.transfer.service.utils.TsdbUtils;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Repository;
+//import org.springframework.stereotype.Service;
+//
+//import javax.annotation.Resource;
+//import java.time.LocalDateTime;
+//import java.time.ZoneOffset;
+//import java.util.*;
+//
+///**
+// * 中间处理消息转发
+// */
+//@Service
+//@Repository
+//public class RocketMQSimpleContext {
+//    @Resource
+//    private MyProducer myProducer;
+//    @Autowired
+//    private QueryInfluxdbDataService queryInfluxdbDataService;
+//
+//    /**
+//     * 设备数据推送到MQ队列
+//     * @param
+//     */
+//    public void disposeMessageToDB(String message){
+//        Map<String, String> tags = new HashMap<>();
+//        Map<String, Object> fields = new HashMap<>();
+//        Map map_data = JsonUtils.fromJson(message,Map.class);
+//        String deviceId = map_data.get("deviceUUId").toString();
+//        String productCode = map_data.get("productCode").toString().toLowerCase();
+//        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+//
+//        String tableName = deviceId;
+//
+//        Object tg = JSONObject.toJSONString(map_data.get("tags"));
+//        JSONObject tag = JSON.parseObject(tg.toString());
+//        for (String entry : tag.keySet()){
+//            tags.put(entry.toLowerCase(),tag.get(entry).toString());
+//        }
+//
+//        Object met = JSONObject.toJSONString(map_data.get("metrics"));
+//        JSONObject metrics = JSON.parseObject(met.toString());
+//        for(String entry : metrics.keySet()){
+//            fields.put(entry.toLowerCase(),metrics.get(entry));
+//        }
+//
+//        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+//        deviceDataWriteVO.setDeviceUUId(deviceId);
+//        deviceDataWriteVO.setProductCode(productCode);
+//        deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
+//        deviceDataWriteVO.setTags(tags);
+//        deviceDataWriteVO.setMetrics(metrics);
+//
+//        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+//    }
+//}
+//
+//
+//
+//
+//
+//
+//
+//
+//
+//
+//
+//
+//
+//

+ 29 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalHistoryRequestVO.java

@@ -0,0 +1,29 @@
+package com.usky.transfer.service.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ExternalHistoryRequestVO implements Serializable {
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+    /**
+     * 设备Id
+     */
+    private List<String> deviceId;
+
+    /**
+     * 开始时间
+     */
+    private String startTime;
+
+    /**
+     * 结束时间
+     */
+    private String endTime;
+}

+ 19 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalHistoryResultVO.java

@@ -0,0 +1,19 @@
+package com.usky.transfer.service.vo;
+
+import com.usky.demo.domain.MetricVO;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ExternalHistoryResultVO implements Serializable {
+    private String deviceId;
+    private List<MetricVO> metrics;
+
+    public ExternalHistoryResultVO(String devId,List<MetricVO> metrics){
+        this.deviceId = devId;
+        this.metrics = metrics;
+    }
+
+}

+ 19 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalLastRequestVO.java

@@ -0,0 +1,19 @@
+package com.usky.transfer.service.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ExternalLastRequestVO implements Serializable {
+    /**
+     * 产品编码
+     */
+    private String productCode;
+
+    /**
+     * deviceId
+     */
+    private List<String> deviceId;
+}

+ 20 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/vo/ExternalLastResultVO.java

@@ -0,0 +1,20 @@
+package com.usky.transfer.service.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class ExternalLastResultVO implements Serializable {
+
+    private String deviceId;
+    private List<Map<String,Object>> metrics;
+
+    public ExternalLastResultVO(String devId,List<Map<String,Object>> metrics){
+        this.deviceId = devId;
+        this.metrics = metrics;
+    }
+
+}

+ 6 - 17
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java

@@ -12,22 +12,13 @@ import java.util.List;
 @FeignClient(contextId = "remoteTsdbProxyService", value = "data-tsdb-proxy", fallbackFactory = RemoteTsdbProxyFallbackFactory.class)
 public interface RemoteTsdbProxyService {
 
-//    /**
-//     * 单个设备数据写入
-//     * @return
-//     */
-//    @PostMapping("/sendDeviceData")
-//    ApiResult<Void> sendDeviceData(@RequestBody DeviceDataWriteVO writeVO);
-
     /**
      * 单个设备实时数据查询
-     * @param productCode
-     * @param deviceId
+     * @param deviceUUId
      * @return
      */
     @GetMapping("/last")
-    ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                       @RequestParam(value = "deviceId") String deviceId);
+    LastResultVO queryLastDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId);
 
     /**
      * 批量设备实时数据查询
@@ -35,19 +26,17 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/last")
-    ApiResult<List<LastResultVO>> queryLastDeviceData(@RequestBody LastRequestVO requestVO);
+    List<LastResultVO> queryLastDeviceData(@RequestBody LastRequestVO requestVO);
 
     /**
      * 单个设备历史数据查询
-     * @param productCode
-     * @param deviceId
+     * @param deviceUUId
      * @param startTime
      * @param endTime
      * @return
      */
     @GetMapping("/history")
-    ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                             @RequestParam(value = "deviceId") String deviceId,
+    HistoryResultVO queryHistoryDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId,
                                                              @RequestParam(value = "startTime") String startTime,
                                                              @RequestParam(value = "endTime") String endTime);
 
@@ -57,5 +46,5 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/history")
-    ApiResult<List<HistoryResultVO>> queryHistoryDeviceData(@RequestBody HistoryRequestVO requestVO);
+    List<HistoryResultVO> queryHistoryDeviceData(@RequestBody HistoryRequestVO requestVO);
 }

+ 3 - 8
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistoryRequestVO.java

@@ -8,15 +8,10 @@ import java.util.List;
 @Data
 public class HistoryRequestVO implements Serializable {
 
-    /**
-     * 产品编码
-     */
-    private String  productCode;
-
-    /**
-     * 设备Id
+   /**
+     * 设备UUId
      */
-    private List<String> deviceId;
+    private List<String> deviceUUId;
 
     /**
      * 开始时间

+ 2 - 6
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/HistoryResultVO.java

@@ -8,12 +8,8 @@ import java.util.Map;
 
 @Data
 public class HistoryResultVO implements Serializable {
-    private String deviceId;
+    private String deviceUUId;
+    private List<Map<String,String>> tags;
     private List<MetricVO> metrics;
 
-    public HistoryResultVO(String devId,List<MetricVO> metrics){
-        this.deviceId = devId;
-        this.metrics = metrics;
-    }
-
 }

+ 2 - 7
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastRequestVO.java

@@ -9,12 +9,7 @@ import java.util.List;
 public class LastRequestVO implements Serializable {
 
     /**
-     * 产品编码
+     * deviceUUId
      */
-    private String  productCode;
-
-    /**
-     * 设备Id
-     */
-    private List<String> deviceId;
+    private List<String> deviceUUId;
 }

+ 2 - 6
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/LastResultVO.java

@@ -9,12 +9,8 @@ import java.util.Map;
 @Data
 public class LastResultVO implements Serializable {
 
-    private String deviceId;
+    private String deviceUUId;
+    private List<Map<String,String>> tags;
     private List<Map<String,Object>> metrics;
 
-    public LastResultVO(String devId,List<Map<String,Object>> metrics){
-        this.deviceId = devId;
-        this.metrics = metrics;
-    }
-
 }

+ 0 - 5
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/domain/MetricVO.java

@@ -12,9 +12,4 @@ public class MetricVO implements Serializable {
     private String metric;
     private List<Map<String,Object>> metricItems;
 
-    public MetricVO(String metric,List<Map<String,Object>> metricItems){
-        this.metric = metric;
-        this.metricItems = metricItems;
-    }
-
 }

+ 9 - 9
data-tsdb-proxy/data-tsdb-proxy-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java

@@ -2,6 +2,7 @@ package com.usky.demo.factory;
 
 import com.usky.common.core.bean.ApiResult;
 import com.usky.common.core.exception.BusinessException;
+import com.usky.common.core.exception.FeignBadRequestException;
 import com.usky.demo.RemoteTsdbProxyService;
 import com.usky.demo.domain.*;
 import org.slf4j.Logger;
@@ -36,30 +37,29 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
 //            }
 
             @Override
-            public ApiResult<LastResultVO> queryLastDeviceData(String productCode,String deviceId)
+            public LastResultVO queryLastDeviceData(String deviceUUId)
             {
-                return ApiResult.error("500","单个设备实时数据查询:" + throwable.getMessage());
+                throw new BusinessException("单个设备实时数据查询:" + throwable.getMessage());
             }
 
             @Override
-            public ApiResult<List<LastResultVO>> queryLastDeviceData(LastRequestVO requestVO)
+            public List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO)
             {
-                return ApiResult.error("500","批量设备实时数据查询:" + throwable.getMessage());
+                throw new BusinessException("批量设备实时数据查询:" + throwable.getMessage());
             }
 
             @Override
-            public ApiResult<HistoryResultVO> queryHistoryDeviceData(String productCode,
-                                                                     String deviceId,
+            public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
                                                                      String startTime,
                                                                      String endTime)
             {
-                return ApiResult.error("500","单个设备历史数据查询:" + throwable.getMessage());
+                throw new BusinessException("单个设备历史数据查询:" + throwable.getMessage());
             }
 
             @Override
-            public ApiResult<List<HistoryResultVO>> queryHistoryDeviceData(HistoryRequestVO requestVO)
+            public List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO)
             {
-                return ApiResult.error("500","批量设备历史数据查询:" + throwable.getMessage());
+                throw new BusinessException("批量设备历史数据查询:" + throwable.getMessage());
             }
         };
     }

+ 10 - 24
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java

@@ -29,26 +29,14 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
     @Autowired
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
-//    /**
-//     * 单个设备数据写入
-//     * @return
-//     */
-//    @Override
-//    public ApiResult<Void> sendDeviceData(DeviceDataWriteVO writeVO){
-//        queryInfluxdbDataService.sendDeviceData(writeVO);
-//        return ApiResult.success();
-//    }
-
     /**
      * 单个设备实时数据查询
-     * @param productCode
-     * @param deviceId
+     * @param deviceUUId
      * @return
      */
     @Override
-    public ApiResult<LastResultVO> queryLastDeviceData(String productCode,
-                                                       String deviceId){
-        return ApiResult.success(queryInfluxdbDataService.queryLastDeviceData(productCode,deviceId));
+    public LastResultVO queryLastDeviceData(String deviceUUId){
+        return queryInfluxdbDataService.queryLastDeviceData(deviceUUId);
     }
 
     /**
@@ -57,24 +45,22 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
      * @return
      */
     @Override
-    public ApiResult<List<LastResultVO>> queryLastDeviceData(LastRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.queryLastDeviceData(requestVO));
+    public List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO){
+        return queryInfluxdbDataService.queryLastDeviceData(requestVO);
     }
 
     /**
      * 单个设备历史数据查询
-     * @param productCode
-     * @param deviceId
+     * @param deviceUUId
      * @param startTime
      * @param endTime
      * @return
      */
     @Override
-    public ApiResult<HistoryResultVO> queryHistoryDeviceData(String productCode,
-                                                             String deviceId,
+    public HistoryResultVO queryHistoryDeviceData(String deviceUUId,
                                                              String startTime,
                                                              String endTime){
-        return ApiResult.success(queryInfluxdbDataService.queryHistoryDeviceData(productCode,deviceId,startTime,endTime));
+        return queryInfluxdbDataService.queryHistoryDeviceData(deviceUUId,startTime,endTime);
     }
 
     /**
@@ -83,7 +69,7 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
      * @return
      */
     @Override
-    public ApiResult<List<HistoryResultVO>> queryHistoryDeviceData(HistoryRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.queryHistoryDeviceData(requestVO));
+    public List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO){
+        return queryInfluxdbDataService.queryHistoryDeviceData(requestVO);
     }
 }

+ 10 - 62
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/controller/web/QueryInfluxdbDataController.java

@@ -29,19 +29,17 @@ public class QueryInfluxdbDataController {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     /**
-     * 单个设备实时数据查询(内部)
-     * @param productCode
-     * @param deviceId
+     * 单个设备实时数据查询
+     * @param deviceUUId
      * @return
      */
     @GetMapping("/last")
-    public ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                       @RequestParam(value = "deviceId") String deviceId){
-        return ApiResult.success(queryInfluxdbDataService.queryLastDeviceData(productCode,deviceId));
+    public ApiResult<LastResultVO> queryLastDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId){
+        return ApiResult.success(queryInfluxdbDataService.queryLastDeviceData(deviceUUId));
     }
 
     /**
-     * 批量设备实时数据查询(内部)
+     * 批量设备实时数据查询
      * @param requestVO
      * @return
      */
@@ -51,23 +49,21 @@ public class QueryInfluxdbDataController {
     }
 
     /**
-     * 单个设备历史数据查询(内部)
-     * @param productCode
-     * @param deviceId
+     * 单个设备历史数据查询
+     * @param deviceUUId
      * @param startTime
      * @param endTime
      * @return
      */
     @GetMapping("/history")
-    public ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                             @RequestParam(value = "deviceId") String deviceId,
+    public ApiResult<HistoryResultVO> queryHistoryDeviceData(@RequestParam(value = "deviceUUId") String deviceUUId,
                                                              @RequestParam(value = "startTime") String startTime,
                                                              @RequestParam(value = "endTime") String endTime){
-        return ApiResult.success(queryInfluxdbDataService.queryHistoryDeviceData(productCode,deviceId,startTime,endTime));
+        return ApiResult.success(queryInfluxdbDataService.queryHistoryDeviceData(deviceUUId,startTime,endTime));
     }
 
     /**
-     * 批量设备历史数据查询(内部)
+     * 批量设备历史数据查询
      * @param requestVO
      * @return
      */
@@ -76,53 +72,5 @@ public class QueryInfluxdbDataController {
         return ApiResult.success(queryInfluxdbDataService.queryHistoryDeviceData(requestVO));
     }
 
-    /**
-     * 单个设备实时数据查询(对外)
-     * @param productCode
-     * @param deviceId
-     * @return
-     */
-    @GetMapping("/lastOuter")
-    public ApiResult<LastResultVO> queryOuterLastDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                       @RequestParam(value = "deviceId") String deviceId){
-        return ApiResult.success(queryInfluxdbDataService.queryOuterLastDeviceData(productCode,deviceId));
-    }
-
-    /**
-     * 批量设备实时数据查询(对外)
-     * @param requestVO
-     * @return
-     */
-    @PostMapping("/lastOuter")
-    public ApiResult<List<LastResultVO>> queryOuterLastDeviceData(@RequestBody LastRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.queryOuterLastDeviceData(requestVO));
-    }
-
-    /**
-     * 单个设备历史数据查询(对外)
-     * @param productCode
-     * @param deviceId
-     * @param startTime
-     * @param endTime
-     * @return
-     */
-    @GetMapping("/historyOuter")
-    public ApiResult<HistoryResultVO> queryOuterHistoryDeviceData(@RequestParam(value = "productCode") String productCode,
-                                                             @RequestParam(value = "deviceId") String deviceId,
-                                                             @RequestParam(value = "startTime") String startTime,
-                                                             @RequestParam(value = "endTime") String endTime){
-        return ApiResult.success(queryInfluxdbDataService.queryOuterHistoryDeviceData(productCode,deviceId,startTime,endTime));
-    }
-
-    /**
-     * 批量设备历史数据查询(对外)
-     * @param requestVO
-     * @return
-     */
-    @PostMapping("/historyOuter")
-    public ApiResult<List<HistoryResultVO>> queryOuterHistoryDeviceData(@RequestBody HistoryRequestVO requestVO){
-        return ApiResult.success(queryInfluxdbDataService.queryOuterHistoryDeviceData(requestVO));
-    }
-
 }
 

+ 2 - 12
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/QueryInfluxdbDataService.java

@@ -16,22 +16,12 @@ import java.util.Map;
  */
 public interface QueryInfluxdbDataService extends CrudService<QueryInfluxdbData> {
 
-    LastResultVO queryLastDeviceData(String productCode, String deviceId);
+    LastResultVO queryLastDeviceData(String deviceUUId);
 
     List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO);
 
-    HistoryResultVO queryHistoryDeviceData(String productCode, String deviceId, String startTime, String endTime);
+    HistoryResultVO queryHistoryDeviceData(String deviceUUId, String startTime, String endTime);
 
     List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO);
 
-    LastResultVO queryOuterLastDeviceData(String productCode, String deviceId);
-
-    List<LastResultVO> queryOuterLastDeviceData(LastRequestVO requestVO);
-
-    HistoryResultVO queryOuterHistoryDeviceData(String productCode, String deviceId, String startTime, String endTime);
-
-    List<HistoryResultVO> queryOuterHistoryDeviceData(HistoryRequestVO requestVO);
-
-//    Map<String,Object> sendDeviceData(DeviceDataWriteVO writeVO);
-
 }

+ 50 - 166
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -15,6 +15,7 @@ import com.usky.demo.service.rocketmq.MyProducer;
 import com.usky.demo.service.utils.TsdbUtils;
 import com.usky.demo.service.vo.DeviceMapVO;
 import com.usky.demo.service.vo.ProductMapVO;
+import org.apache.commons.collections.ResettableListIterator;
 import org.influxdb.dto.QueryResult;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cache.annotation.Cacheable;
@@ -43,41 +44,6 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     @Autowired
     private DmpDeviceService dmpDeviceService;
 
-    @Resource
-    private MyProducer myProducer;
-
-//    @Override
-//    public Map<String,Object> sendDeviceData(DeviceDataWriteVO writeVO){
-//        Map<String,Object> rec_map = new HashMap<>();
-//        DeviceDataInfoVO dataInfo = new DeviceDataInfoVO();
-//        Map<String,Object> metrics = writeVO.getMetrics();
-//        Map<String,String> tags = writeVO.getTags();
-//        if(metrics.size() > 0){
-//            Map<String,Object> mp = new HashMap<>();
-//            for(Map.Entry<String,Object> map:metrics.entrySet()){
-//                mp.put(map.getKey(),map.getValue());
-//            }
-//            dataInfo.setMetrics(mp);
-//            if(tags != null && tags.size() > 0){
-//                mp.clear();
-//                for(Map.Entry<String,String> map:tags.entrySet()){
-//                    mp.put(map.getKey(),map.getValue());
-//                }
-//                dataInfo.setTags(mp);
-//            }else{
-//                dataInfo.setTags(new HashMap<>());
-//            }
-//            dataInfo.setProduct_code(writeVO.getProductCode());
-//            dataInfo.setDevice_id(writeVO.getDeviceId());
-//            dataInfo.setTimestamp(writeVO.getTimestamp());
-//
-//            myProducer.sendMessage("data-collector", JSONArray.toJSON(dataInfo).toString());
-//        }
-//
-//        rec_map.put("code",200);
-//        rec_map.put("message","操作成功!");
-//        return rec_map;
-//    }
 
     @Cacheable(cacheNames = "productList",sync = true)
     public Map<String, ProductMapVO> getProductMap(){
@@ -128,12 +94,26 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     }
 
     @Override
-    public LastResultVO queryLastDeviceData(String productCode, String deviceId){
+    public LastResultVO queryLastDeviceData(String deviceUUId){
 
-        String tableName = productCode + "_" + deviceId;
+        String tableName = deviceUUId;
         String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
         List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
-        LastResultVO resultVO = new LastResultVO(deviceId,metrics);
+        String tagQuery = "SELECT \"deviceid\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+        List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+        LastResultVO resultVO = new LastResultVO();
+        if(tag.size() == 0){
+            Map<String,String> tagMap = new HashMap<>();
+            tagMap.put("deviceId","");
+            tag.add(tagMap);
+            resultVO.setDeviceUUId(deviceUUId);
+            resultVO.setTags(tag);
+            resultVO.setMetrics(metrics);
+        }else{
+            resultVO.setDeviceUUId(deviceUUId);
+            resultVO.setTags(tag);
+            resultVO.setMetrics(metrics);
+        }
 
         return resultVO;
     }
@@ -141,14 +121,18 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     @Override
     public List<LastResultVO> queryLastDeviceData(LastRequestVO requestVO){
         List<LastResultVO> result = new ArrayList<>();
-        String productCode = requestVO.getProductCode();
-        List<String> deviceIds = requestVO.getDeviceId();
-        if(CollectionUtils.isNotEmpty(deviceIds)){
-            for (int i = 0; i < deviceIds.size(); i++) {
-                String tableName = productCode + "_" + deviceIds.get(i);
+        List<String> deviceUUIds = requestVO.getDeviceUUId();
+        if(CollectionUtils.isNotEmpty(deviceUUIds)){
+            for (int i = 0; i < deviceUUIds.size(); i++) {
+                String tableName = deviceUUIds.get(i);
                 String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
                 List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
-                LastResultVO resultVO = new LastResultVO(deviceIds.get(i),metrics);
+                String tagQuery = "SELECT \"deviceid\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+                List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
+                LastResultVO resultVO = new LastResultVO();
+                resultVO.setDeviceUUId(deviceUUIds.get(i));
+                resultVO.setTags(tag);
+                resultVO.setMetrics(metrics);
                 result.add(resultVO);
             }
         }
@@ -158,11 +142,13 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     }
 
     @Override
-    public HistoryResultVO queryHistoryDeviceData(String productCode,String deviceId,String startTime,String endTime){
+    public HistoryResultVO queryHistoryDeviceData(String deviceUUId,String startTime,String endTime){
         List<MetricVO> metricList = new ArrayList<>();
-        String tableName = productCode + "_" + deviceId;
+        String tableName = deviceUUId;
         String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
         QueryResult queryResult = tsdbUtils.query(query);
+        String tagQuery = "SELECT \"deviceid\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+        List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
         if(queryResult.getResults().get(0).getSeries() != null){
             List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
             if(CollectionUtils.isNotEmpty(fields)){
@@ -180,7 +166,9 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                                 map.put("value",metircs.get(k).get(field));
                                 metircItems.add(map);
                             }
-                            MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
+                            MetricVO metricVO = new MetricVO();
+                            metricVO.setMetric(fields.get(j));
+                            metricVO.setMetricItems(metircItems);
                             metricList.add(metricVO);
                         }
 
@@ -190,22 +178,26 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             }
         }
 
-        HistoryResultVO resultVO = new HistoryResultVO(deviceId,metricList);
+        HistoryResultVO resultVO = new HistoryResultVO();
+        resultVO.setDeviceUUId(deviceUUId);
+        resultVO.setTags(tag);
+        resultVO.setMetrics(metricList);
         return resultVO;
     }
 
     @Override
     public List<HistoryResultVO> queryHistoryDeviceData(HistoryRequestVO requestVO){
         List<HistoryResultVO> list = new ArrayList<>();
-        String productCode = requestVO.getProductCode();
-        List<String> deviceIds = requestVO.getDeviceId();
+        List<String> deviceIds = requestVO.getDeviceUUId();
         String startTime = requestVO.getStartTime();
         String endTime = requestVO.getEndTime();
         for (int i = 0; i < deviceIds.size(); i++) {
             List<MetricVO> metricList = new ArrayList<>();
-            String tableName = productCode + "_" + deviceIds.get(i);
+            String tableName = deviceIds.get(i);
             String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
             QueryResult queryResult = tsdbUtils.query(query);
+            String tagQuery = "SELECT \"deviceid\",*::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
+            List<Map<String, String>> tag = tsdbUtils.fetchTagRecords(tagQuery);
             if(queryResult.getResults().get(0).getSeries() != null){
                 List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
                 if(CollectionUtils.isNotEmpty(fields)){
@@ -223,7 +215,9 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                                     map.put("value",metircs.get(k).get(field));
                                     metircItems.add(map);
                                 }
-                                MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
+                                MetricVO metricVO = new MetricVO();
+                                metricVO.setMetric(fields.get(j));
+                                metricVO.setMetricItems(metircItems);
                                 metricList.add(metricVO);
                             }
 
@@ -233,7 +227,10 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 }
             }
 
-            HistoryResultVO resultVO = new HistoryResultVO(deviceIds.get(i),metricList);
+            HistoryResultVO resultVO = new HistoryResultVO();
+            resultVO.setDeviceUUId(deviceIds.get(i));
+            resultVO.setTags(tag);
+            resultVO.setMetrics(metricList);
             list.add(resultVO);
         }
 
@@ -241,117 +238,4 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     }
 
 
-    @Override
-    public LastResultVO queryOuterLastDeviceData(String productCode, String deviceId){
-
-        String tableName = productCode + "_" + deviceId;
-        String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-        List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
-        LastResultVO resultVO = new LastResultVO(deviceId,metrics);
-
-        return resultVO;
-    }
-
-    @Override
-    public List<LastResultVO> queryOuterLastDeviceData(LastRequestVO requestVO){
-        List<LastResultVO> result = new ArrayList<>();
-        String productCode = requestVO.getProductCode();
-        List<String> deviceIds = requestVO.getDeviceId();
-        if(CollectionUtils.isNotEmpty(deviceIds)){
-            for (int i = 0; i < deviceIds.size(); i++) {
-                String tableName = productCode + "_" + deviceIds.get(i);
-                String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-                List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
-                LastResultVO resultVO = new LastResultVO(deviceIds.get(i),metrics);
-                result.add(resultVO);
-            }
-        }
-
-        return result;
-
-    }
-
-    @Override
-    public HistoryResultVO queryOuterHistoryDeviceData(String productCode,String deviceId,String startTime,String endTime){
-        List<MetricVO> metricList = new ArrayList<>();
-        String tableName = productCode + "_" + deviceId;
-        String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-        QueryResult queryResult = tsdbUtils.query(query);
-        if(queryResult.getResults().get(0).getSeries() != null){
-            List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
-            if(CollectionUtils.isNotEmpty(fields)){
-                String fieldQuery = "SELECT *::field FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
-                List<Map<String,Object>> metircs = tsdbUtils.fetchRecords(fieldQuery);
-
-                if(CollectionUtils.isNotEmpty(metircs)){
-                    for (int j = 0; j < fields.size(); j++) {
-                        List<Map<String,Object>> metircItems = new ArrayList<>();
-                        if(!"time".equals(fields.get(j))){
-                            for (int k = 0; k < metircs.size(); k++) {
-                                String field = fields.get(j);
-                                Map<String,Object> map = new HashMap<>();
-                                map.put("timestamp",metircs.get(k).get("time"));
-                                map.put("value",metircs.get(k).get(field));
-                                metircItems.add(map);
-                            }
-                            MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
-                            metricList.add(metricVO);
-                        }
-
-                    }
-                }
-
-            }
-        }
-
-        HistoryResultVO resultVO = new HistoryResultVO(deviceId,metricList);
-        return resultVO;
-    }
-
-    @Override
-    public List<HistoryResultVO> queryOuterHistoryDeviceData(HistoryRequestVO requestVO){
-        List<HistoryResultVO> list = new ArrayList<>();
-        String productCode = requestVO.getProductCode();
-        List<String> deviceIds = requestVO.getDeviceId();
-        String startTime = requestVO.getStartTime();
-        String endTime = requestVO.getEndTime();
-        for (int i = 0; i < deviceIds.size(); i++) {
-            List<MetricVO> metricList = new ArrayList<>();
-            String tableName = productCode + "_" + deviceIds.get(i);
-            String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
-            QueryResult queryResult = tsdbUtils.query(query);
-            if(queryResult.getResults().get(0).getSeries() != null){
-                List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
-                if(CollectionUtils.isNotEmpty(fields)){
-                    String fieldQuery = "SELECT *::field FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
-                    List<Map<String,Object>> metircs = tsdbUtils.fetchRecords(fieldQuery);
-
-                    if(CollectionUtils.isNotEmpty(metircs)){
-                        for (int j = 0; j < fields.size(); j++) {
-                            List<Map<String,Object>> metircItems = new ArrayList<>();
-                            if(!"time".equals(fields.get(j))){
-                                for (int k = 0; k < metircs.size(); k++) {
-                                    String field = fields.get(j);
-                                    Map<String,Object> map = new HashMap<>();
-                                    map.put("timestamp",metircs.get(k).get("time"));
-                                    map.put("value",metircs.get(k).get(field));
-                                    metircItems.add(map);
-                                }
-                                MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
-                                metricList.add(metricVO);
-                            }
-
-                        }
-                    }
-
-                }
-            }
-
-            HistoryResultVO resultVO = new HistoryResultVO(deviceIds.get(i),metricList);
-            list.add(resultVO);
-        }
-
-        return list;
-    }
-
 }

+ 3 - 3
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/rocketmq/SimpleContext.java

@@ -27,10 +27,10 @@ public class SimpleContext {
         Map<String, String> tags = new HashMap<>();
         Map<String, Object> fields = new HashMap<>();
         Map map_data = JsonUtils.fromJson(message,Map.class);
-        String deviceId = map_data.get("device_id").toString();
-        String productId = map_data.get("product_code").toString().toLowerCase();
+        String deviceUUId = map_data.get("deviceUUId").toString();
+        String productCode = map_data.get("productCode").toString().toLowerCase();
         Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
-        String tableName = deviceId;
+        String tableName = deviceUUId;
 
         Object tg = JSONObject.toJSONString(map_data.get("tags"));
         JSONObject tag = JSON.parseObject(tg.toString());

+ 27 - 0
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/utils/TsdbUtils.java

@@ -122,6 +122,33 @@ public class TsdbUtils extends InfluxDbUtils {
         influxDB.write(batchPoints);
     }
 
+    /**
+     * 查询,返回Map集合
+     *
+     * @param query 完整的查询语句
+     * @return
+     */
+    public List<Map<String, String>> fetchTagRecords(String query) {
+        List<Map<String, String>> results = new ArrayList<>();
+        QueryResult queryResult = influxDB.query(new Query(query, database));
+        if(queryResult.getResults().get(0).getSeries() != null){
+            queryResult.getResults().forEach(result -> {
+                result.getSeries().forEach(serial -> {
+                    List<String> columns = serial.getColumns();
+                    serial.getValues().forEach(value -> {
+                        Map<String, String> obj = new HashMap<String,String>();
+                        for (int i = 1; i < 2; i++) {
+                            obj.put(columns.get(i), value.get(i).toString());
+                        }
+                        results.add(obj);
+                    });
+                });
+            });
+        }
+
+        return results;
+    }
+
     /**
      * 查询,返回Map集合
      *