Kaynağa Gözat

优化data-transfer模块中订阅设备指标数据mqtt代码逻辑

james 3 ay önce
ebeveyn
işleme
1cda50736f

+ 3 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpProductServiceImpl.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.common.security.utils.SecurityUtils;
 import com.usky.transfer.domain.DmpDevice;
 import com.usky.transfer.domain.DmpProduct;
 import com.usky.transfer.mapper.DmpProductMapper;
@@ -52,6 +53,7 @@ public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper,
         Map<String,ProductMapVO> productMap = new HashMap<>();
         LambdaQueryWrapper<DmpProduct> queryWrapper = Wrappers.lambdaQuery();
         queryWrapper.eq(DmpProduct::getDeleteFlag,0)
+                .eq(DmpProduct::getTenantId, SecurityUtils.getTenantId())
                 .orderByDesc(DmpProduct::getId);
         List<DmpProduct> list = this.list(queryWrapper);
         if(CollectionUtils.isNotEmpty(list)){
@@ -77,6 +79,7 @@ public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper,
         Map<String,DeviceMapVO> deviceMap = new HashMap<>();
         LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
         queryWrapper.eq(DmpDevice::getDeleteFlag,0)
+                .eq(DmpDevice::getTenantId,SecurityUtils.getTenantId())
                 .orderByDesc(DmpDevice::getId);
         List<DmpDevice> list = dmpDeviceService.list(queryWrapper);
         if(CollectionUtils.isNotEmpty(list)){

+ 5 - 3
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -139,7 +139,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
 
             String productCode = writeVO.getProductCode();
             String deviceUUId = writeVO.getDeviceUUId();
-            String deviceId = tags.get("deviceId");
+            String deviceId = tags.get("device_id");
 
             //判断上报数据对应产品是否注册,如未注册则为非法
             Map<String,ProductMapVO> productMapList = dmpProductService.getProductMap();
@@ -175,6 +175,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceInfo.setServiceStatus(1);
                 dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
                 dmpDeviceService.save(dmpDeviceInfo);
+                dmpProductService.deleteDeviceCache();
 
                 deviceUUId = dmpDeviceInfo.getDeviceUuid();
 
@@ -186,7 +187,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
                 dmpDeviceStatusService.save(dmpDeviceStatus);
 
-                dmpProductService.deleteDeviceCache();
+                deviceMapList = dmpProductService.getDeviceMap();
             }else if(deviceMapList.containsKey(deviceId)){
                 LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
                 queryWrapper.eq(DmpDevice::getDeleteFlag,0)
@@ -207,6 +208,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceInfo.setServiceStatus(1);
                     dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
                     dmpDeviceService.save(dmpDeviceInfo);
+                    dmpProductService.deleteDeviceCache();
 
                     deviceUUId = dmpDeviceInfo.getDeviceUuid();
 
@@ -218,7 +220,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
                     dmpDeviceStatusService.save(dmpDeviceStatus);
 
-                    dmpProductService.deleteDeviceCache();
+                    deviceMapList = dmpProductService.getDeviceMap();
                 }
             }
 

+ 4 - 10
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -19,10 +19,7 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @author zyj
@@ -39,12 +36,9 @@ public class Info implements MqttStrategy {
         Map<String, String> tags = new HashMap<>();
         Map<String, Object> fields = new HashMap<>();
         Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
-        String deviceId = map_data.get("device_id").toString();
-        String productCode = map_data.get("product_code").toString().toLowerCase();
+        String productCode = map_data.get("productCode").toString().toLowerCase();
         Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
 
-        String tableName = deviceId;
-
         Object tg = JSONObject.toJSONString(map_data.get("tags"));
         JSONObject tag = JSON.parseObject(tg.toString());
         for (String entry : tag.keySet()){
@@ -58,9 +52,9 @@ public class Info implements MqttStrategy {
         }
 
         DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceUUId(deviceId);
+        deviceDataWriteVO.setDeviceUUId("");
         deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        deviceDataWriteVO.setTimestamp(timestamp);
         deviceDataWriteVO.setTags(tags);
         deviceDataWriteVO.setMetrics(metrics);