Browse Source

Mqtt源代码搬迁

jichaobo 2 years ago
parent
commit
2b51f9f701
23 changed files with 1070 additions and 2 deletions
  1. 9 1
      service-fire/service-fire-biz/pom.xml
  2. 1 1
      service-fire/service-fire-biz/src/main/java/com/usky/fire/controller/MybatisGeneratorUtils.java
  3. 94 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/domain/WaterAj.java
  4. 17 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/mapper/WaterAjMapper.java
  5. 17 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/WaterAjService.java
  6. 34 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/api/KafkaTableApi.java
  7. 126 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/api/mhWater/OneCardApi.java
  8. 50 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttBaseConfig.java
  9. 53 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttInConfig.java
  10. 84 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttOutConfig.java
  11. 76 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/enums/TopListener.java
  12. 20 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/WaterAjServiceImpl.java
  13. 74 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/listener/MqttListener.java
  14. 20 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/MqttStrategy.java
  15. 26 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/SimpleContext.java
  16. 42 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/cy/CyStrategy.java
  17. 90 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/fire/FireStrategy.java
  18. 59 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/mh/MhStrategy.java
  19. 41 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmDevice.java
  20. 45 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmProperty.java
  21. 50 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmVO.java
  22. 21 0
      service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/MqttBaseVO.java
  23. 21 0
      service-fire/service-fire-biz/src/main/resources/mapper/fire/WaterAjMapper.xml

+ 9 - 1
service-fire/service-fire-biz/pom.xml

@@ -57,7 +57,15 @@
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
 
-
+        <!--MQTT依赖-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
service-fire/service-fire-biz/src/main/java/com/usky/fire/controller/MybatisGeneratorUtils.java

@@ -71,7 +71,7 @@ public class MybatisGeneratorUtils {
         // strategy.setTablePrefix("t_"); // 表名前缀
         strategy.setEntityLombokModel(true); //使用lombok
         //修改自己想要生成的表
-        strategy.setInclude("dem_large_security_manage");  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
+        strategy.setInclude("tb_info");  // 逆向工程使用的表   如果要生成多个,这里可以传入String[]
         mpg.setStrategy(strategy);
 
         // 关闭默认 xml 生成,调整生成 至 根目录

+ 94 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/domain/WaterAj.java

@@ -0,0 +1,94 @@
+package com.usky.fire.domain;
+
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * <p>
+ *
+ * </p>
+ *
+ * @author ya
+ * @since 2021-11-05
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Accessors(chain = true)
+public class WaterAj implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 案件编号
+     */
+    @TableId(value = "id", type = IdType.INPUT)
+    private Long id;
+
+    /**
+     * 发生时间
+     */
+    private Date createTime;
+
+    /**
+     * 案件类型
+     */
+    private String ajType;
+
+    /**
+     * 所属街道
+     */
+    private String street;
+
+    /**
+     * 案发地址
+     */
+    private String address;
+
+    /**
+     * 网格
+     */
+    private String grid;
+
+    /**
+     * 案件描述
+     */
+    private String ajDescribe;
+
+    /**
+     * 责任部门
+     */
+    private String dutyGroup;
+
+    /**
+     * 0立案1派遣2处理3结案
+     */
+    private Integer ajFlag;
+
+    /**
+     * 截止时间
+     */
+    private Date endTime;
+
+    /**
+     * 附件(非必要)
+     */
+    private String enclosure;
+
+    /**
+     * 图片(非必要)
+     */
+    private String image;
+    /**
+     * 设备编号
+     */
+    private String deviceCode;
+
+
+}

+ 17 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/mapper/WaterAjMapper.java

@@ -0,0 +1,17 @@
+package com.usky.fire.mapper;
+
+
+import com.usky.common.mybatis.core.CrudMapper;
+import com.usky.fire.domain.WaterAj;
+
+/**
+ * <p>
+ * Mapper 接口
+ * </p>
+ *
+ * @author ya
+ * @since 2021-11-05
+ */
+public interface WaterAjMapper extends CrudMapper<WaterAj> {
+
+}

