Просмотр исходного кода

1、调整service-tsdb服务中对外查询实时和历史api接口封装体;
2、优化service-iot服务中的更新设备列表状态接口查询时序数据库逻辑;
3、解决service-rule服务调用设备实时数据和历史数据两个方法报错的问题;

james 3 дней назад
Родитель
Сommit
a0df8c4b14

+ 2 - 1
service-iot/service-iot-biz/src/main/java/com/usky/iot/service/impl/DmpDeviceInfoServiceImpl.java

@@ -611,7 +611,8 @@ public class DmpDeviceInfoServiceImpl extends AbstractCrudService<DmpDeviceInfoM
 
     @Override
     public void updateDeviceStatus(LastInnerQueryVO queryVO) {
-        List<LastInnerResultVO> list = remoteTsdbProxyService.queryLastDeviceData(queryVO);
+        ApiResult<List<LastInnerResultVO>> lastApi = remoteTsdbProxyService.queryLastDeviceData(queryVO);
+        List<LastInnerResultVO> list = lastApi != null ? lastApi.getData() : null;
         if (CollectionUtils.isNotEmpty(list)) {
             for (int i = 0; i < list.size(); i++) {
                 if (Objects.nonNull(list.get(i).getMetrics())) {

+ 5 - 1
service-rule/service-rule-biz/src/main/java/com/usky/rule/jobs/ConsumptionJob.java

@@ -1,6 +1,7 @@
 package com.usky.rule.jobs;
 
 
+import com.usky.common.core.bean.ApiResult;
 import com.usky.common.security.utils.SecurityUtils;
 import com.usky.demo.RemoteTsdbProxyService;
 import com.usky.demo.domain.HistorysInnerRequestVO;
@@ -106,7 +107,10 @@ public class ConsumptionJob implements Job {
                                 requestVO.setMetrics(Collections.singletonList(identifier));
                                 requestVO.setStartTime(times[0].toString());
                                 requestVO.setEndTime(times[1].toString());
-                                List<HistorysInnerResultVO> result = remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
+                                ApiResult<List<HistorysInnerResultVO>> historyApi = remoteTsdbProxyService.queryHistoryDeviceData(requestVO);
+                                List<HistorysInnerResultVO> result = historyApi != null && historyApi.getData() != null
+                                        ? historyApi.getData()
+                                        : Collections.emptyList();
                                 List<DataPointVO> dataPointVOList = findDataPointsForMetric(result, device.getDeviceUuid(), identifier);
                                 if (dataPointVOList.isEmpty()) {
                                     boolConstraintExp.append(false);

+ 16 - 3
service-rule/service-rule-biz/src/main/java/com/usky/rule/subscribe/TriggerDeviceUtil.java

@@ -2,6 +2,7 @@ package com.usky.rule.subscribe;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.usky.common.core.bean.ApiResult;
 import com.usky.common.core.util.JsonUtils;
 import com.usky.common.security.utils.SecurityUtils;
 import com.usky.demo.RemoteTsdbProxyService;
@@ -380,15 +381,27 @@ public class TriggerDeviceUtil {
                     LastInnerQueryVO lastInnerQueryVO = new LastInnerQueryVO();
                     lastInnerQueryVO.setDeviceuuid(Collections.singletonList(deviceUuid));
                     lastInnerQueryVO.setMetrics(Collections.singletonList(deviceIdentifier));
-                    List<LastInnerResultVO> currValueList = remoteTsdbProxyService.queryLastDeviceData(lastInnerQueryVO);
+                    ApiResult<List<LastInnerResultVO>> lastApi = remoteTsdbProxyService.queryLastDeviceData(lastInnerQueryVO);
+                    List<LastInnerResultVO> currValueList = lastApi != null && lastApi.getData() != null
+                            ? lastApi.getData()
+                            : Collections.emptyList();
+                    String targetKey = lastInnerQueryVO.getMetrics().get(0);
+
                     BigDecimal currValue = currValueList.stream()
                             .filter(Objects::nonNull)
                             .map(LastInnerResultVO::getMetrics)
                             .filter(Objects::nonNull)
-                            .flatMap(metrics -> metrics.values().stream())
+                            .map(metrics -> metrics.get(targetKey))
                             .filter(Objects::nonNull)
                             .map(String::valueOf)
-                            .map(BigDecimal::new)
+                            .map(val -> {
+                                try {
+                                    return new BigDecimal(val);
+                                } catch (Exception e) {
+                                    return null;
+                                }
+                            })
+                            .filter(Objects::nonNull)
                             .findFirst()
                             .orElse(null);
                     if (currValue == null) {

+ 2 - 2
service-transfer/service-transfer-biz/src/main/java/com/usky/transfer/controller/web/QueryDeviceDataController.java

@@ -114,7 +114,7 @@ public class QueryDeviceDataController {
         }
         reqVO.setDeviceuuid(deviceUUIds);
 
-        return ApiResult.success(remoteTsdbProxyService.queryLastDeviceData(reqVO));
+        return remoteTsdbProxyService.queryLastDeviceData(reqVO);
     }
 
 //    /**
@@ -180,7 +180,7 @@ public class QueryDeviceDataController {
         reqVO.setStartTime(requestVO.getStartTime());
         reqVO.setEndTime(requestVO.getEndTime());
 
-        return ApiResult.success(remoteTsdbProxyService.queryHistoryDeviceData(reqVO));
+        return remoteTsdbProxyService.queryHistoryDeviceData(reqVO);
     }
 
     /**

+ 8 - 3
service-tsdb/service-tsdb-api/src/main/java/com/usky/demo/RemoteTsdbProxyService.java

@@ -4,13 +4,18 @@ package com.usky.demo;
 import com.usky.common.core.bean.ApiResult;
 import com.usky.demo.domain.*;
 import com.usky.demo.factory.RemoteTsdbProxyFallbackFactory;
+import com.usky.demo.feign.RemoteTsdbProxyFeignConfiguration;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
 import java.util.Map;
 
-@FeignClient(contextId = "remoteTsdbProxyService", value = "service-tsdb", fallbackFactory = RemoteTsdbProxyFallbackFactory.class)
+@FeignClient(
+        contextId = "remoteTsdbProxyService",
+        value = "service-tsdb",
+        fallbackFactory = RemoteTsdbProxyFallbackFactory.class,
+        configuration = RemoteTsdbProxyFeignConfiguration.class)
 public interface RemoteTsdbProxyService {
 
 //    /**
@@ -40,7 +45,7 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/externalLast")
-    List<LastInnerResultVO> queryLastDeviceData(@RequestBody LastInnerQueryVO requestVO);
+    ApiResult<List<LastInnerResultVO>> queryLastDeviceData(@RequestBody LastInnerQueryVO requestVO);
 
 //    /**
 //     * 单个设备历史数据查询
@@ -60,7 +65,7 @@ public interface RemoteTsdbProxyService {
      * @return
      */
     @PostMapping("/history")
-    List<HistorysInnerResultVO> queryHistoryDeviceData(@RequestBody HistorysInnerRequestVO requestVO);
+    ApiResult<List<HistorysInnerResultVO>> queryHistoryDeviceData(@RequestBody HistorysInnerRequestVO requestVO);
 
 
     /**

+ 2 - 2
service-tsdb/service-tsdb-api/src/main/java/com/usky/demo/factory/RemoteTsdbProxyFallbackFactory.java

@@ -56,7 +56,7 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
 //            }
 
             @Override
-            public List<LastInnerResultVO> queryLastDeviceData(LastInnerQueryVO requestVO)
+            public ApiResult<List<LastInnerResultVO>> queryLastDeviceData(LastInnerQueryVO requestVO)
             {
                 throw new BusinessException("批量设备实时数据查询:" + throwable.getMessage());
             }
@@ -70,7 +70,7 @@ public class RemoteTsdbProxyFallbackFactory implements FallbackFactory<RemoteTsd
 //            }
 
             @Override
-            public List<HistorysInnerResultVO> queryHistoryDeviceData(HistorysInnerRequestVO requestVO)
+            public ApiResult<List<HistorysInnerResultVO>> queryHistoryDeviceData(HistorysInnerRequestVO requestVO)
             {
                 throw new BusinessException("批量设备历史数据查询:" + throwable.getMessage());
             }

+ 23 - 0
service-tsdb/service-tsdb-api/src/main/java/com/usky/demo/feign/RemoteTsdbProxyFeignConfiguration.java

@@ -0,0 +1,23 @@
+package com.usky.demo.feign;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import feign.codec.Decoder;
+import org.springframework.beans.factory.ObjectFactory;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
+import org.springframework.cloud.openfeign.support.SpringDecoder;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * 仅作用于 {@code remoteTsdbProxyService}:解析裸数组或 {@link com.usky.common.core.bean.ApiResult}。
+ */
+public class RemoteTsdbProxyFeignConfiguration {
+
+    @Bean
+    public Decoder feignDecoder(
+            ObjectFactory<HttpMessageConverters> messageConverters,
+            ObjectProvider<ObjectMapper> objectMapperProvider) {
+        ObjectMapper mapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
+        return new TsdbProxyApiResultFeignDecoder(new SpringDecoder(messageConverters), mapper);
+    }
+}

+ 84 - 0
service-tsdb/service-tsdb-api/src/main/java/com/usky/demo/feign/TsdbProxyApiResultFeignDecoder.java

@@ -0,0 +1,84 @@
+package com.usky.demo.feign;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.usky.common.core.bean.ApiResult;
+import com.usky.demo.domain.HistorysInnerResultVO;
+import com.usky.demo.domain.LastInnerResultVO;
+import feign.Response;
+import feign.Util;
+import feign.codec.Decoder;
+
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * 兼容 TSDB 两种响应:裸 JSON 数组(旧实现)与 {@link ApiResult} 包装(新实现)。
+ */
+public class TsdbProxyApiResultFeignDecoder implements Decoder {
+
+    private final Decoder delegate;
+    private final ObjectMapper objectMapper;
+
+    public TsdbProxyApiResultFeignDecoder(Decoder delegate, ObjectMapper objectMapper) {
+        this.delegate = delegate;
+        this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public Object decode(Response response, Type type) throws IOException {
+        if (response.body() == null) {
+            return delegate.decode(response, type);
+        }
+        byte[] bodyData = Util.toByteArray(response.body().asInputStream());
+        String trimmed = new String(bodyData, StandardCharsets.UTF_8).trim();
+        if (trimmed.startsWith("[")) {
+            Optional<Object> wrapped = wrapJsonArrayAsApiResult(bodyData, type);
+            if (wrapped.isPresent()) {
+                return wrapped.get();
+            }
+        }
+        Response rebuilt = Response.builder()
+                .status(response.status())
+                .reason(response.reason())
+                .headers(response.headers())
+                .request(response.request())
+                .body(bodyData)
+                .build();
+        return delegate.decode(rebuilt, type);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Optional<Object> wrapJsonArrayAsApiResult(byte[] bodyData, Type type) throws IOException {
+        if (!(type instanceof ParameterizedType)) {
+            return Optional.empty();
+        }
+        ParameterizedType pt = (ParameterizedType) type;
+        if (!ApiResult.class.isAssignableFrom((Class<?>) pt.getRawType())) {
+            return Optional.empty();
+        }
+        Type dataType = pt.getActualTypeArguments()[0];
+        if (!(dataType instanceof ParameterizedType)) {
+            return Optional.empty();
+        }
+        ParameterizedType listPt = (ParameterizedType) dataType;
+        if (!List.class.isAssignableFrom((Class<?>) listPt.getRawType())) {
+            return Optional.empty();
+        }
+        Type elemType = listPt.getActualTypeArguments()[0];
+        if (!(elemType instanceof Class)) {
+            return Optional.empty();
+        }
+        Class<?> elemClass = (Class<?>) elemType;
+        if (elemClass != LastInnerResultVO.class && elemClass != HistorysInnerResultVO.class) {
+            return Optional.empty();
+        }
+        CollectionType listJavaType = objectMapper.getTypeFactory().constructCollectionType(List.class, elemClass);
+        List<?> list = objectMapper.readValue(bodyData, listJavaType);
+        return Optional.of(ApiResult.success((List) list));
+    }
+}

+ 6 - 6
service-tsdb/service-tsdb-biz/src/main/java/com/usky/demo/controller/api/DataTsdbProxyControllerApi.java

@@ -75,11 +75,11 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
      * @return
      */
     @Override
-    public List<LastInnerResultVO> queryLastDeviceData(LastInnerQueryVO requestVO){
+    public ApiResult<List<LastInnerResultVO>> queryLastDeviceData(LastInnerQueryVO requestVO){
         if(sourcetype.equals("taos")){
-            return queryTdengineDataService.tdengineLast(requestVO);
+            return ApiResult.success(queryTdengineDataService.tdengineLast(requestVO));
         }else{
-            return queryInfluxdbDataService.last(requestVO);
+            return ApiResult.success(queryInfluxdbDataService.last(requestVO));
         }
     }
 
@@ -103,11 +103,11 @@ public class DataTsdbProxyControllerApi implements RemoteTsdbProxyService {
      * @return
      */
     @Override
-    public List<HistorysInnerResultVO> queryHistoryDeviceData(HistorysInnerRequestVO requestVO){
+    public ApiResult<List<HistorysInnerResultVO>> queryHistoryDeviceData(HistorysInnerRequestVO requestVO){
         if(sourcetype.equals("taos")) {
-            return queryTdengineDataService.tdengineHistory(requestVO);
+            return ApiResult.success(queryTdengineDataService.tdengineHistory(requestVO));
         }else {
-            return queryInfluxdbDataService.history(requestVO);
+            return ApiResult.success(queryInfluxdbDataService.history(requestVO));
         }
     }