|
@@ -1,15 +1,23 @@
|
|
|
package com.usky.demo.service.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
import com.usky.demo.domain.*;
|
|
|
import com.usky.demo.mapper.QueryInfluxdbDataMapper;
|
|
|
+import com.usky.demo.service.DmpDeviceService;
|
|
|
+import com.usky.demo.service.DmpProductService;
|
|
|
import com.usky.demo.service.QueryInfluxdbDataService;
|
|
|
import com.usky.common.mybatis.core.AbstractCrudService;
|
|
|
import com.usky.demo.service.rocketmq.MyProducer;
|
|
|
import com.usky.demo.service.utils.TsdbUtils;
|
|
|
+import com.usky.demo.service.vo.DeviceMapVO;
|
|
|
+import com.usky.demo.service.vo.ProductMapVO;
|
|
|
import org.influxdb.dto.QueryResult;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.cache.annotation.Cacheable;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
@@ -30,41 +38,93 @@ import java.util.Map;
|
|
|
public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInfluxdbDataMapper, QueryInfluxdbData> implements QueryInfluxdbDataService {
|
|
|
@Autowired
|
|
|
private TsdbUtils tsdbUtils;
|
|
|
+ @Autowired
|
|
|
+ private DmpProductService dmpProductService;
|
|
|
+ @Autowired
|
|
|
+ private DmpDeviceService dmpDeviceService;
|
|
|
|
|
|
@Resource
|
|
|
private MyProducer myProducer;
|
|
|
|
|
|
- @Override
|
|
|
- public Map<String,Object> sendDeviceData(DeviceDataWriteVO writeVO){
|
|
|
- Map<String,Object> rec_map = new HashMap<>();
|
|
|
- DeviceDataInfoVO dataInfo = new DeviceDataInfoVO();
|
|
|
- Map<String,Object> metrics = writeVO.getMetrics();
|
|
|
- Map<String,String> tags = writeVO.getTags();
|
|
|
- if(metrics.size() > 0){
|
|
|
- Map<String,Object> mp = new HashMap<>();
|
|
|
- for(Map.Entry<String,Object> map:metrics.entrySet()){
|
|
|
- mp.put(map.getKey(),map.getValue());
|
|
|
- }
|
|
|
- dataInfo.setMetrics(mp);
|
|
|
- if(tags != null && tags.size() > 0){
|
|
|
- mp.clear();
|
|
|
- for(Map.Entry<String,String> map:tags.entrySet()){
|
|
|
- mp.put(map.getKey(),map.getValue());
|
|
|
- }
|
|
|
- dataInfo.setTags(mp);
|
|
|
- }else{
|
|
|
- dataInfo.setTags(new HashMap<>());
|
|
|
+// @Override
|
|
|
+// public Map<String,Object> sendDeviceData(DeviceDataWriteVO writeVO){
|
|
|
+// Map<String,Object> rec_map = new HashMap<>();
|
|
|
+// DeviceDataInfoVO dataInfo = new DeviceDataInfoVO();
|
|
|
+// Map<String,Object> metrics = writeVO.getMetrics();
|
|
|
+// Map<String,String> tags = writeVO.getTags();
|
|
|
+// if(metrics.size() > 0){
|
|
|
+// Map<String,Object> mp = new HashMap<>();
|
|
|
+// for(Map.Entry<String,Object> map:metrics.entrySet()){
|
|
|
+// mp.put(map.getKey(),map.getValue());
|
|
|
+// }
|
|
|
+// dataInfo.setMetrics(mp);
|
|
|
+// if(tags != null && tags.size() > 0){
|
|
|
+// mp.clear();
|
|
|
+// for(Map.Entry<String,String> map:tags.entrySet()){
|
|
|
+// mp.put(map.getKey(),map.getValue());
|
|
|
+// }
|
|
|
+// dataInfo.setTags(mp);
|
|
|
+// }else{
|
|
|
+// dataInfo.setTags(new HashMap<>());
|
|
|
+// }
|
|
|
+// dataInfo.setProduct_code(writeVO.getProductCode());
|
|
|
+// dataInfo.setDevice_id(writeVO.getDeviceId());
|
|
|
+// dataInfo.setTimestamp(writeVO.getTimestamp());
|
|
|
+//
|
|
|
+// myProducer.sendMessage("data-collector", JSONArray.toJSON(dataInfo).toString());
|
|
|
+// }
|
|
|
+//
|
|
|
+// rec_map.put("code",200);
|
|
|
+// rec_map.put("message","操作成功!");
|
|
|
+// return rec_map;
|
|
|
+// }
|
|
|
+
|
|
|
+ @Cacheable(cacheNames = "productList",sync = true)
|
|
|
+ public Map<String, ProductMapVO> getProductMap(){
|
|
|
+ Map<String,ProductMapVO> productMap = new HashMap<>();
|
|
|
+ LambdaQueryWrapper<DmpProduct> queryWrapper = Wrappers.lambdaQuery();
|
|
|
+ queryWrapper.eq(DmpProduct::getDeleteFlag,0)
|
|
|
+ .orderByDesc(DmpProduct::getId);
|
|
|
+ List<DmpProduct> list = dmpProductService.list(queryWrapper);
|
|
|
+ if(CollectionUtils.isNotEmpty(list)){
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ String productCode = list.get(i).getProductCode();
|
|
|
+ ProductMapVO mapVO = new ProductMapVO();
|
|
|
+ mapVO.setProductId(list.get(i).getId());
|
|
|
+ mapVO.setProductCode(list.get(i).getProductCode());
|
|
|
+ mapVO.setCreatedBy(list.get(i).getCreatedBy());
|
|
|
+ mapVO.setTenantId(list.get(i).getTenantId());
|
|
|
+ mapVO.setDeviceType(list.get(i).getDeviceType());
|
|
|
+
|
|
|
+ productMap.put(productCode,mapVO);
|
|
|
}
|
|
|
- dataInfo.setProduct_code(writeVO.getProductCode());
|
|
|
- dataInfo.setDevice_id(writeVO.getDeviceId());
|
|
|
- dataInfo.setTimestamp(writeVO.getTimestamp());
|
|
|
+ }
|
|
|
|
|
|
- myProducer.sendMessage("data-collector", JSONArray.toJSON(dataInfo).toString());
|
|
|
+
|
|
|
+ return productMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Cacheable(cacheNames = "deviceList",sync = true)
|
|
|
+ public Map<String, DeviceMapVO> getDeviceMap(){
|
|
|
+ Map<String,DeviceMapVO> deviceMap = new HashMap<>();
|
|
|
+ LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
|
|
|
+ queryWrapper.eq(DmpDevice::getDeleteFlag,0)
|
|
|
+ .orderByDesc(DmpDevice::getId);
|
|
|
+ List<DmpDevice> list = dmpDeviceService.list(queryWrapper);
|
|
|
+ if(CollectionUtils.isNotEmpty(list)){
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ String deviceId = list.get(i).getDeviceId();
|
|
|
+ DeviceMapVO mapVO = new DeviceMapVO();
|
|
|
+ mapVO.setProductCode(list.get(i).getProductCode());
|
|
|
+ mapVO.setDeviceId(list.get(i).getDeviceId());
|
|
|
+ mapVO.setDeviceUuid(list.get(i).getDeviceUuid());
|
|
|
+ mapVO.setDeviceStatus(list.get(i).getServiceStatus());
|
|
|
+
|
|
|
+ deviceMap.put(deviceId,mapVO);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- rec_map.put("code",200);
|
|
|
- rec_map.put("message","操作成功!");
|
|
|
- return rec_map;
|
|
|
+ return deviceMap;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -180,4 +240,118 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public LastResultVO queryOuterLastDeviceData(String productCode, String deviceId){
|
|
|
+
|
|
|
+ String tableName = productCode + "_" + deviceId;
|
|
|
+ String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
|
|
|
+ List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
|
|
|
+ LastResultVO resultVO = new LastResultVO(deviceId,metrics);
|
|
|
+
|
|
|
+ return resultVO;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<LastResultVO> queryOuterLastDeviceData(LastRequestVO requestVO){
|
|
|
+ List<LastResultVO> result = new ArrayList<>();
|
|
|
+ String productCode = requestVO.getProductCode();
|
|
|
+ List<String> deviceIds = requestVO.getDeviceId();
|
|
|
+ if(CollectionUtils.isNotEmpty(deviceIds)){
|
|
|
+ for (int i = 0; i < deviceIds.size(); i++) {
|
|
|
+ String tableName = productCode + "_" + deviceIds.get(i);
|
|
|
+ String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
|
|
|
+ List<Map<String, Object>> metrics = tsdbUtils.fetchRecords(query);
|
|
|
+ LastResultVO resultVO = new LastResultVO(deviceIds.get(i),metrics);
|
|
|
+ result.add(resultVO);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public HistoryResultVO queryOuterHistoryDeviceData(String productCode,String deviceId,String startTime,String endTime){
|
|
|
+ List<MetricVO> metricList = new ArrayList<>();
|
|
|
+ String tableName = productCode + "_" + deviceId;
|
|
|
+ String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
|
|
|
+ QueryResult queryResult = tsdbUtils.query(query);
|
|
|
+ if(queryResult.getResults().get(0).getSeries() != null){
|
|
|
+ List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
|
|
|
+ if(CollectionUtils.isNotEmpty(fields)){
|
|
|
+ String fieldQuery = "SELECT *::field FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
|
|
|
+ List<Map<String,Object>> metircs = tsdbUtils.fetchRecords(fieldQuery);
|
|
|
+
|
|
|
+ if(CollectionUtils.isNotEmpty(metircs)){
|
|
|
+ for (int j = 0; j < fields.size(); j++) {
|
|
|
+ List<Map<String,Object>> metircItems = new ArrayList<>();
|
|
|
+ if(!"time".equals(fields.get(j))){
|
|
|
+ for (int k = 0; k < metircs.size(); k++) {
|
|
|
+ String field = fields.get(j);
|
|
|
+ Map<String,Object> map = new HashMap<>();
|
|
|
+ map.put("timestamp",metircs.get(k).get("time"));
|
|
|
+ map.put("value",metircs.get(k).get(field));
|
|
|
+ metircItems.add(map);
|
|
|
+ }
|
|
|
+ MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
|
|
|
+ metricList.add(metricVO);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ HistoryResultVO resultVO = new HistoryResultVO(deviceId,metricList);
|
|
|
+ return resultVO;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<HistoryResultVO> queryOuterHistoryDeviceData(HistoryRequestVO requestVO){
|
|
|
+ List<HistoryResultVO> list = new ArrayList<>();
|
|
|
+ String productCode = requestVO.getProductCode();
|
|
|
+ List<String> deviceIds = requestVO.getDeviceId();
|
|
|
+ String startTime = requestVO.getStartTime();
|
|
|
+ String endTime = requestVO.getEndTime();
|
|
|
+ for (int i = 0; i < deviceIds.size(); i++) {
|
|
|
+ List<MetricVO> metricList = new ArrayList<>();
|
|
|
+ String tableName = productCode + "_" + deviceIds.get(i);
|
|
|
+ String query = "SELECT *::field FROM \""+tableName+"\" order by desc limit 1 tz('Asia/Shanghai')";
|
|
|
+ QueryResult queryResult = tsdbUtils.query(query);
|
|
|
+ if(queryResult.getResults().get(0).getSeries() != null){
|
|
|
+ List<String> fields = queryResult.getResults().get(0).getSeries().get(0).getColumns();
|
|
|
+ if(CollectionUtils.isNotEmpty(fields)){
|
|
|
+ String fieldQuery = "SELECT *::field FROM \""+tableName+"\" where time >= '"+startTime+"' and time <= '"+endTime+"' tz('Asia/Shanghai')";
|
|
|
+ List<Map<String,Object>> metircs = tsdbUtils.fetchRecords(fieldQuery);
|
|
|
+
|
|
|
+ if(CollectionUtils.isNotEmpty(metircs)){
|
|
|
+ for (int j = 0; j < fields.size(); j++) {
|
|
|
+ List<Map<String,Object>> metircItems = new ArrayList<>();
|
|
|
+ if(!"time".equals(fields.get(j))){
|
|
|
+ for (int k = 0; k < metircs.size(); k++) {
|
|
|
+ String field = fields.get(j);
|
|
|
+ Map<String,Object> map = new HashMap<>();
|
|
|
+ map.put("timestamp",metircs.get(k).get("time"));
|
|
|
+ map.put("value",metircs.get(k).get(field));
|
|
|
+ metircItems.add(map);
|
|
|
+ }
|
|
|
+ MetricVO metricVO = new MetricVO(fields.get(j),metircItems);
|
|
|
+ metricList.add(metricVO);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ HistoryResultVO resultVO = new HistoryResultVO(deviceIds.get(i),metricList);
|
|
|
+ list.add(resultVO);
|
|
|
+ }
|
|
|
+
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
}
|