+ 17 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/WaterAjService.java

@@ -0,0 +1,17 @@
+package com.usky.fire.service;
+
+
+import com.usky.common.mybatis.core.CrudService;
+import com.usky.fire.domain.WaterAj;
+
+/**
+ * <p>
+ * 服务类
+ * </p>
+ *
+ * @author ya
+ * @since 2021-11-05
+ */
+public interface WaterAjService extends CrudService<WaterAj> {
+
+}

+ 34 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/api/KafkaTableApi.java

@@ -0,0 +1,34 @@
+package com.usky.fire.service.api;
+
+import com.usky.common.core.util.HttpUtils;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author yq
+ * @date 2021/8/13 9:17
+ */
+@Slf4j
+public class KafkaTableApi {
+
+    private final static String URL = "http://IP:端口/kafka/property";
+
+    public static String uploadTable(String tbName, String dbId, Integer total, List<Object> list) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("tbName", tbName);
+        params.put("dbId", dbId);
+        params.put("total", total);
+        params.put("data", list);
+        String result = "";
+        try {
+            result = HttpUtils.postJson(URL, params, null);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return result;
+    }
+}

+ 126 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/api/mhWater/OneCardApi.java

@@ -0,0 +1,126 @@
+package com.usky.fire.service.api.mhWater;
+
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.usky.common.core.util.HttpUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * 一网通卡api
+ *
+ * @author yq
+ * @date 2021/11/4 11:24
+ */
+@Component
+@ConfigurationProperties(prefix = "mhwater")
+@Slf4j
+public class OneCardApi {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * token
+     */
+    private static final String ACCESS_TOKEN = "3kqxQdFCS5tcXPrEYZHlLeIgBUm0MDWi";
+
+    /**
+     * 内网地址
+     */
+    @Value("${mhwater.path}")
+    private String path;
+
+    /**
+     * SHA256 加密方法
+     *
+     * @param str 参数:明文密码
+     * @return 密文
+     */
+    public static String getSHA256StrJava(String str) {
+        try {
+            if (StringUtils.isBlank(str)) {
+                return null;
+            }
+
+            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+            messageDigest.update(str.getBytes("UTF-8"));
+            byte[] bytes = messageDigest.digest();
+
+            StringBuffer stringBuffer = new StringBuffer();
+            String temp;
+            for (int i = 0; i < bytes.length; i++) {
+                temp = Integer.toHexString(bytes[i] & 0xFF);
+                if (temp.length() == 1) {
+                    // 1得到一位的进行补0操作
+                    stringBuffer.append("0");
+                }
+                stringBuffer.append(temp);
+            }
+            return stringBuffer.toString();
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * 获取token
+     *
+     * @return
+     */
+    public Map<String, String> getAccessToken() {
+        //系统当前时间戳
+        Long timestamp = System.currentTimeMillis();
+        //进行明文加密
+        String token = getSHA256StrJava(ACCESS_TOKEN + timestamp);
+        token = token + "&" + timestamp;
+        Map<String, String> headMaps = new HashMap<>();
+        headMaps.put("AccessToken", token);
+        return headMaps;
+    }
+
+    /**
+     * 统一的解析数据
+     *
+     * @param url
+     * @param param
+     * @return
+     */
+    public void sendApiBase(String url, Object param, Consumer<String> consumer) {
+        try {
+            String result = HttpUtils.postJson(url, param, getAccessToken());
+            JsonNode arrNode = MAPPER.readTree(result);
+            if ("0".equals(arrNode.get("status").asText())) {
+                JsonNode data = arrNode.get("data");
+                consumer.accept(data.asText());
+            } else {
+                log.error("闵行水系统接口-----调用异常:" + arrNode.get("msg").asText());
+            }
+        } catch (Exception e) {
+            log.error("系统异常:" + e.getMessage());
+        }
+    }
+
+    /**
+     * 调用心跳接口
+     */
+    public void callInfoApi(Object param) {
+        sendApiBase(String.format("%s%s", path, "/iot/bomb/yt"), param, data -> log.info("获取到的数据" + data));
+    }
+
+
+    /**
+     * 调用告警接口
+     */
+    public void callAlarmApi(Object param) {
+        sendApiBase(String.format("%s%s", path, "/iot/alarm/yt"), param, data -> log.info("获取到的数据" + data));
+    }
+
+}

+ 50 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttBaseConfig.java

@@ -0,0 +1,50 @@
+package com.usky.fire.service.config.mqtt;
+
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.stereotype.Component;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Data
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttBaseConfig {
+
+	@Value("${mqtt.username}")
+	private String username;
+
+	@Value("${mqtt.password}")
+	private String password;
+
+	@Value("${mqtt.url}")
+	private String hostUrl;
+
+	@Value("${mqtt.sub-topics}")
+	private String msgTopic;
+
+	@Value("${mqtt.keep-alive-interval}")
+	//心跳间隔
+	private int keepAliveInterval;
+	@Value("${mqtt.completionTimeout}")
+	//心跳间隔
+	private int completionTimeout;
+
+
+	@Bean
+	public MqttPahoClientFactory mqttClientFactory() {
+		DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+		MqttConnectOptions options = new MqttConnectOptions();
+		options.setServerURIs(new String[]{this.getHostUrl()});
+		options.setUserName(this.getUsername());
+		options.setPassword(this.getPassword().toCharArray());
+		factory.setConnectionOptions(options);
+		return factory;
+	}
+
+}

+ 53 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,53 @@
+package com.usky.fire.service.config.mqtt;
+
+import com.usky.fire.service.enums.TopListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        List<TopListener> parse = TopListener.parse(1);
+        String[] tops = parse.stream().map(TopListener::getCode).toArray(String[]::new);
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 84 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/config/mqtt/MqttOutConfig.java

@@ -0,0 +1,84 @@
+package com.usky.fire.service.config.mqtt;
+
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.handler.annotation.Header;
+
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttOutConfig {
+
+    @Autowired
+    public MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+
+    public static final String MESSAGE_NAME = "messageOut";
+
+    public static final String DEFAULT_TOPIC = "testTopic";
+
+    /**
+     * 连接通道
+     *
+     * @return
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
+     *
+     * @return
+     */
+    @Bean(name = MESSAGE_NAME)
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler outbound() {
+        // 在这里进行mqttOutboundChannel的相关设置
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler =
+                new MqttPahoMessageHandler(clientId, mqttBaseConfig.mqttClientFactory());
+        //如果设置成true,发送消息时将不会阻塞。
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultTopic(DEFAULT_TOPIC);
+        return messageHandler;
+    }
+
+    @MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
+    public interface MqttGateway {
+        /**
+         * 发送消息
+         *
+         * @param payload
+         */
+        void sendToMqtt(String payload);
+
+        /**
+         * 指定top发送消息
+         *
+         * @param topic
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+        /**
+         * 指定队列和qos
+         *
+         * @param topic
+         * @param qos
+         * @param payload
+         */
+        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+    }
+}

+ 76 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/enums/TopListener.java

@@ -0,0 +1,76 @@
+package com.usky.fire.service.enums;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/12/31 11:11
+ */
+public enum TopListener {
+
+    /**
+     * 城运对接
+     */
+    MH_WATER_INFO("fireInfo","mh/water/info",1),
+    MH_WATER_ALERT("fireInfo","mh/water/alert",1),
+    MH_WATER_STATISTICS("fireInfo","mh/water/statistics",1),
+    DEVICE_INFO("fireInfo","device/info",1),
+    DEVICE_ALERT("fireInfo","device/alert",1),
+    DEVICE_DETAIL("fireInfo","device/detail",1),
+    DEVICE_AJ("fireInfo","device/aj",1),
+
+    /**
+     * 全部设备对接
+     */
+    FIRE_INFO("fireInfo","/usky/ytDP0001/+/+/info",1),
+    FIRE_ALERT("fireAlarm","/usky/ytDP0001/+/+/alarm",1),
+    WATER_INFO("waterInfo","/usky/ytDP0002/+/+/info",1),
+    WATER_ALERT("waterAlert","/usky/ytDP0002/+/+/alarm",1),
+    LIQUID_INFO("waterAlert","/usky/ytDP0005/+/+/info",1),
+    LIQUID_ALERT("waterAlert","/usky/ytDP0005/+/+/alarm",1),
+    RTU_INFO("waterAlert","/usky/ytDP0006/+/+/info",1),
+    RTU_ALERT("waterAlert","/usky/ytDP0006/+/+/alarm",1),
+    VIDEO_ALERT("waterAlert","/usky/ytCamCore/+/+/alarm",1);
+
+
+    private String name;
+    private String code;
+    //0发送队列,1监听队列2都是
+    private Integer type;
+
+    TopListener(String name, String code, Integer type){
+        this.name = name;
+        this.code = code;
+        this.type = type;
+    }
+
+    public static TopListener parse(String code){
+        TopListener topListener = null;
+        for (TopListener t:TopListener.values()) {
+            if (t.getCode().equals(code)){
+                topListener = t;
+                break;
+            }
+        }
+        return topListener;
+    }
+    public static List<TopListener> parse(Integer type){
+        List<TopListener> listeners = new ArrayList<>();
+        for (TopListener t:TopListener.values()) {
+            if (t.getType().equals(type)){
+                listeners.add(t);
+            }
+        }
+        return listeners;
+    }
+    public String getCode(){
+        return code;
+    }
+    public String getName(){
+        return name;
+    }
+    public Integer getType(){
+        return type;
+    }
+}

+ 20 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/impl/WaterAjServiceImpl.java

@@ -0,0 +1,20 @@
+package com.usky.fire.service.impl;
+
+import com.usky.common.mybatis.core.AbstractCrudService;
+import com.usky.fire.domain.WaterAj;
+import com.usky.fire.mapper.WaterAjMapper;
+import com.usky.fire.service.WaterAjService;
+import org.springframework.stereotype.Service;
+
+/**
+ * <p>
+ * 服务实现类
+ * </p>
+ *
+ * @author ya
+ * @since 2021-11-05
+ */
+@Service
+public class WaterAjServiceImpl extends AbstractCrudService<WaterAjMapper, WaterAj> implements WaterAjService {
+
+}

+ 74 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/listener/MqttListener.java

@@ -0,0 +1,74 @@
+package com.usky.fire.service.listener;
+
+
+import com.usky.fire.service.config.mqtt.MqttInConfig;
+import com.usky.fire.service.enums.TopListener;
+import com.usky.fire.service.mqtt.SimpleContext;
+import com.usky.fire.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:13
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Slf4j
+@Component
+public class MqttListener {
+
+    public static final String MESSAGE_NAME = "messageInput";
+
+    @Autowired
+    private SimpleContext simpleContext;
+    /**
+     * 处理消息-消费者
+     * @return
+     */
+    @Bean(MESSAGE_NAME)
+    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+    public MessageHandler handler() {
+        return message -> {
+            String payload = message.getPayload().toString();
+            //进行接口推送
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic){
+                String topic = mqttReceivedTopic.toString();
+                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+                mqttBaseVO.setTopic(topic);
+                if (TopListener.MH_WATER_INFO.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("cy");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.MH_WATER_ALERT.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("cy");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.MH_WATER_STATISTICS.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("cy");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.DEVICE_DETAIL.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("mhwater");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.DEVICE_INFO.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("mhwater");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.DEVICE_ALERT.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("mhwater");
+                    mqttBaseVO.setData(payload);
+                }else if (TopListener.DEVICE_AJ.getCode().equals(topic)){
+                    mqttBaseVO.setDescribe("mhwater");
+                    mqttBaseVO.setData(payload);
+                }else {
+                    mqttBaseVO.setDescribe("fireInfoAndAlarm");
+                    mqttBaseVO.setData(payload);
+                }
+                //统一处理数据
+                simpleContext.getResource(mqttBaseVO);
+            }
+        };
+    }
+}

+ 20 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/MqttStrategy.java

@@ -0,0 +1,20 @@
+package com.usky.fire.service.mqtt;
+
+
+import com.usky.fire.service.vo.MqttBaseVO;
+
+/**
+ * 策略类
+ *
+ * @author yq
+ * @date 2021/11/3 8:27
+ */
+public interface MqttStrategy {
+    /**
+     * 处理消息(策略模式由子类实现)
+     *
+     * @param mqttBaseVO
+     * @return
+     */
+    String disposeMessage(MqttBaseVO mqttBaseVO);
+}

+ 26 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/SimpleContext.java

@@ -0,0 +1,26 @@
+package com.usky.fire.service.mqtt;
+
+
+import com.usky.fire.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 42 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/cy/CyStrategy.java

@@ -0,0 +1,42 @@
+package com.usky.fire.service.mqtt.cy;
+
+import com.usky.common.core.util.JsonUtils;
+import com.usky.fire.domain.WaterAj;
+import com.usky.fire.service.WaterAjService;
+import com.usky.fire.service.api.mhWater.OneCardApi;
+import com.usky.fire.service.enums.TopListener;
+import com.usky.fire.service.mqtt.MqttStrategy;
+import com.usky.fire.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @author yq
+ * @date 2022/3/4 15:07
+ */
+@Service("cy")
+public class CyStrategy implements MqttStrategy {
+
+
+    @Autowired
+    private OneCardApi oneCardApi;
+    @Autowired
+    private WaterAjService waterAjService;
+    @Override
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+        String topic = mqttBaseVO.getTopic();
+        Map map = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+        if (TopListener.MH_WATER_INFO.getCode().equals(topic)){
+            oneCardApi.callInfoApi(map);
+        }else if (TopListener.MH_WATER_ALERT.getCode().equals(topic)){
+            oneCardApi.callAlarmApi(map);
+        }else if (TopListener.MH_WATER_STATISTICS.getCode().equals(topic)){
+            WaterAj waterAj = JsonUtils.fromJson(mqttBaseVO.getData().toString(), WaterAj.class);
+            waterAjService.saveOrUpdate(waterAj);
+        }
+        return null;
+
+    }
+}

