Przeglądaj źródła

拓普索尔电信AEP平台

hanzhengyi 1 tydzień temu
rodzic
commit
9bca271a3f

+ 19 - 1
agbox-topsail/src/main/java/com/usky/topsail/controller/web/SpSj2017Controller.java

@@ -1,9 +1,16 @@
 package com.usky.topsail.controller.web;
 
 
+import com.usky.common.core.bean.ApiResult;
+import com.usky.topsail.service.SpOwnerService;
+import com.usky.topsail.service.SpSj2017Service;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 
 import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RestController;
 
 /**
  * <p>
@@ -13,9 +20,20 @@ import org.springframework.stereotype.Controller;
  * @author han
  * @since 2024-02-02
  */
-@Controller
+@RestController
 @RequestMapping("/spSj2017")
 public class SpSj2017Controller {
+    @Autowired
+    private SpSj2017Service spSj2017Service;
 
+    /**
+     * 新增
+     * @return
+     */
+    @PostMapping("/alarmInfo")
+    ApiResult<Void> add(@RequestBody String requestBody){
+        spSj2017Service.add(requestBody);
+        return ApiResult.success();
+    }
 }
 

+ 1 - 1
agbox-topsail/src/main/java/com/usky/topsail/service/SpSj2017Service.java

@@ -12,5 +12,5 @@ import com.usky.common.mybatis.core.CrudService;
  * @since 2024-02-02
  */
 public interface SpSj2017Service extends CrudService<SpSj2017> {
-
+    boolean add(String requestBody);
 }

+ 50 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.topsail.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+    @Value("${mqtt.username}")
+    private String username;
+
+    @Value("${mqtt.password}")
+    private String password;
+
+    @Value("${mqtt.url}")
+    private String hostUrl;
+
+    @Value("${mqtt.sub-topics}")
+    private String msgTopic;
+
+    @Value("${mqtt.keep-alive-interval}")
+    //心跳间隔
+    private int keepAliveInterval;
+    @Value("${mqtt.completionTimeout}")
+    //心跳间隔
+    private int completionTimeout;
+
+
+    @Bean
+    public MqttPahoClientFactory mqttClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{this.getHostUrl()});
+        options.setUserName(this.getUsername());
+        options.setPassword(this.getPassword().toCharArray());
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+
+}

