|
@@ -1,5 +1,6 @@
|
|
|
package com.usky.transfer.service.impl;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.nacos.shaded.com.google.protobuf.Internal;
|
|
@@ -7,6 +8,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
+import com.usky.common.core.exception.BusinessException;
|
|
|
import com.usky.common.core.util.UUIDUtils;
|
|
|
import com.usky.common.security.utils.SecurityUtils;
|
|
|
import com.usky.transfer.domain.*;
|
|
@@ -49,6 +51,8 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
|
|
|
private DmpDeviceStatusService dmpDeviceStatusService;
|
|
|
@Autowired
|
|
|
private DmpDeviceCommandService dmpDeviceCommandService;
|
|
|
+ @Autowired
|
|
|
+ private DmpProductAttributeService dmpProductAttributeService;
|
|
|
|
|
|
@Resource
|
|
|
private MyProducer myProducer;
|
|
@@ -252,9 +256,55 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
|
|
|
Map<String,Object> rec_map = new HashMap<>();
|
|
|
|
|
|
String topic = mqttDeviceDataVO.getTopic();
|
|
|
+ String[] topics = topic.split("/");
|
|
|
+ String productCode1 = topics[1];
|
|
|
+ String deviceId1 = topics[2];
|
|
|
String payload = mqttDeviceDataVO.getDeviceData();
|
|
|
|
|
|
JSONObject deviceDataJson = JSONObject.parseObject(payload);
|
|
|
+ String productCode2 = deviceDataJson.get("productCode").toString();
|
|
|
+ Object tag = JSONObject.toJSONString(deviceDataJson.get("tags"));
|
|
|
+ JSONObject tagJson = JSON.parseObject(tag.toString());
|
|
|
+ String deviceId2 = tagJson.get("device_id").toString();
|
|
|
+ Object metric = JSONObject.toJSONString(deviceDataJson.get("metrics"));
|
|
|
+ JSONObject metricJson = JSON.parseObject(metric.toString());
|
|
|
+
|
|
|
+ if(!productCode1.equals(productCode2) || !deviceId1.equals(deviceId2)){
|
|
|
+ throw new BusinessException("Topic和请求体中的产品编码、设备Id不一致,请修改");
|
|
|
+ }
|
|
|
+
|
|
|
+ LambdaQueryWrapper<DmpProduct> lambdaQuery = Wrappers.lambdaQuery();
|
|
|
+ lambdaQuery.eq(DmpProduct::getProductCode,productCode1)
|
|
|
+ .eq(DmpProduct::getDeleteFlag,0);
|
|
|
+ List<DmpProduct> dmpProductList = dmpProductService.list(lambdaQuery);
|
|
|
+ if(dmpProductList.size() <= 0){
|
|
|
+ throw new BusinessException("产品编码不存在,请修改");
|
|
|
+ }
|
|
|
+ LambdaQueryWrapper<DmpDevice> lambdaQuery2 = Wrappers.lambdaQuery();
|
|
|
+ lambdaQuery2.eq(DmpDevice::getDeviceId,deviceId1)
|
|
|
+ .eq(DmpDevice::getProductCode,productCode1)
|
|
|
+ .eq(DmpDevice::getServiceStatus,2)
|
|
|
+ .eq(DmpDevice::getDeleteFlag,0);
|
|
|
+ List<DmpDevice> dmpDeviceList = dmpDeviceService.list(lambdaQuery2);
|
|
|
+ if(dmpDeviceList.size() <= 0){
|
|
|
+ throw new BusinessException("设备Id不存在或在本产品下不存在,请修改");
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> metricList = new ArrayList<>();
|
|
|
+ if(Objects.nonNull(metricJson)){
|
|
|
+ for(String entry: metricJson.keySet()){
|
|
|
+ metricList.add(entry.toLowerCase());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int count1 = metricList.size();
|
|
|
+ LambdaQueryWrapper<DmpProductAttribute> lambdaQuery3 = Wrappers.lambdaQuery();
|
|
|
+ lambdaQuery3.eq(DmpProductAttribute::getProductId,dmpProductList.get(0).getId())
|
|
|
+ .in(DmpProductAttribute::getAttributeCode,metricList);
|
|
|
+ int count2 = dmpProductAttributeService.count(lambdaQuery3);
|
|
|
+ if(count1 > count2){
|
|
|
+ throw new BusinessException("设备属性编码不存在,请检查");
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
mqttGateway.sendToMqtt(topic, deviceDataJson.toJSONString());
|
|
|
|