+ 90 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/fire/FireStrategy.java

@@ -0,0 +1,90 @@
+package com.usky.fire.service.mqtt.fire;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.common.core.util.BeanMapperUtils;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.fire.domain.SpOwnerCompany;
+import com.usky.fire.domain.TbAlarm;
+import com.usky.fire.service.SpOwnerCompanyService;
+import com.usky.fire.service.TbAlarmService;
+import com.usky.fire.service.TbInfoService;
+import com.usky.fire.service.mqtt.MqttStrategy;
+import com.usky.fire.service.vo.FireAlarmProperty;
+import com.usky.fire.service.vo.FireAlarmVO;
+import com.usky.fire.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/9 17:03
+ */
+@Service("fireInfoAndAlarm")
+public class FireStrategy implements MqttStrategy {
+    @Autowired
+    private TbAlarmService tbAlarmService;
+    @Autowired
+    private TbInfoService tbInfoService;
+    @Autowired
+    private SpOwnerCompanyService spOwnerCompanyService;
+
+    @Override
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+        LambdaQueryWrapper<SpOwnerCompany> queryWrapper = Wrappers.lambdaQuery();
+        queryWrapper.select(SpOwnerCompany::getOwnerId);
+        List<SpOwnerCompany> list = spOwnerCompanyService.list(queryWrapper);
+        String topic = mqttBaseVO.getTopic();
+        topic = topic.split("/")[3];
+        String finalTopic = topic;
+        list.stream().filter(s -> s.getOwnerId().equals(finalTopic))
+                .findAny().ifPresent(s -> {
+            FireAlarmVO fireAlarmVo = JsonUtils.fromJson(mqttBaseVO.getData().toString(), FireAlarmVO.class);
+            TbAlarm tbAlarm = this.enhanceData(fireAlarmVo);
+            if ("ALARM".equals(fireAlarmVo.getType())) {
+                tbAlarmService.save(tbAlarm);
+
+            } else if ("STATE".equals(fireAlarmVo.getType())) {
+//            LambdaUpdateWrapper<DeviceStatus> updateWrapper = Wrappers.lambdaUpdate();
+//            updateWrapper.set(DeviceStatus::getDevicestatus,fireAlarmVo.getDevState())
+//                    .eq(DeviceStatus::getDeviceid,fireAlarmVo.getDevId());
+//            deviceStatusService.update(updateWrapper);
+            }
+        });
+
+        return null;
+    }
+
+    /**
+     * 增强数据
+     *
+     * @return
+     */
+    public TbAlarm enhanceData(FireAlarmVO fireAlarmVO) {
+        TbAlarm tbAlarm = new TbAlarm();
+        tbAlarm.setCreateTime(LocalDateTime.now());
+        tbAlarm.setDeviceId(fireAlarmVO.getDevId());
+        tbAlarm.setDeviceName(fireAlarmVO.getDeviceName());
+        tbAlarm.setAlarmTime(LocalDateTime.now());
+        if ("ALARM".equals(fireAlarmVO.getType())) {
+            List<FireAlarmProperty> dp = fireAlarmVO.getAlarams().get(0).getDp();
+            if (CollectionUtils.isNotEmpty(dp)) {
+                BeanMapperUtils.copy(dp.get(0), tbAlarm);
+            }
+        } else {
+            tbAlarm.setAlarmContent(JsonUtils.toJson(fireAlarmVO));
+        }
+        return tbAlarm;
+    }
+
+    public static void main(String[] args) {
+        String topic = "/usky/ytDP0001/+/+/info";
+
+        String[] split = topic.split("/");
+        System.out.println(split[3]);
+    }
+}

