Selaa lähdekoodia

Merge branch 'han' of uskycloud/usky-modules into master

fuyuchuan 5 päivää sitten
vanhempi
commit
5881dcd319

+ 1 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/IotDataController.java

@@ -33,6 +33,7 @@ public class IotDataController {
      */
     @PostMapping("/flooded")
     public ApiResult<Void> sendWaterLeak(@RequestBody IotDataTransferVO jsonObject) {
+        // iotDataTransferService.createMqttConnection("3101130019", "ptrEQZK2");
         iotDataTransferService.sendWaterLeak(jsonObject);
         return ApiResult.success();
     }

+ 7 - 11
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/mqtt/MqttOutConfig.java

@@ -1,11 +1,7 @@
 package com.usky.cdi.service.config.mqtt;
 
-import com.alibaba.fastjson.JSONObject;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
 import org.springframework.integration.annotation.MessagingGateway;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.channel.DirectChannel;
@@ -47,7 +43,7 @@ public class MqttOutConfig {
     /**
      * MQTT消息发送处理器
      * 注意:这个方法会被Spring自动创建,用于处理mqttOutboundChannel通道上的消息
-     * 
+     *
      * @param factory MQTT客户端工厂
      * @return MessageHandler实例
      */
@@ -57,32 +53,32 @@ public class MqttOutConfig {
         // 注意:这里的client-id暂时使用固定值,因为username在启动时还不可用
         // 实际使用时,会在createMqttConnection方法中重新设置
         String clientId = "mqttx-" + System.currentTimeMillis();
-        MqttPahoMessageHandler messageHandler = 
+        MqttPahoMessageHandler messageHandler =
                 new MqttPahoMessageHandler(clientId, factory);
-        //如果设置成true,发送消息时将不会阻塞。
+        // 如果设置成true,发送消息时将不会阻塞。
         messageHandler.setAsync(true);
         messageHandler.setDefaultTopic(DEFAULT_TOPIC);
         return messageHandler;
     }
-    
+
     /**
      * MQTT客户端工厂
      * 注意:这个方法会被Spring自动创建,用于创建MQTT客户端
-     * 
+     *
      * @return DefaultMqttPahoClientFactory实例
      */
     @Bean
     public DefaultMqttPahoClientFactory mqttClientFactory() {
         // 创建默认的MqttPahoClientFactory
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-        
+
         // 设置默认的MqttConnectOptions,确保serverURIs不为null
         // 实际使用时,会在createMqttConnection方法中重新配置
         MqttConnectOptions options = new MqttConnectOptions();
         options.setServerURIs(new String[]{"ssl://114.80.201.143:8883"}); // 设置默认的服务器地址
         options.setKeepAliveInterval(60); // 设置默认的心跳间隔
         factory.setConnectionOptions(options);
-        
+
         return factory;
     }
 

+ 11 - 9
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/IotDataTransferService.java

@@ -48,11 +48,11 @@ import java.util.stream.Collectors;
 public class IotDataTransferService {
 
     private MqttOutConfig.MqttGateway mqttGateway;
-    
+
     // 注入ApplicationContext,确保总是能获取到
     @Autowired
     private ApplicationContext context;
-    
+
     // MQTT连接相关配置
     private static final String MQTT_URL = "ssl://114.80.201.143:8883";
     private static final String MQTT_TOPIC = "iotInfo/+";
@@ -116,6 +116,7 @@ public class IotDataTransferService {
 
         try {
             List<JSONObject> deviceData = deviceDataQuery.getDeviceData(transferVO);
+            log.warn("获取到的数据:{}", deviceData);
             Integer deviceType = transferVO.getDeviceType();
             Integer totalDevices = transferVO.getDevices().size();
 
@@ -902,36 +903,36 @@ public class IotDataTransferService {
      * @param username MQTT用户名
      * @param password MQTT密码
      */
-    private void createMqttConnection(String username, String password) {
+    public void createMqttConnection(String username, String password) {
         try {
             // 使用注入的ApplicationContext获取已有的mqttGateway实例
             // 因为我们保留了@MessagingGateway注解,Spring会自动创建这个实例
             if (this.context == null) {
                 throw new IllegalStateException("ApplicationContext未注入,无法获取MQTT Gateway");
             }
-            
+
             // 1. 获取mqttGateway实例
             this.mqttGateway = this.context.getBean(MqttOutConfig.MqttGateway.class);
             if (this.mqttGateway == null) {
                 throw new IllegalStateException("MQTT Gateway未找到,无法发送消息");
             }
-            
+
             // 2. 获取现有的mqttClientFactory实例
             DefaultMqttPahoClientFactory mqttClientFactory = this.context.getBean(DefaultMqttPahoClientFactory.class);
             if (mqttClientFactory == null) {
                 throw new IllegalStateException("MQTT Client Factory未找到,无法创建MQTT连接");
             }
-            
+
             // 3. 创建并配置MqttConnectOptions
             MqttConnectOptions options = new MqttConnectOptions();
             options.setServerURIs(new String[]{MQTT_URL});
             options.setUserName(username);
             options.setPassword(password.toCharArray());
             options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
-            
+
             // 4. 更新mqttClientFactory的连接选项
             mqttClientFactory.setConnectionOptions(options);
-            
+
             log.info("MQTT Gateway初始化成功,用户名:{}", username);
             log.info("MQTT连接配置完成,服务器地址:{},客户端ID:mqttx-{}", MQTT_URL, username);
         } catch (Exception e) {
@@ -958,7 +959,8 @@ public class IotDataTransferService {
      * @return 解析后的时间,如果解析失败返回null
      */
     private LocalDateTime parseDataTime(JSONObject deviceDataItem) {
-        Long dataTime = deviceDataItem.getLong("time");
+        log.warn("解析的json{}", deviceDataItem.toString());
+        Long dataTime = deviceDataItem.getLong("realtime");
         if (dataTime == null) {
             log.warn("设备{}的time为空", deviceDataItem.getString("device_id"));
             return null;

+ 50 - 9
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataQuery.java

@@ -100,10 +100,11 @@ public class DeviceDataQuery {
             List<String> deviceUuids = transferVO.getDevices().stream()
                     .map(DmpDevice::getDeviceUuid)
                     .collect(Collectors.toList());
-            requestBody.put("deviceUuids", deviceUuids);
+            requestBody.put("deviceuuid", deviceUuids);
 
             log.debug("请求设备数据接口,设备数量:{}", deviceUuids.size());
             String response = HttpClientUtils.doPostJson(baseUrl, requestBody.toJSONString());
+            log.warn("接口返回数据:{}", response);
 
             List<JSONObject> resultList = parseResponseData(response, transferVO.getDeviceType(), transferVO.getDevices());
 
@@ -111,6 +112,48 @@ public class DeviceDataQuery {
             if (resultList.isEmpty() && simulation) {
                 log.info("接口返回数据为空,生成模拟数据,设备类型:{}", transferVO.getDeviceType());
                 resultList = generateSimulationData(transferVO.getDeviceType(), transferVO.getDevices());
+            } else if (resultList.size() < transferVO.getDevices().size()) {
+                log.warn("接口返回数据数量与请求数量不一致,设备类型:{}", transferVO.getDeviceType());
+
+                // 获取返回数据中的device_id集合
+                Set<String> returnedDeviceIds = resultList.stream()
+                        .map(data -> data.getString("device_id"))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toSet());
+
+                // 获取请求中devices的device_id集合
+                Map<String, DmpDevice> requestDeviceMap = transferVO.getDevices().stream()
+                        .filter(device -> device.getDeviceId() != null)
+                        .collect(Collectors.toMap(DmpDevice::getDeviceId, device -> device));
+
+                // 找出返回数据中缺少的device_id
+                List<String> missingDeviceIds = requestDeviceMap.keySet().stream()
+                        .filter(deviceId -> !returnedDeviceIds.contains(deviceId))
+                        .collect(Collectors.toList());
+
+                if (!missingDeviceIds.isEmpty() && simulation) {
+                    log.info("发现缺少的设备数据,生成模拟数据,设备数量:{}", missingDeviceIds.size());
+                    // 为缺少的设备生成模拟数据
+                    List<DmpDevice> missingDevices = missingDeviceIds.stream()
+                            .map(requestDeviceMap::get)
+                            .collect(Collectors.toList());
+                    List<JSONObject> missingSimulationData = generateSimulationData(transferVO.getDeviceType(), missingDevices);
+                    // 将模拟数据与返回数据结合
+                    resultList.addAll(missingSimulationData);
+                }
+
+                // 校验结合后的数据是否与请求的device_id一一对应
+                Set<String> combinedDeviceIds = resultList.stream()
+                        .map(data -> data.getString("device_id"))
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toSet());
+
+                if (combinedDeviceIds.size() != transferVO.getDevices().size()) {
+                    log.warn("数据整合后仍存在缺失,请求设备数量:{},返回设备数量:{}",
+                            transferVO.getDevices().size(), combinedDeviceIds.size());
+                } else {
+                    log.debug("数据整合完成,设备数量与请求一致:{}", combinedDeviceIds.size());
+                }
             }
 
             return resultList;
@@ -192,9 +235,7 @@ public class DeviceDataQuery {
                 // 添加设备标识信息
                 targetData.put("deviceuuid", deviceUuid);
                 String deviceId = deviceUuidToIdMap.get(deviceUuid);
-                if (deviceId != null) {
-                    targetData.put("device_id", deviceId);
-                }
+                targetData.put("device_id", deviceId);
 
                 if (hasValidData) {
                     resultList.add(targetData);
@@ -217,13 +258,13 @@ public class DeviceDataQuery {
     private List<String> getTargetFieldsByDeviceType(Integer deviceType) {
         if (deviceType == null) {
             log.warn("获取目标字段失败:设备类型为空");
-            return Collections.singletonList("time");
+            return Collections.singletonList("realtime");
         }
 
         String fieldsStr = deviceFieldConfig.deviceFieldMapping().get(deviceType.toString());
         if (fieldsStr == null || fieldsStr.trim().isEmpty()) {
             log.warn("获取目标字段失败:设备类型{}对应的字段映射不存在", deviceType);
-            return Collections.singletonList("time");
+            return Collections.singletonList("realtime");
         }
 
         List<String> fields = Arrays.stream(fieldsStr.split(","))
@@ -232,8 +273,8 @@ public class DeviceDataQuery {
                 .collect(Collectors.toList());
 
         // 确保包含时间字段
-        if (!fields.contains("time")) {
-            fields.add("time");
+        if (!fields.contains("realtime")) {
+            fields.add("realtime");
         }
 
         return fields;
@@ -254,7 +295,7 @@ public class DeviceDataQuery {
 
         for (DmpDevice device : devices) {
             JSONObject simulationData = new JSONObject();
-            simulationData.put("time", currentTime);
+            simulationData.put("realtime", currentTime);
             simulationData.put("device_id", device.getDeviceId());
 
             if (deviceType == null) {

+ 1 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/DeviceDataSyncService.java

@@ -26,7 +26,7 @@ public class DeviceDataSyncService {
      * fixedDelay:任务执行完成后固定延迟29分钟执行下一次
      * initialDelay:初始化后立即执行第一次任务
      */
-    //@Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
+    // @Scheduled(fixedDelay = 14 * 60 * 1000, initialDelay = 0)
     // public void scheduledDeviceDataSync() {
     //     Integer tenantId = 1208;
     //     Long engineeringId = 3101130019L;

+ 10 - 1
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/HttpClientUtils.java

@@ -1,5 +1,6 @@
 package com.usky.cdi.service.util;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
@@ -20,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+@Slf4j
 public class HttpClientUtils {
     // 使用静态连接池,避免每次请求都创建和关闭连接
     private static final CloseableHttpClient HTTP_CLIENT = HttpClients.custom()
@@ -53,7 +55,7 @@ public class HttpClientUtils {
                 resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("HTTP请求异常: {}", e.getMessage(), e);
         } finally {
             try {
                 if (response != null) {
@@ -109,12 +111,19 @@ public class HttpClientUtils {
     }
 
     public static String doPostJson(String url, String json) {
+        log.info(">>> 请求URL={}, 请求体={}", url, json);
+
         CloseableHttpResponse response = null;
         String resultString = "";
         try {
             // 创建Http Post请求
             HttpPost httpPost = new HttpPost(url);
             // 创建请求内容
+            httpPost.setHeader("Accept", "*/*");
+            httpPost.setHeader("Content-Type", "application/json");
+            httpPost.setHeader("Accept-Encoding", "gzip, deflate, br");
+            httpPost.setHeader("Connection", "keep-alive");
+
             StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
             httpPost.setEntity(entity);
             // 执行http请求

+ 13 - 4
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/util/WeatherFetcher.java

@@ -7,6 +7,7 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
+
 import lombok.extern.slf4j.Slf4j;
 import org.json.JSONObject;
 
@@ -52,17 +53,18 @@ public class WeatherFetcher {
         double tempCelsius = DEFAULT_TEMPERATURE;
         int humidity = DEFAULT_HUMIDITY;
 
-        try {
-            log.debug("开始调用OpenWeatherMap API获取天气数据");
+        log.debug("开始调用OpenWeatherMap API获取天气数据");
+        long startTime = System.currentTimeMillis();
 
+        try {
             // 1. 构造请求URL
             URL url = new URL(API_URL);
 
             // 2. 建立连接并发送请求
             HttpURLConnection conn = (HttpURLConnection) url.openConnection();
             conn.setRequestMethod("GET");
-            conn.setConnectTimeout(5000); // 连接超时:5秒
-            conn.setReadTimeout(5000);    // 读取超时:5秒
+            conn.setConnectTimeout(15000);
+            conn.setReadTimeout(15000);
 
             // 3. 读取响应
             BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
@@ -96,6 +98,7 @@ public class WeatherFetcher {
             log.debug("=== 天气解析结果 ===");
             log.debug("城市: {}", cityName);
             log.debug("温度: {:.2f}°C (原始: {}K)", tempCelsius, tempKelvin);
+            System.out.println("体感温度: {:.2f}°C (原始: {}K)" + feelsLikeCelsius + "==========" + feelsLikeKelvin);
             log.debug("体感温度: {:.2f}°C", feelsLikeCelsius);
             log.debug("湿度: {}%", humidity);
             log.debug("天气状况: {}", description);
@@ -119,6 +122,12 @@ public class WeatherFetcher {
             log.error("获取天气数据失败:{}", e.getMessage());
             // 异常时使用默认值
             log.warn("使用默认天气数据,温度:{}°C,湿度:{}%", DEFAULT_TEMPERATURE, DEFAULT_HUMIDITY);
+        } finally {
+            // 打印API调用结束时间和时长
+            long endTime = System.currentTimeMillis();
+            long duration = endTime - startTime;
+            System.out.println("第三方天气API调用时长" + duration + "毫秒");
+            log.info("OpenWeatherMap API调用结束,开始时间: {}, 结束时间: {}, 时长: {}ms", startTime, endTime, duration);
         }
 
         Map<String, Double> resultMap = new HashMap<>();