瀏覽代碼

Merge branch 'data-zyj' of uskycloud/usky-data into master

gez 4 天之前
父節點
當前提交
2af2964b98

+ 1 - 1
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/config/mqtt/MqttInConfig.java

@@ -53,7 +53,7 @@ public class MqttInConfig {
         List<EgDeviceConfig> list = egDeviceConfigMapper.selectList(queryWrapper);
         String[] tops = new String[list.size()];
         for (int i = 0; i < list.size(); i++) {
-            tops[i] = "/"+list.get(i).getDeviceUuid()+"/control";
+            tops[i] = "/usky/devices/"+list.get(i).getDeviceUuid()+"/control";
         }
 
         String clientId = "gateway-eg-kat-mqtt-in-" + System.currentTimeMillis();

+ 3 - 2
data-gateway/data-gateway-eg-kat/src/main/java/com/usky/gateway/service/mqtt/control/control.java

@@ -44,10 +44,10 @@ public class control implements MqttStrategy {
 
     //处理下发命令消息,下发命令控制设备
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        String[] topics = mqttBaseVO.getTopic().split("/");
-        String deviceUuid = topics[1];
+
         JSONObject obj_data = JSONObject.parseObject(mqttBaseVO.getData().toString());
         Integer commandId = Integer.parseInt(obj_data.get("id").toString());
+        String deviceUuid = obj_data.get("deviceUuid").toString();
         Object params = JSONObject.toJSONString(obj_data.get("params"));
         JSONObject params_data = JSON.parseObject(params.toString());
         String commandCode = params_data.getString("commandCode");
@@ -85,6 +85,7 @@ public class control implements MqttStrategy {
 
                 }
 
+                jsonObject.put("deviceUuid",deviceUuid);
                 jsonObject.put("timeStamp",System.currentTimeMillis());
                 jsonObject.put("id",commandId);
 

+ 10 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDevice.java

@@ -137,5 +137,15 @@ public class DmpDevice implements Serializable {
      */
     private String latitude;
 
+    /**
+     * 设备所属类型(1、普通设备  2、网关设备  3、网关子设备)
+     */
+    private Integer categoryType;
+
+    /**
+     * 所属网关
+     */
+    private String gatewayUuid;
+
 
 }

+ 5 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/domain/DmpDeviceStatus.java

@@ -57,5 +57,10 @@ public class DmpDeviceStatus implements Serializable {
      */
     private String productCode;
 
+    /**
+     * 设备uuid
+     */
+    private String deviceUuid;
+
 
 }

+ 6 - 1
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/config/TsdbConfig.java

@@ -1,12 +1,15 @@
 package com.usky.transfer.service.config;
 
 import com.usky.common.core.util.StringUtils;
+import okhttp3.OkHttpClient;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.concurrent.TimeUnit;
+
 @Configuration
 public class TsdbConfig {
 
@@ -30,7 +33,9 @@ public class TsdbConfig {
         if (StringUtils.isEmpty(userName)) {
             influxDB = InfluxDBFactory.connect(influxDBUrl);
         } else {
-            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
+            OkHttpClient.Builder client = new OkHttpClient.Builder();
+            client.readTimeout(300, TimeUnit.SECONDS);
+            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password, client);
         }
         try {
 

+ 41 - 18
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/impl/QueryInfluxdbDataServiceImpl.java

@@ -63,13 +63,12 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
     public Map<String,Object> deviceControl(String productCode, String deviceUuid, String commandStr,Integer tenantId, Long userId, String userName){
         Map<String,Object> rec_map = new HashMap<>();
 
+        JSONObject dataJson = JSONObject.parseObject(commandStr);
 
         //存储下发设备控制命令到数据库表中
         DmpDeviceCommand command = new DmpDeviceCommand();
         command.setProductCode(productCode);
-        if(StringUtils.isNotBlank(deviceUuid)){
-            command.setDeviceUuid(deviceUuid);
-        }
+        command.setDeviceUuid(dataJson.get("deviceUuid").toString());
 
         command.setCommandContent(commandStr);
         command.setCreatedTime(LocalDateTime.now());
@@ -83,14 +82,13 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
         dmpDeviceCommandService.save(command);
         int commandId = command.getId();
 
-        JSONObject dataJson = JSONObject.parseObject(commandStr);
         dataJson.put("id",commandId);
 
         command.setCommandContent(dataJson.toJSONString());
         dmpDeviceCommandService.updateById(command);
         //推送下发设备控制mqtt
         if(StringUtils.isNotBlank(commandStr)){
-            String topic = "/"+deviceUuid+"/control";
+            String topic = "/usky/devices/"+deviceUuid+"/control";
             mqttGateway.sendToMqtt(topic,dataJson.toJSONString());
         }
 
@@ -144,6 +142,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             String productCode = writeVO.getProductCode();
             String deviceUUId = writeVO.getDeviceUUId();
             String deviceId = tags.get("device_id");
+            log.info("sendDeviceDataToMQ "+deviceId+" start222");
 
             //判断上报数据对应产品是否注册,如未注册则为非法
             Map<String,ProductMapVO> productMapList = dmpProductService.getProductMap();
@@ -177,7 +176,12 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
                 dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
                 dmpDeviceInfo.setServiceStatus(1);
-                dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+                if(StringUtils.isBlank(deviceUUId)){
+                    dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+                }else{
+                    dmpDeviceInfo.setDeviceUuid(deviceUUId);
+                }
+
                 dmpDeviceService.save(dmpDeviceInfo);
                 dmpProductService.deleteDeviceCache(productCode);
 
@@ -189,6 +193,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                 dmpDeviceStatus.setDeviceStatus(2);
                 dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
                 dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
+                dmpDeviceStatus.setDeviceUuid(deviceUUId);
                 dmpDeviceStatusService.save(dmpDeviceStatus);
 
                 deviceMapList = dmpProductService.getDeviceMap(productCode);
@@ -210,7 +215,11 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
                     dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
                     dmpDeviceInfo.setServiceStatus(1);
-                    dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+                    if(StringUtils.isBlank(deviceUUId)){
+                        dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+                    }else{
+                        dmpDeviceInfo.setDeviceUuid(deviceUUId);
+                    }
                     dmpDeviceService.save(dmpDeviceInfo);
                     dmpProductService.deleteDeviceCache(productCode);
 
@@ -222,6 +231,7 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
                     dmpDeviceStatus.setDeviceStatus(2);
                     dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
                     dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
+                    dmpDeviceStatus.setDeviceUuid(deviceUUId);
                     dmpDeviceStatusService.save(dmpDeviceStatus);
 
                     deviceMapList = dmpProductService.getDeviceMap(productCode);
@@ -242,8 +252,10 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             dataInfo.setProductCode(productCode);
             dataInfo.setDeviceUUId(deviceUUId);
             dataInfo.setTimestamp(writeVO.getTimestamp());
+            log.info("推送tsdb "+JSONArray.toJSON(dataInfo).toString()+" start");
 
             myProducer.sendMessage("data-tsdb", JSONArray.toJSON(dataInfo).toString());
+            log.info("推送tsdb "+JSONArray.toJSON(dataInfo).toString()+" end");
         }
 
         rec_map.put("code",200);
@@ -257,8 +269,10 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
 
         String topic = mqttDeviceDataVO.getTopic();
         String[] topics = topic.split("/");
-        String productCode1 = topics[1];
-        String deviceId1 = topics[2];
+        //String productCode1 = topics[1];
+        //String deviceId1 = topics[2];
+        String deviceUuid = topics[3];
+
         String payload = mqttDeviceDataVO.getDeviceData();
 
         JSONObject deviceDataJson = JSONObject.parseObject(payload);
@@ -269,24 +283,33 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
         Object metric = JSONObject.toJSONString(deviceDataJson.get("metrics"));
         JSONObject metricJson = JSON.parseObject(metric.toString());
 
-        if(!productCode1.equals(productCode2) || !deviceId1.equals(deviceId2)){
-            throw new BusinessException("Topic和请求体中的产品编码、设备Id不一致,请修改");
-        }
+//        if(!productCode1.equals(productCode2) || !deviceId1.equals(deviceId2)){
+//            throw new BusinessException("Topic和请求体中的产品编码、设备Id不一致,请修改");
+//        }
 
         LambdaQueryWrapper<DmpProduct> lambdaQuery = Wrappers.lambdaQuery();
-        lambdaQuery.eq(DmpProduct::getProductCode,productCode1)
+        lambdaQuery.eq(DmpProduct::getProductCode,productCode2)
                 .eq(DmpProduct::getDeleteFlag,0);
         List<DmpProduct> dmpProductList = dmpProductService.list(lambdaQuery);
         if(dmpProductList.size() <= 0){
             throw new BusinessException("产品编码不存在,请修改");
         }
         LambdaQueryWrapper<DmpDevice> lambdaQuery2 = Wrappers.lambdaQuery();
-        lambdaQuery2.eq(DmpDevice::getDeviceId,deviceId1)
-                .eq(DmpDevice::getProductCode,productCode1)
+        lambdaQuery2.eq(DmpDevice::getDeviceUuid,deviceUuid)
+                .eq(DmpDevice::getProductCode,productCode2)
                 .eq(DmpDevice::getServiceStatus,2)
                 .eq(DmpDevice::getDeleteFlag,0);
         List<DmpDevice> dmpDeviceList = dmpDeviceService.list(lambdaQuery2);
         if(dmpDeviceList.size() <= 0){
+            throw new BusinessException("topic主题中的设备Uuid不存在或在本产品下不存在,请修改");
+        }
+        LambdaQueryWrapper<DmpDevice> lambdaQuery3 = Wrappers.lambdaQuery();
+        lambdaQuery3.eq(DmpDevice::getDeviceId,deviceId2)
+                .eq(DmpDevice::getProductCode,productCode2)
+                .eq(DmpDevice::getServiceStatus,2)
+                .eq(DmpDevice::getDeleteFlag,0);
+        List<DmpDevice> dmpDeviceList1 = dmpDeviceService.list(lambdaQuery3);
+        if(dmpDeviceList1.size() <= 0){
             throw new BusinessException("设备Id不存在或在本产品下不存在,请修改");
         }
 
@@ -297,10 +320,10 @@ public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInflu
             }
         }
         int count1 = metricList.size();
-        LambdaQueryWrapper<DmpProductAttribute> lambdaQuery3 = Wrappers.lambdaQuery();
-        lambdaQuery3.eq(DmpProductAttribute::getProductId,dmpProductList.get(0).getId())
+        LambdaQueryWrapper<DmpProductAttribute> lambdaQuery4 = Wrappers.lambdaQuery();
+        lambdaQuery4.eq(DmpProductAttribute::getProductId,dmpProductList.get(0).getId())
                 .in(DmpProductAttribute::getAttributeCode,metricList);
-        int count2 = dmpProductAttributeService.count(lambdaQuery3);
+        int count2 = dmpProductAttributeService.count(lambdaQuery4);
         if(count1 > count2){
             throw new BusinessException("设备属性编码不存在,请检查");
         }

+ 7 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/listener/MqttListener.java

@@ -10,9 +10,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 /**
  * @author yq
  * @date 2021/11/3 8:13
@@ -36,6 +39,10 @@ public class MqttListener {
     @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
     public MessageHandler handler() {
         return message -> {
+            if (message.getPayload() == null || message.getPayload().toString().isEmpty()) {
+                System.out.println("Received empty message, skipping processing.");
+                return;
+            }
             String payload = message.getPayload().toString();
             //进行接口推送
 //            String[] infoCode = TopListener.DEVICE_INFO.getCode().split("/");

+ 34 - 4
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/add/Add.java

@@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
 import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 @Slf4j
 @Service("add")
@@ -37,10 +38,13 @@ public class Add implements MqttStrategy {
     private DmpDeviceStatusService dmpDeviceStatusService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO){
+        String[] topics = mqttBaseVO.getTopic().toString().split("/");
+        String topic_deviceUUID = topics[3];
         String deviceInfoStr = mqttBaseVO.getData().toString();
         JSONObject deviceInfoJson = JSONObject.parseObject(deviceInfoStr);
         String productCode = deviceInfoJson.get("productCode").toString();
         String deviceId = deviceInfoJson.get("deviceId").toString();
+        Integer categoryType = Integer.valueOf(deviceInfoJson.get("categoryType").toString());
 
         //判断上报数据对应产品是否注册,如未注册则为非法
         Map<String, ProductMapVO> productMapList = dmpProductService.getProductMap();
@@ -64,7 +68,15 @@ public class Add implements MqttStrategy {
             DmpDevice dmpDeviceInfo = new DmpDevice();
             ProductMapVO productMapVO = productMapList.get(productCode);
             dmpDeviceInfo.setDeviceId(deviceId);
-            dmpDeviceInfo.setDeviceName("");
+            if(Objects.nonNull(deviceInfoJson.get("deviceName"))){
+                dmpDeviceInfo.setDeviceName(deviceInfoJson.get("deviceName").toString());
+            }
+            if(Objects.nonNull(deviceInfoJson.get("simCode"))){
+                dmpDeviceInfo.setSimCode(deviceInfoJson.get("simCode").toString());
+            }
+            if(Objects.nonNull(deviceInfoJson.get("installAddress"))){
+                dmpDeviceInfo.setInstallAddress(deviceInfoJson.get("installAddress").toString());
+            }
             dmpDeviceInfo.setDeviceType(productMapVO.getDeviceType());
             dmpDeviceInfo.setProductId(productMapVO.getProductId());
             dmpDeviceInfo.setProductCode(productCode);
@@ -72,7 +84,11 @@ public class Add implements MqttStrategy {
             dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
             dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
             dmpDeviceInfo.setServiceStatus(1);
-            dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+            dmpDeviceInfo.setDeviceUuid(deviceInfoJson.get("deviceUuid").toString());
+            dmpDeviceInfo.setCategoryType(categoryType);
+            if(categoryType == 3){
+                dmpDeviceInfo.setGatewayUuid(topic_deviceUUID);
+            }
             dmpDeviceService.save(dmpDeviceInfo);
 
             DmpDeviceStatus dmpDeviceStatus = new DmpDeviceStatus();
@@ -81,6 +97,7 @@ public class Add implements MqttStrategy {
             dmpDeviceStatus.setDeviceStatus(2);
             dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
             dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
+            dmpDeviceStatus.setDeviceUuid(dmpDeviceInfo.getDeviceUuid());
             dmpDeviceStatusService.save(dmpDeviceStatus);
 
             dmpProductService.deleteDeviceCache(productCode);
@@ -94,7 +111,15 @@ public class Add implements MqttStrategy {
                 DmpDevice dmpDeviceInfo = new DmpDevice();
                 ProductMapVO productMapVO = productMapList.get(productCode);
                 dmpDeviceInfo.setDeviceId(deviceId);
-                dmpDeviceInfo.setDeviceName("");
+                if(Objects.nonNull(deviceInfoJson.get("deviceName"))){
+                    dmpDeviceInfo.setDeviceName(deviceInfoJson.get("deviceName").toString());
+                }
+                if(Objects.nonNull(deviceInfoJson.get("simCode"))){
+                    dmpDeviceInfo.setSimCode(deviceInfoJson.get("simCode").toString());
+                }
+                if(Objects.nonNull(deviceInfoJson.get("installAddress"))){
+                    dmpDeviceInfo.setInstallAddress(deviceInfoJson.get("installAddress").toString());
+                }
                 dmpDeviceInfo.setDeviceType(productMapVO.getDeviceType());
                 dmpDeviceInfo.setProductId(productMapVO.getProductId());
                 dmpDeviceInfo.setProductCode(productCode);
@@ -102,7 +127,11 @@ public class Add implements MqttStrategy {
                 dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
                 dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
                 dmpDeviceInfo.setServiceStatus(1);
-                dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
+                dmpDeviceInfo.setDeviceUuid(deviceInfoJson.get("deviceUuid").toString());
+                dmpDeviceInfo.setCategoryType(categoryType);
+                if(categoryType == 3){
+                    dmpDeviceInfo.setGatewayUuid(topic_deviceUUID);
+                }
                 dmpDeviceService.save(dmpDeviceInfo);
 
                 DmpDeviceStatus dmpDeviceStatus = new DmpDeviceStatus();
@@ -111,6 +140,7 @@ public class Add implements MqttStrategy {
                 dmpDeviceStatus.setDeviceStatus(2);
                 dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
                 dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
+                dmpDeviceStatus.setDeviceUuid(dmpDeviceInfo.getDeviceUuid());
                 dmpDeviceStatusService.save(dmpDeviceStatus);
 
                 dmpProductService.deleteDeviceCache(productCode);

+ 28 - 21
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/datacollector/DataCollector.java

@@ -8,6 +8,7 @@ import com.usky.transfer.service.QueryInfluxdbDataService;
 import com.usky.transfer.service.mqtt.MqttStrategy;
 import com.usky.transfer.service.rocketmq.MyProducer;
 import com.usky.transfer.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -19,6 +20,7 @@ import java.util.Map;
  * @author zyj
  * @date 2022/12/6 15:07
  */
+@Slf4j
 @Service("dataCollector")
 public class DataCollector implements MqttStrategy {
     @Resource
@@ -27,29 +29,34 @@ public class DataCollector implements MqttStrategy {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        Map<String, String> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
-        String productCode = map_data.get("product_id").toString().toLowerCase();
-        long timestamp = Long.valueOf(map_data.get("timestamp").toString())*1000L+1L;
-        String deviceId = map_data.get("device_id").toString();
-
-        tags.put("device_id",deviceId);
-
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-        }
+        try {
+            Map<String, String> tags = new HashMap<>();
+            Map<String, Object> fields = new HashMap<>();
+            Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+            String productCode = map_data.get("product_id").toString().toLowerCase();
+            long timestamp = Long.valueOf(map_data.get("timestamp").toString())*1000L+1L;
+            String deviceId = map_data.get("device_id").toString();
+
+            tags.put("device_id",deviceId);
+            log.info("disposeMessage "+deviceId+" start111");
 
-        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceUUId("");
-        deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(timestamp);
-        deviceDataWriteVO.setTags(tags);
-        deviceDataWriteVO.setMetrics(metrics);
+            Object met = JSONObject.toJSONString(map_data.get("metrics"));
+            JSONObject metrics = JSON.parseObject(met.toString());
+            for(String entry : metrics.keySet()){
+                fields.put(entry.toLowerCase(),metrics.get(entry));
+            }
 
-        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+            DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+            deviceDataWriteVO.setDeviceUUId("");
+            deviceDataWriteVO.setProductCode(productCode);
+            deviceDataWriteVO.setTimestamp(timestamp);
+            deviceDataWriteVO.setTags(tags);
+            deviceDataWriteVO.setMetrics(metrics);
+
+            queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
 
         return null;
     }

+ 31 - 22
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -13,6 +13,7 @@ import com.usky.transfer.service.*;
 import com.usky.transfer.service.mqtt.MqttStrategy;
 import com.usky.transfer.service.rocketmq.MyProducer;
 import com.usky.transfer.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -25,6 +26,7 @@ import java.util.*;
  * @author zyj
  * @date 2022/12/6 15:07
  */
+@Slf4j
 @Service("info")
 public class Info implements MqttStrategy {
     @Resource
@@ -33,33 +35,40 @@ public class Info implements MqttStrategy {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        Map<String, String> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
-        String productCode = map_data.get("productCode").toString().toLowerCase();
+        try {
+            String[] topics = mqttBaseVO.getTopic().toString().split("/");
+            String topic_deviceUUID = topics[3];
+            Map<String, String> tags = new HashMap<>();
+            Map<String, Object> fields = new HashMap<>();
+            Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+            String productCode = map_data.get("productCode").toString().toLowerCase();
 
-        long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+            long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+            String deviceUuid = map_data.get("deviceUuid").toString();
 
-        Object tg = JSONObject.toJSONString(map_data.get("tags"));
-        JSONObject tag = JSON.parseObject(tg.toString());
-        for (String entry : tag.keySet()){
-            tags.put(entry.toLowerCase(),tag.get(entry).toString());
-        }
+            Object tg = JSONObject.toJSONString(map_data.get("tags"));
+            JSONObject tag = JSON.parseObject(tg.toString());
+            for (String entry : tag.keySet()){
+                tags.put(entry.toLowerCase(),tag.get(entry).toString());
+            }
 
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-        }
+            Object met = JSONObject.toJSONString(map_data.get("metrics"));
+            JSONObject metrics = JSON.parseObject(met.toString());
+            for(String entry : metrics.keySet()){
+                fields.put(entry.toLowerCase(),metrics.get(entry));
+            }
 
-        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceUUId("");
-        deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(timestamp);
-        deviceDataWriteVO.setTags(tags);
-        deviceDataWriteVO.setMetrics(metrics);
+            DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+            deviceDataWriteVO.setDeviceUUId(deviceUuid);
+            deviceDataWriteVO.setProductCode(productCode);
+            deviceDataWriteVO.setTimestamp(timestamp);
+            deviceDataWriteVO.setTags(tags);
+            deviceDataWriteVO.setMetrics(metrics);
 
-        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+            queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
 
         return null;
     }

+ 2 - 0
data-transfer/data-transfer-biz/src/main/resources/mapper/transfer/DmpDeviceMapper.xml

@@ -25,6 +25,8 @@
         <result column="service_status" property="serviceStatus" />
         <result column="product_code" property="productCode" />
         <result column="device_uuid" property="deviceUuid" />
+        <result column="category_type" property="categoryType" />
+        <result column="gateway_uuid" property="gatewayUuid" />
     </resultMap>
 
 </mapper>

+ 1 - 1
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/config/TsdbConfig.java

@@ -35,7 +35,7 @@ public class TsdbConfig {
         } else {
             OkHttpClient.Builder client = new OkHttpClient.Builder();
             client.readTimeout(300, TimeUnit.SECONDS);
-            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password,client);
+            influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password, client);
         }
         try {
 

+ 14 - 4
data-tsdb-proxy/data-tsdb-proxy-biz/src/main/java/com/usky/demo/service/utils/TsdbUtils.java

@@ -17,6 +17,8 @@ import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -171,7 +173,7 @@ public class TsdbUtils extends InfluxDbUtils {
                             if(columns.get(i).equals("time")){
                                 Date time = null;
                                 try {
-                                    time = simpleDateFormat1.parse(value.get(i).toString());
+                                    time = simpleDateFormat1.parse(padMilliseconds(value.get(i).toString()));
                                 } catch (ParseException e) {
                                     e.printStackTrace();
                                 }
@@ -180,7 +182,7 @@ public class TsdbUtils extends InfluxDbUtils {
                             } else if(columns.get(i).equals("timestamp")){
                                 Date time = null;
                                 try {
-                                    time = simpleDateFormat1.parse(value.get(i).toString());
+                                    time = simpleDateFormat1.parse(padMilliseconds(value.get(i).toString()));
                                 } catch (ParseException e) {
                                     e.printStackTrace();
                                 }
@@ -199,6 +201,14 @@ public class TsdbUtils extends InfluxDbUtils {
         return results;
     }
 
+    private static String padMilliseconds(String timeStr) {
+        // 检查是否缺少毫秒部分(格式: YYYY-MM-DDTHH:MM:SSZ)
+        if (timeStr.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[+-]\\d{2}:\\d{2}")) {
+            return timeStr.replace("+08:00", ".000+08:00");
+        }
+        return timeStr; // 已有毫秒部分,直接返回
+    }
+
     /**
      * 查询,返回Map集合
      *
@@ -224,7 +234,7 @@ public class TsdbUtils extends InfluxDbUtils {
                                 if(!value.get(i).equals("1970-01-01T00:00:00Z")){
                                     Date time = null;
                                     try {
-                                        time = simpleDateFormat1.parse(value.get(i).toString());
+                                        time = simpleDateFormat1.parse(padMilliseconds(value.get(i).toString()));
                                     } catch (ParseException e) {
                                         e.printStackTrace();
                                     }
@@ -235,7 +245,7 @@ public class TsdbUtils extends InfluxDbUtils {
                             } else if(columns.get(i).equals("timestamp")){
                                 Date time = null;
                                 try {
-                                    time = simpleDateFormat1.parse(value.get(i).toString());
+                                    time = simpleDateFormat1.parse(padMilliseconds(value.get(i).toString()));
                                 } catch (ParseException e) {
                                     e.printStackTrace();
                                 }