+ 59 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/mqtt/mh/MhStrategy.java

@@ -0,0 +1,59 @@
+package com.usky.fire.service.mqtt.mh;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.fire.domain.Device;
+import com.usky.fire.domain.DeviceAj;
+import com.usky.fire.domain.DeviceAlert;
+import com.usky.fire.domain.DeviceInfo;
+import com.usky.fire.service.DeviceAjService;
+import com.usky.fire.service.DeviceAlertService;
+import com.usky.fire.service.DeviceInfoService;
+import com.usky.fire.service.DeviceService;
+import com.usky.fire.service.enums.TopListener;
+import com.usky.fire.service.mqtt.MqttStrategy;
+import com.usky.fire.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2022/3/4 15:07
+ */
+@Service("mhwater")
+public class MhStrategy implements MqttStrategy {
+
+    @Autowired
+    private DeviceService deviceService;
+    @Autowired
+    private DeviceInfoService deviceInfoService;
+    @Autowired
+    private DeviceAlertService deviceAlertService;
+    @Autowired
+    private DeviceAjService deviceAjService;
+
+    @Override
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+
+        String topic = mqttBaseVO.getTopic();
+        String payload = mqttBaseVO.getData().toString();
+        if (TopListener.DEVICE_DETAIL.getCode().equals(topic)) {
+            //设备录入
+            List<Device> devices = JsonUtils.fromJson(payload, new TypeReference<List<Device>>() {
+            });
+            deviceService.saveOrUpdateBatch(devices);
+        } else if (TopListener.DEVICE_INFO.getCode().equals(topic)) {
+            DeviceInfo deviceInfo = JsonUtils.fromJson(payload, DeviceInfo.class);
+            deviceInfoService.save(deviceInfo);
+        } else if (TopListener.DEVICE_ALERT.getCode().equals(topic)) {
+            DeviceAlert deviceAlert = JsonUtils.fromJson(payload, DeviceAlert.class);
+            deviceAlertService.save(deviceAlert);
+        } else if (TopListener.DEVICE_AJ.getCode().equals(topic)) {
+            DeviceAj deviceAj = JsonUtils.fromJson(payload, DeviceAj.class);
+            deviceAjService.saveOrUpdate(deviceAj);
+        }
+        return null;
+    }
+}