+ 48 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,48 @@
+package com.usky.topsail.service.config.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * @author han
+ * @date 2025/03/20 14:30
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        String[] tops = mqttBaseConfig.getMsgTopic().split(",");
+        String clientId = "h-agbox-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 86 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,86 @@
+package com.usky.topsail.service.config.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+/**
+ * @author han
+ * @date 2025/03/20 14:31
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 10 - 9
agbox-topsail/src/main/java/com/usky/topsail/service/impl/SpOwnerServiceImpl.java

@@ -11,14 +11,12 @@ import com.usky.topsail.mapper.SpOwnerMapper;
 import com.usky.topsail.service.AaService;
 import com.usky.topsail.service.SpOwnerService;
 import com.usky.common.mybatis.core.AbstractCrudService;
-import com.usky.topsail.service.util.DynamicTableDataInsertion;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
-import java.util.Calendar;
-import java.util.List;
+import java.util.*;
 
 /**
  * <p>
@@ -50,6 +48,7 @@ public class SpOwnerServiceImpl extends AbstractCrudService<SpOwnerMapper, SpOwn
             String device_code = payloadVO.get("IMEI").toString();
             Integer bat_level = Integer.parseInt(payloadVO.get("bat_level").toString());
             Integer CSQ = Integer.parseInt(payloadVO.get("CSQ").toString());
+            Integer dev_type = Integer.parseInt(payloadVO.get("dev_type").toString());
             Integer alarm = Integer.parseInt(payloadVO.get("alarm").toString());
             String samp_time = LocalDateTime.now().toString();
             String sensor_dat = payloadVO.get("sensor_dat").toString();
@@ -63,12 +62,14 @@ public class SpOwnerServiceImpl extends AbstractCrudService<SpOwnerMapper, SpOwn
                 device_value = strArray1[0];
             }
             String device_status = "";
-            if (alarm.equals(0)){
-                device_status = "WP0";
-            }else if (alarm.equals(131072)){
-                device_status = "WP0";
-            }else if (alarm.equals(262144)){
-                device_status = "WP0";
+            if(dev_type.equals(1)){
+                if (alarm.equals(0)){
+                    device_status = "WP0";
+                }
+            }else if (dev_type.equals(2)){
+                if (alarm.equals(0)){
+                    device_status = "LL0";
+                }
             }
             // 构建SQL语句
             String sql =

+ 215 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/impl/SpSj2017ServiceImpl.java

@@ -1,11 +1,32 @@
 package com.usky.topsail.service.impl;
 
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.toolkit.SqlRunner;
+import com.usky.topsail.domain.Aa;
+import com.usky.topsail.domain.SpOwner;
 import com.usky.topsail.domain.SpSj2017;
 import com.usky.topsail.mapper.SpSj2017Mapper;
+import com.usky.topsail.service.AaService;
+import com.usky.topsail.service.SpOwnerService;
 import com.usky.topsail.service.SpSj2017Service;
 import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.topsail.service.config.mqtt.MqttOutConfig;
+import com.usky.topsail.service.util.HttpClientUtil;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * <p>
  * asf 服务实现类
@@ -16,5 +37,199 @@ import org.springframework.stereotype.Service;
  */
 @Service
 public class SpSj2017ServiceImpl extends AbstractCrudService<SpSj2017Mapper, SpSj2017> implements SpSj2017Service {
+    @Autowired
+    private AaService aaService;
+
+    @Autowired
+    private SpOwnerService spOwnerService;
+
+    @Resource
+    private MqttOutConfig.MqttGateway mqttGateway;
 
+    @Override
+    public boolean add(String requestBody){
+        JSONObject requestVO = JSONObject.parseObject(requestBody);
+        String serviceId = requestVO.get("serviceId").toString();
+        if (StringUtils.isNotBlank(serviceId) && serviceId.equals("sensor_dat_report")){
+            String payload = requestVO.get("payload").toString();
+            JSONObject payloadVO = JSONObject.parseObject(payload);
+            String device_code = payloadVO.get("IMEI").toString();
+            LambdaQueryWrapper<SpOwner> wrapper = Wrappers.lambdaQuery();
+            wrapper.select(SpOwner::getCompany,SpOwner::getOwnerName);
+            wrapper.eq(SpOwner::getOwnerCode, device_code);
+            List<SpOwner> deviceList = spOwnerService.list(wrapper);
+            if (deviceList.size()>0){
+                Integer bat_level = Integer.parseInt(payloadVO.get("bat_level").toString());
+                Integer CSQ = Integer.parseInt(payloadVO.get("CSQ").toString());
+                Integer dev_type = Integer.parseInt(payloadVO.get("dev_type").toString());
+                String alarm = payloadVO.get("alarm").toString();
+                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+                String samp_time = LocalDateTime.now().format(formatter);
+                String sensor_dat = payloadVO.get("sensor_dat").toString();
+                String[] strArray = sensor_dat.split("\\|");
+                String sensor_dat_value = strArray[1];
+                String[] strArray1 = sensor_dat_value.split(",");
+                String device_value = "";
+                if (strArray1.length>=2){
+                    device_value = strArray1[1];
+                }else {
+                    device_value = strArray1[0];
+                }
+                String device_status = "";
+                JSONObject payloadVO1 = new JSONObject();
+                List<Map<String, Object>> payloadVO3 = new ArrayList<>();
+                JSONObject payloadVO2 = new JSONObject();
+                payloadVO1.put("devId",device_code);
+                payloadVO1.put("connType","G");
+                payloadVO1.put("deviceName",deviceList.get(0).getOwnerName());
+                payloadVO2.put("deviceName",deviceList.get(0).getOwnerName());
+                Instant instant = Instant.now();
+                long seconds = instant.getEpochSecond();
+                payloadVO2.put("timeStamp",seconds);
+                payloadVO1.put("timeStamp",seconds);
+                payloadVO2.put("deviceModel","");
+                payloadVO2.put("devId",device_code);
+                List<Map<String, Object>> db = new ArrayList<>();
+                Map<String, Object> dbMap = new HashMap<>();
+                dbMap.put("property","电量");
+                dbMap.put("serial","201");
+                dbMap.put("value",bat_level);
+                db.add(dbMap);
+                Map<String, Object> dbMap1 = new HashMap<>();
+                dbMap1.put("property","信号");
+                dbMap1.put("serial","202");
+                dbMap1.put("value","3");
+                db.add(dbMap1);
+                if(dev_type.equals(1)){
+                    payloadVO2.put("deviceType","2");
+                    Map<String, Object> dbMap2 = new HashMap<>();
+                    dbMap2.put("property","水压");
+                    dbMap2.put("serial","203");
+                    dbMap2.put("value",device_value);
+                    db.add(dbMap2);
+                    Map<String, Object> dbMap3 = new HashMap<>();
+                    dbMap3.put("property","状态");
+                    dbMap3.put("serial","204");
+                    if (alarm.equals("0")){
+                        device_status = "WP0";
+                        dbMap3.put("value","0");
+                        payloadVO1.put("type","INFO");
+                    }else if (alarm.equals("131072")){
+                        device_status = "WP1";
+                        dbMap3.put("value","1");
+                        payloadVO1.put("type","ALARM");
+                    }else if (alarm.equals("262144")){
+                        device_status = "WP2";
+                        dbMap3.put("value","2");
+                        payloadVO1.put("type","ALARM");
+                    }
+                    db.add(dbMap3);
+                }else if (dev_type.equals(2)){
+                    payloadVO2.put("deviceType","5");
+                    Map<String, Object> dbMap2 = new HashMap<>();
+                    dbMap2.put("property","水位");
+                    dbMap2.put("serial","203");
+                    dbMap2.put("value",device_value);
+                    db.add(dbMap2);
+                    Map<String, Object> dbMap3 = new HashMap<>();
+                    dbMap3.put("property","状态");
+                    dbMap3.put("serial","204");
+                    if (alarm.equals("0")){
+                        device_status = "LL0";
+                        dbMap3.put("value","0");
+                        payloadVO1.put("type","INFO");
+                    }else if (alarm.equals("131072")){
+                        payloadVO1.put("type","ALARM");
+                        dbMap3.put("value","1");
+                        device_status = "LL1";
+                        SpSj2017 spSj2017 = new SpSj2017();
+                        spSj2017.setPort("TSM-10P");
+                        spSj2017.setDeviceCode(device_code);
+                        spSj2017.setStatus("17");
+                        spSj2017.setAddress("127.0.0.1");
+                        spSj2017.setNcmd("");
+                        spSj2017.setTime(LocalDateTime.now());
+                        spSj2017.setData1(device_status);
+                        spSj2017.setData2(bat_level.toString());
+                        spSj2017.setData3(CSQ.toString());
+                        spSj2017.setData4(device_value);
+                        spSj2017.setData5("11111111111");
+                        spSj2017.setCllxr("");
+                        spSj2017.setClsj(LocalDateTime.now());
+                        spSj2017.setClr("");
+                        spSj2017.setClnr("");
+                        spSj2017.setClwb("");
+                        spSj2017.setCldh("");
+                        spSj2017.setClzt(0);
+                        spSj2017.setCllx("");
+                        spSj2017.setVideo("");
+                        this.save(spSj2017);
+                        Long insertid1 = spSj2017.getId();
+                        HashMap map = new HashMap();
+                        map.put("IMEI", device_code);
+                        map.put("insert_id", String.valueOf(insertid1));
+                        map.put("phone", "17630065224");
+                        map.put("time", samp_time);
+                        map.put("evt", device_status);
+                        String httpurl = "https://iot.usky.cn/jdxf/wxapp2.php/Home/Waterwarn/message_water";
+                        String httpOrgCreateTestRtn = HttpClientUtil.doPost(httpurl, map, "utf-8");
+                    }else if (alarm.equals("262144")){
+                        payloadVO1.put("type","ALARM");
+                        dbMap3.put("value","2");
+                        device_status = "LL2";
+                        SpSj2017 spSj2017 = new SpSj2017();
+                        spSj2017.setPort("TSM-10P");
+                        spSj2017.setDeviceCode(device_code);
+                        spSj2017.setStatus("17");
+                        spSj2017.setAddress("127.0.0.1");
+                        spSj2017.setNcmd("");
+                        spSj2017.setTime(LocalDateTime.now());
+                        spSj2017.setData1(device_status);
+                        spSj2017.setData2(bat_level.toString());
+                        spSj2017.setData3(CSQ.toString());
+                        spSj2017.setData4(device_value);
+                        spSj2017.setData5("11111111111");
+                        spSj2017.setCllxr("");
+                        spSj2017.setClsj(LocalDateTime.now());
+                        spSj2017.setClr("");
+                        spSj2017.setClnr("");
+                        spSj2017.setClwb("");
+                        spSj2017.setCldh("");
+                        spSj2017.setClzt(0);
+                        spSj2017.setCllx("");
+                        spSj2017.setVideo("");
+                        this.save(spSj2017);
+                        Long insertid1 = spSj2017.getId();
+                        HashMap map = new HashMap();
+                        map.put("IMEI", device_code);
+                        map.put("insert_id", String.valueOf(insertid1));
+                        map.put("phone", "17630065224");
+                        map.put("time", samp_time);
+                        map.put("evt", device_status);
+                        String httpurl = "https://iot.usky.cn/jdxf/wxapp2.php/Home/Waterwarn/message_water";
+                        String httpOrgCreateTestRtn = HttpClientUtil.doPost(httpurl, map, "utf-8");
+                    }
+                    db.add(dbMap3);
+                }
+                payloadVO2.put("dp",db);
+                payloadVO3.add(payloadVO2);
+                payloadVO1.put("devs",payloadVO3);
+                // 构建SQL语句
+                String sql =
+                        "INSERT INTO " + "sp_d"+device_code + "(device_code,port,time,data1,data2," +
+                                "data3,data4,data5) " + "VALUES ("+ "\""+device_code+ "\""+","+"\""+"TSM-10P"+"\""+","+"\""+samp_time+"\""
+                                +","+"\""+device_status+"\""+","+"\""+bat_level+"\""+","+"\""+CSQ+"\""+","+"\""+device_value+"\""+","+"\""+"11111111111"+"\""+")";
+                // 调用JdbcTemplate进行数据库操作
+                SqlRunner.db().insert(sql);
+                System.out.print(payloadVO1.toJSONString());
+                mqttGateway.sendToMqtt("/usky/ytDP0002/"+deviceList.get(0).getCompany()+"/"+device_code+"/info",
+                        payloadVO1.toJSONString());
+            }
+        }
+        Aa aa = new Aa();
+        aa.setData(requestBody);
+        aa.setTime(LocalDateTime.now());
+        aaService.save(aa);
+        return true;
+    }
 }

