Pārlūkot izejas kodu

新增订阅网关设备注册mqtt、网关设备数据mqtt和消防物联网2.0设备数据处理逻辑

james 3 mēneši atpakaļ
vecāks
revīzija
598e5004a4

+ 2 - 2
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/DmpProductService.java

@@ -17,8 +17,8 @@ import java.util.Map;
  */
 public interface DmpProductService extends CrudService<DmpProduct> {
     void deleteProductCache();
-    void deleteDeviceCache();
+    void deleteDeviceCache(String productCode);
 
     Map<String, ProductMapVO> getProductMap();
-    Map<String, DeviceMapVO> getDeviceMap();
+    Map<String, DeviceMapVO> getDeviceMap(String productCode);
 }

+ 5 - 6
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/DmpProductServiceImpl.java

@@ -43,8 +43,8 @@ public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper,
     }
 
     //清除设备缓存
-    @CacheEvict(cacheNames = "deviceList")
-    public void deleteDeviceCache(){
+    @CacheEvict(cacheNames = "deviceList",key = "#productCode")
+    public void deleteDeviceCache(String productCode){
         System.out.println("deleteDeviceCache");
     }
 
@@ -53,7 +53,6 @@ public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper,
         Map<String,ProductMapVO> productMap = new HashMap<>();
         LambdaQueryWrapper<DmpProduct> queryWrapper = Wrappers.lambdaQuery();
         queryWrapper.eq(DmpProduct::getDeleteFlag,0)
-                .eq(DmpProduct::getTenantId, SecurityUtils.getTenantId())
                 .orderByDesc(DmpProduct::getId);
         List<DmpProduct> list = this.list(queryWrapper);
         if(CollectionUtils.isNotEmpty(list)){
@@ -74,12 +73,12 @@ public class DmpProductServiceImpl extends AbstractCrudService<DmpProductMapper,
         return productMap;
     }
 
-    @Cacheable(cacheNames = "deviceList",sync = true)
-    public Map<String, DeviceMapVO> getDeviceMap(){
+    @Cacheable(cacheNames = "deviceList",key = "#productCode",sync = true)
+    public Map<String, DeviceMapVO> getDeviceMap(String productCode){
         Map<String,DeviceMapVO> deviceMap = new HashMap<>();
         LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
         queryWrapper.eq(DmpDevice::getDeleteFlag,0)
-                .eq(DmpDevice::getTenantId,SecurityUtils.getTenantId())
+                .eq(DmpDevice::getProductCode,productCode)
                 .orderByDesc(DmpDevice::getId);
         List<DmpDevice> list = dmpDeviceService.list(queryWrapper);
         if(CollectionUtils.isNotEmpty(list)){

+ 6 - 5
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -160,7 +160,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             }
 
             //判断上报数据设备是否已注册(要判断注册过的设备是不是属于本产品的),未注册自动注册
-            Map<String,DeviceMapVO> deviceMapList = dmpProductService.getDeviceMap();
+            Map<String,DeviceMapVO> deviceMapList = dmpProductService.getDeviceMap(productCode);
             if(!deviceMapList.containsKey(deviceId)){
                 DmpDevice dmpDeviceInfo = new DmpDevice();
                 ProductMapVO productMapVO = productMapList.get(productCode);
@@ -175,7 +175,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceInfo.setServiceStatus(1);
                 dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
                 dmpDeviceService.save(dmpDeviceInfo);
-                dmpProductService.deleteDeviceCache();
+                dmpProductService.deleteDeviceCache(productCode);
 
                 deviceUUId = dmpDeviceInfo.getDeviceUuid();
 
@@ -187,7 +187,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
                 dmpDeviceStatusService.save(dmpDeviceStatus);
 
-                deviceMapList = dmpProductService.getDeviceMap();
+                deviceMapList = dmpProductService.getDeviceMap(productCode);
             }else if(deviceMapList.containsKey(deviceId)){
                 LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
                 queryWrapper.eq(DmpDevice::getDeleteFlag,0)
@@ -208,7 +208,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceInfo.setServiceStatus(1);
                     dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
                     dmpDeviceService.save(dmpDeviceInfo);
-                    dmpProductService.deleteDeviceCache();
+                    dmpProductService.deleteDeviceCache(productCode);
 
                     deviceUUId = dmpDeviceInfo.getDeviceUuid();
 
@@ -220,7 +220,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
                     dmpDeviceStatusService.save(dmpDeviceStatus);
 
-                    deviceMapList = dmpProductService.getDeviceMap();
+                    deviceMapList = dmpProductService.getDeviceMap(productCode);
                 }
             }
 
@@ -230,6 +230,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     String deviceId1 = map.getKey();
                     if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
                         deviceUUId = map.getValue().getDeviceUuid();
+                        break;
                     }
                 }
             }

