Explorar o código

优化data-transfer服务模块中订阅和处理mqtt消息逻辑,接收消息体处判空处理,处理mqtt消息增加try catch捕获异常 将报错信息记录到error日志中,防止异常向上抛出导致程序中断

james hai 1 mes
pai
achega
7bb26102ae

+ 7 - 0
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/listener/MqttListener.java

@@ -10,9 +10,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 /**
  * @author yq
  * @date 2021/11/3 8:13
@@ -36,6 +39,10 @@ public class MqttListener {
     @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
     public MessageHandler handler() {
         return message -> {
+            if (message.getPayload() == null || message.getPayload().toString().isEmpty()) {
+                System.out.println("Received empty message, skipping processing.");
+                return;
+            }
             String payload = message.getPayload().toString();
             //进行接口推送
 //            String[] infoCode = TopListener.DEVICE_INFO.getCode().split("/");

+ 26 - 22
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/datacollector/DataCollector.java

@@ -29,30 +29,34 @@ public class DataCollector implements MqttStrategy {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        Map<String, String> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
-        String productCode = map_data.get("product_id").toString().toLowerCase();
-        long timestamp = Long.valueOf(map_data.get("timestamp").toString())*1000L+1L;
-        String deviceId = map_data.get("device_id").toString();
-
-        tags.put("device_id",deviceId);
-        log.info("disposeMessage "+deviceId+" start111");
-
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-        }
+        try {
+            Map<String, String> tags = new HashMap<>();
+            Map<String, Object> fields = new HashMap<>();
+            Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+            String productCode = map_data.get("product_id").toString().toLowerCase();
+            long timestamp = Long.valueOf(map_data.get("timestamp").toString())*1000L+1L;
+            String deviceId = map_data.get("device_id").toString();
+
+            tags.put("device_id",deviceId);
+            log.info("disposeMessage "+deviceId+" start111");
 
-        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceUUId("");
-        deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(timestamp);
-        deviceDataWriteVO.setTags(tags);
-        deviceDataWriteVO.setMetrics(metrics);
+            Object met = JSONObject.toJSONString(map_data.get("metrics"));
+            JSONObject metrics = JSON.parseObject(met.toString());
+            for(String entry : metrics.keySet()){
+                fields.put(entry.toLowerCase(),metrics.get(entry));
+            }
 
-        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+            DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+            deviceDataWriteVO.setDeviceUUId("");
+            deviceDataWriteVO.setProductCode(productCode);
+            deviceDataWriteVO.setTimestamp(timestamp);
+            deviceDataWriteVO.setTags(tags);
+            deviceDataWriteVO.setMetrics(metrics);
+
+            queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
 
         return null;
     }

+ 29 - 22
data-transfer/data-transfer-biz/src/main/java/com/usky/transfer/service/mqtt/info/Info.java

@@ -13,6 +13,8 @@ import com.usky.transfer.service.*;
 import com.usky.transfer.service.mqtt.MqttStrategy;
 import com.usky.transfer.service.rocketmq.MyProducer;
 import com.usky.transfer.service.vo.MqttBaseVO;
+import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -25,6 +27,7 @@ import java.util.*;
  * @author zyj
  * @date 2022/12/6 15:07
  */
+@Slf4j
 @Service("info")
 public class Info implements MqttStrategy {
     @Resource
@@ -33,33 +36,37 @@ public class Info implements MqttStrategy {
     private QueryInfluxdbDataService queryInfluxdbDataService;
 
     public String disposeMessage(MqttBaseVO mqttBaseVO) {
-        Map<String, String> tags = new HashMap<>();
-        Map<String, Object> fields = new HashMap<>();
-        Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
-        String productCode = map_data.get("productCode").toString().toLowerCase();
+        try {
+            Map<String, String> tags = new HashMap<>();
+            Map<String, Object> fields = new HashMap<>();
+            Map map_data = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+            String productCode = map_data.get("productCode").toString().toLowerCase();
 
-        long timestamp = Long.valueOf(map_data.get("timestamp").toString());
+            long timestamp = Long.valueOf(map_data.get("timestamp").toString());
 
-        Object tg = JSONObject.toJSONString(map_data.get("tags"));
-        JSONObject tag = JSON.parseObject(tg.toString());
-        for (String entry : tag.keySet()){
-            tags.put(entry.toLowerCase(),tag.get(entry).toString());
-        }
+            Object tg = JSONObject.toJSONString(map_data.get("tags"));
+            JSONObject tag = JSON.parseObject(tg.toString());
+            for (String entry : tag.keySet()){
+                tags.put(entry.toLowerCase(),tag.get(entry).toString());
+            }
 
-        Object met = JSONObject.toJSONString(map_data.get("metrics"));
-        JSONObject metrics = JSON.parseObject(met.toString());
-        for(String entry : metrics.keySet()){
-            fields.put(entry.toLowerCase(),metrics.get(entry));
-        }
+            Object met = JSONObject.toJSONString(map_data.get("metrics"));
+            JSONObject metrics = JSON.parseObject(met.toString());
+            for(String entry : metrics.keySet()){
+                fields.put(entry.toLowerCase(),metrics.get(entry));
+            }
 
-        DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
-        deviceDataWriteVO.setDeviceUUId("");
-        deviceDataWriteVO.setProductCode(productCode);
-        deviceDataWriteVO.setTimestamp(timestamp);
-        deviceDataWriteVO.setTags(tags);
-        deviceDataWriteVO.setMetrics(metrics);
+            DeviceDataWriteVO deviceDataWriteVO = new DeviceDataWriteVO();
+            deviceDataWriteVO.setDeviceUUId("");
+            deviceDataWriteVO.setProductCode(productCode);
+            deviceDataWriteVO.setTimestamp(timestamp);
+            deviceDataWriteVO.setTags(tags);
+            deviceDataWriteVO.setMetrics(metrics);
 
-        queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+            queryInfluxdbDataService.sendDeviceDataToMQ(deviceDataWriteVO);
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
 
         return null;
     }