+ 55 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/listener/MqttListener.java

@@ -0,0 +1,55 @@
+//package com.usky.topsail.service.listener;
+//
+//import com.usky.topsail.service.config.mqtt.MqttInConfig;
+//import com.usky.topsail.service.mqtt.SimpleContext;
+//import com.usky.topsail.service.vo.MqttBaseVO;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.integration.annotation.ServiceActivator;
+//import org.springframework.messaging.MessageHandler;
+//import org.springframework.stereotype.Component;
+//
+///**
+// * @author han
+// * @date 2025/03/20 14:41
+// */
+//@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+//@Slf4j
+//@Component
+//public class MqttListener {
+//    public static final String MESSAGE_NAME = "messageInput";
+//
+//    @Autowired
+//    private SimpleContext simpleContext;
+//
+//    /**
+//     * 处理消息-消费者
+//     *
+//     * @return
+//     */
+//    @Bean(MESSAGE_NAME)
+//    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+//    public MessageHandler handler() {
+//        return message -> {
+//            String payload = message.getPayload().toString();
+//            //进行接口推送
+//            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+//            if (null != mqttReceivedTopic) {
+//                String topic = mqttReceivedTopic.toString();
+//                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+//                mqttBaseVO.setTopic(topic);
+//                if (topic.indexOf("info") != -1 ) {
+//                    mqttBaseVO.setDescribe("info");
+//                    mqttBaseVO.setData(payload);
+//                }else if(topic.indexOf("event") != -1 ) {
+//                    mqttBaseVO.setDescribe("event");
+//                    mqttBaseVO.setData(payload);
+//                }
+//                //统一处理数据
+//                simpleContext.getResource(mqttBaseVO);
+//            }
+//        };
+//    }
+//}