+ 41 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmDevice.java

@@ -0,0 +1,41 @@
+package com.usky.fire.service.vo;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * 告警设备
+ * @author yq
+ * @date 2021/11/10 8:11
+ */
+@Data
+public class FireAlarmDevice {
+    /**
+     * 设备属性集合
+     */
+    private List<FireAlarmProperty> dp;
+
+    /**
+     * 设备名称
+     */
+    private String deviceName;
+    /**
+     * 时间
+     */
+    private String timeStamp;
+    /**
+     * 设备类型
+     */
+    private String deviceType;
+    /**
+     *设备型号
+     */
+    private String deviceModel;
+    /**
+     * 设备id
+     */
+    private String devId;
+
+
+}

+ 45 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmProperty.java

@@ -0,0 +1,45 @@
+package com.usky.fire.service.vo;
+
+import lombok.Data;
+
+/**
+ * 告警设备属性
+ * @author yq
+ * @date 2021/11/10 8:08
+ */
+@Data
+public class FireAlarmProperty {
+
+    /**
+     * 属性
+     */
+    private String  property;
+    /**
+     * value
+     */
+    private String  alertValue;
+    /**
+     * 状态
+     */
+    private String  status;
+    /**
+     * 报警内容运行状态
+     */
+    private String  alertType;
+    /**
+     * 序号
+     */
+    private String  serial;
+    /**
+     * 地址
+     */
+    private String  address;
+    /**
+     * 端口
+     */
+    private String  port;
+    /**
+     * 设备状态名称
+     */
+    private String  stuname;
+}