+ 4 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/listener/MqttListener.java

@@ -56,6 +56,10 @@ public class MqttListener {
                     mqttBaseVO.setDescribe("add");
                     mqttBaseVO.setData(payload);
                 }
+                else if(topic.indexOf("data-collector") != -1){
+                    mqttBaseVO.setDescribe("dataCollector");
+                    mqttBaseVO.setData(payload);
+                }
                 //统一处理数据
                 simpleContext.getResource(mqttBaseVO);
             }

+ 3 - 3
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/add/Add.java

@@ -59,7 +59,7 @@ public class Add implements MqttStrategy {
         }
 
         //判断上报数据设备是否已注册(要判断注册过的设备是不是属于本产品的),未注册自动注册
-        Map<String, DeviceMapVO> deviceMapList = dmpProductService.getDeviceMap();
+        Map<String, DeviceMapVO> deviceMapList = dmpProductService.getDeviceMap(productCode);
         if(!deviceMapList.containsKey(deviceId)){
             DmpDevice dmpDeviceInfo = new DmpDevice();
             ProductMapVO productMapVO = productMapList.get(productCode);
@@ -83,7 +83,7 @@ public class Add implements MqttStrategy {
             dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
             dmpDeviceStatusService.save(dmpDeviceStatus);
 
-            dmpProductService.deleteDeviceCache();
+            dmpProductService.deleteDeviceCache(productCode);
         }else if(deviceMapList.containsKey(deviceId)){
             LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
             queryWrapper.eq(DmpDevice::getDeleteFlag,0)
@@ -113,7 +113,7 @@ public class Add implements MqttStrategy {
                 dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
                 dmpDeviceStatusService.save(dmpDeviceStatus);
 
-                dmpProductService.deleteDeviceCache();
+                dmpProductService.deleteDeviceCache(productCode);
             }
         }
 

+ 58 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/datacollector/DataCollector.java

@@ -0,0 +1,58 @@
+package com.usky.transfer.service.mqtt.datacollector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.transfer.domain.DeviceDataWriteVO;
+import com.usky.transfer.service.QueryInfluxdbDataService;
+import com.usky.transfer.service.mqtt.MqttStrategy;
+import com.usky.transfer.service.rocketmq.MyProducer;
+import com.usky.transfer.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author zyj
+ * @date 2022/12/6 15:07
+ */
+@Service("dataCollector")
+public class DataCollector implements MqttStrategy {
+    @Resource
+    private MyProducer myProducer;
+    @Autowired
+    private QueryInfluxdbDataService queryInfluxdbDataService;
+
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+        Map<String, String> tags = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
+        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+        String productCode = map_data.get("product_id").toString().toLowerCase();
+        Long timestamp = Long.valueOf(map_data.get("timestamp").toString()+"000");
+        String deviceId = map_data.get("device_id").toString();
+
+        tags.put("device_id",deviceId);
+
+        Object met = JSONObject.toJSONString(map_data.get("metrics"));
+        JSONObject metrics = JSON.parseObject(met.toString());
+        for(String entry : metrics.keySet()){
+            fields.put(entry.toLowerCase(),metrics.get(entry));
+        }
+
+        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+        deviceDataWriteVO.setDeviceUUId("");
+        deviceDataWriteVO.setProductCode(productCode);
+        deviceDataWriteVO.setTimestamp(timestamp);
+        deviceDataWriteVO.setTags(tags);
+        deviceDataWriteVO.setMetrics(metrics);
+
+        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+
+        return null;
+    }
+
+
+}

+ 1 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -37,7 +37,7 @@ public class Info implements MqttStrategy {
         Map<String, Object> fields = new HashMap<>();
         Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
         String productCode = map_data.get("productCode").toString().toLowerCase();
-        Long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+        Long timestamp = Long.valueOf(map_data.get("timestamp").toString()+"000");
 
         Object tg = JSONObject.toJSONString(map_data.get("tags"));
         JSONObject tag = JSON.parseObject(tg.toString());