+ 13 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/mqtt/MqttStrategy.java

@@ -0,0 +1,13 @@
+package com.usky.topsail.service.mqtt;
+
+import com.usky.topsail.service.vo.MqttBaseVO;
+
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO);
+}

+ 25 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/mqtt/SimpleContext.java

@@ -0,0 +1,25 @@
+package com.usky.topsail.service.mqtt;
+
+import com.usky.topsail.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 27 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/mqtt/info/Info.java

@@ -0,0 +1,27 @@
+package com.usky.topsail.service.mqtt.info;
+
+import com.alibaba.fastjson.JSONObject;
+import com.usky.topsail.service.mqtt.MqttStrategy;
+import com.usky.topsail.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author han
+ * @date 2025/03/20 17:56
+ */
+@Service("info")
+public class Info implements MqttStrategy {
+
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+
+        try {
+            JSONObject map_data = JSONObject.parseObject(mqttBaseVO.getData().toString());
+            System.out.println("FInfoReceiver消费者收到消息: " + mqttBaseVO.getData().toString());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+}

+ 21 - 0
agbox-topsail/src/main/java/com/usky/topsail/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.topsail.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author han
+ * @date 2025/03/20 14:41
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+     */
+    private Object data;
+}

+ 8 - 0
agbox-topsail/src/main/resources/application.yml

@@ -3,6 +3,14 @@ server:
   port: 9892
   servlet:
     context-path: /agbox-topsail
+mqtt:
+  completionTimeout: 5000
+  enabled: true
+  keep-alive-interval: 60
+  password: usky
+  sub-topics: /usky/ytDP0009/10387/1/info
+  url: tcp://47.98.201.73:1883
+  username: usky
 mybatis:
   refresh:
     delay-seconds: 10