+ 50 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/FireAlarmVO.java

@@ -0,0 +1,50 @@
+package com.usky.fire.service.vo;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * 火灾告警心跳信息
+ * @author yq
+ * @date 2021/11/10 8:06
+ */
+@Data
+public class FireAlarmVO {
+    /**
+     * 设备编号
+     */
+    private String devId;
+
+    /**
+     *传输方式
+     */
+    private String connType;
+    /**
+     * 设备名称
+      */
+    private String deviceName;
+    /**
+     * 告警类型心跳或者告警
+     */
+    private String type;
+    /**
+     * 时间戳
+      */
+    private String timeStamp;
+
+    /**
+     * 心跳设备信息
+     */
+    private List<FireAlarmDevice> devs;
+
+
+    /**
+     * 告警设备信息
+     */
+    private List<FireAlarmDevice> alarams;
+    /**
+     * 设备状态
+      */
+    private Integer devState;
+}

+ 21 - 0
service-fire/service-fire-biz/src/main/java/com/usky/fire/service/vo/MqttBaseVO.java

@@ -0,0 +1,21 @@
+package com.usky.fire.service.vo;
+
+import lombok.Data;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:32
+ */
+@Data
+public class MqttBaseVO {
+    /**
+     * 接口描述
+     */
+    private String describe;
+
+    private String topic;
+    /**
+     * 数据内容
+      */
+    private Object data;
+}

+ 21 - 0
service-fire/service-fire-biz/src/main/resources/mapper/fire/WaterAjMapper.xml

@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.usky.fire.mapper.WaterAjMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.usky.fire.domain.WaterAj">
+        <id column="id" property="id"/>
+        <result column="create_time" property="createTime"/>
+        <result column="aj_type" property="ajType"/>
+        <result column="street" property="street"/>
+        <result column="address" property="address"/>
+        <result column="grid" property="grid"/>
+        <result column="aj_describe" property="ajDescribe"/>
+        <result column="duty_group" property="dutyGroup"/>
+        <result column="aj_flag" property="ajFlag"/>
+        <result column="end_time" property="endTime"/>
+        <result column="enclosure" property="enclosure"/>
+        <result column="image" property="image"/>
+    </resultMap>
+
+</mapper>