Parcourir la source

订阅消防物联网告警信息

hanzhengyi il y a 1 an
Parent
commit
a5f1fd7d15

+ 53 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/config/mqtt/MqttInConfig.java

@@ -0,0 +1,53 @@
+package com.usky.alarm.service.config.mqtt;
+
+import com.usky.alarm.service.enums.TopListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+
+import java.util.List;
+
+/**
+ * @author yq
+ * @date 2021/11/1 16:37
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Configuration
+public class MqttInConfig {
+
+    @Autowired
+    private MqttBaseConfig mqttBaseConfig;
+
+    public static final String CHANNEL_NAME_INPUT = "mqttInputChannel";
+
+    @Bean(name = CHANNEL_NAME_INPUT)
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 消息订阅绑定-消费者
+     *
+     * @return
+     */
+    @Bean
+    public MessageProducer inbound() {
+        List<TopListener> parse = TopListener.parse(1);
+        String[] tops = parse.stream().map(TopListener::getCode).toArray(String[]::new);
+        String clientId = "h-backend-mqtt-in-" + System.currentTimeMillis();
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
+                mqttBaseConfig.mqttClientFactory(), tops);
+        adapter.setCompletionTimeout(mqttBaseConfig.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2);
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+}

+ 2 - 22
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/enums/TopListener.java

@@ -1,4 +1,4 @@
-package com.usky.iot.service.enums;
+package com.usky.alarm.service.enums;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -8,37 +8,17 @@ import java.util.List;
  * @date 2021/12/31 11:11
  */
 public enum TopListener {
-
-    /**
-     * 城运对接
-     */
-    MH_WATER_INFO("fireInfo","mh/water/info",1),
-    MH_WATER_ALERT("fireInfo","mh/water/alert",1),
-    MH_WATER_STATISTICS("fireInfo","mh/water/statistics",1),
-    DEVICE_INFO("fireInfo","device/info",1),
-    DEVICE_ALERT("fireInfo","device/alert",1),
-    DEVICE_DETAIL("fireInfo","device/detail",1),
-    DEVICE_AJ("fireInfo","device/aj",1),
-
     /**
-     * 全部设备对接
+     * 全部设备告警对接
      */
-    FIRE_INFO("fireInfo","/usky/ytDP0001/+/+/info",1),
     FIRE_ALERT("fireAlarm","/usky/ytDP0001/+/+/alarm",1),
-    WATER_INFO("waterInfo","/usky/ytDP0002/+/+/info",1),
     WATER_ALERT("waterAlert","/usky/ytDP0002/+/+/alarm",1),
-    SMOKE_INFO("smokeInfo","/usky/ytDP0003/+/+/info",1),
     SMOKE_ALERT("smokeAlarm","/usky/ytDP0003/+/+/alarm",1),
-    LIQUID_INFO("waterInfo","/usky/ytDP0005/+/+/info",1),
     LIQUID_ALERT("waterAlert","/usky/ytDP0005/+/+/alarm",1),
-    RTU_INFO("rtuinfo","/usky/ytDP0006/+/+/info",1),
     RTU_ALERT("rtuAlert","/usky/ytDP0006/+/+/alarm",1),
-    ELECTRICAL_INFO("electricalInfo","/usky/ytDP0007/+/+/info",1),
     ELECTRICAL_ALERT("electricalAlarm","/usky/ytDP0007/+/+/alarm",1),
-    MH_COVER_INFO("waterInfo","/usky/ytDP0008/+/+/info",1),
     MH_COVER_ALERT("waterAlert","/usky/ytDP0008/+/+/alarm",1),
     VIDEO_ALERT("videoAlert","/usky/ytCamCore/+/+/alarm",1),
-    LR_SMOKE_INFO("smokeInfo","/usky/ytDP00033/+/+/info",1),
     LR_SMOKE_ALERT("smokeAlarm","/usky/ytDP00033/+/+/alarm",1);
 
 

+ 69 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/listener/MqttListener.java

@@ -0,0 +1,69 @@
+package com.usky.alarm.service.listener;
+
+import com.usky.alarm.service.config.mqtt.MqttInConfig;
+import com.usky.alarm.service.enums.TopListener;
+import com.usky.alarm.service.mqtt.SimpleContext;
+import com.usky.alarm.service.vo.MqttBaseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author yq
+ * @date 2021/11/3 8:13
+ */
+@ConditionalOnProperty(prefix = "mqtt", value = {"enabled"}, havingValue = "true")
+@Slf4j
+@Component
+public class MqttListener {
+
+    public static final String MESSAGE_NAME = "messageInput";
+
+    @Autowired
+    private SimpleContext simpleContext;
+
+    /**
+     * 处理消息-消费者
+     *
+     * @return
+     */
+    @Bean(MESSAGE_NAME)
+    @ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
+    public MessageHandler handler() {
+        return message -> {
+            String payload = message.getPayload().toString();
+            //进行接口推送
+            String[] code1 = TopListener.FIRE_ALERT.getCode().split("/");
+            String[] code2 = TopListener.WATER_ALERT.getCode().split("/");
+            String[] code3 = TopListener.RTU_ALERT.getCode().split("/");
+            String[] code4 = TopListener.VIDEO_ALERT.getCode().split("/");
+            String[] code5 = TopListener.SMOKE_ALERT.getCode().split("/");
+            String[] code6 = TopListener.ELECTRICAL_ALERT.getCode().split("/");
+            String[] code7 = TopListener.LIQUID_ALERT.getCode().split("/");
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic) {
+                String topic = mqttReceivedTopic.toString();
+                MqttBaseVO mqttBaseVO = new MqttBaseVO();
+                mqttBaseVO.setTopic(topic);
+                if ((topic.indexOf(code1[2]) != -1 && topic.indexOf(code1[5]) != -1) ||
+                        (topic.indexOf(code2[2]) != -1 && topic.indexOf(code2[5]) != -1) ||
+                        (topic.indexOf(code3[2]) != -1 && topic.indexOf(code3[5]) != -1) ||
+                        (topic.indexOf(code4[2]) != -1 && topic.indexOf(code4[5]) != -1) ||
+                        (topic.indexOf(code5[2]) != -1 && topic.indexOf(code5[5]) != -1) ||
+                        (topic.indexOf(code6[2]) != -1 && topic.indexOf(code6[5]) != -1) ||
+                        (topic.indexOf(code7[2]) != -1 && topic.indexOf(code7[5]) != -1)) {
+                    mqttBaseVO.setDescribe("alarm");
+                }else {
+                    mqttBaseVO.setDescribe("fireInfoAndAlarm");
+                }
+                mqttBaseVO.setData(payload);
+                //统一处理数据
+                simpleContext.getResource(mqttBaseVO);
+            }
+        };
+    }
+}

+ 1 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/mqtt/MqttStrategy.java

@@ -1,6 +1,7 @@
 package com.usky.alarm.service.mqtt;
 
 
+import com.aliyuncs.exceptions.ClientException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.usky.alarm.service.vo.MqttBaseVO;
 

+ 26 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/mqtt/SimpleContext.java

@@ -0,0 +1,26 @@
+package com.usky.alarm.service.mqtt;
+
+
+import com.usky.alarm.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 中间处理消息转发
+ */
+@Service
+public class SimpleContext {
+    @Autowired
+    private final Map<String, MqttStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    public SimpleContext(Map<String, MqttStrategy> strategyMap) {
+        strategyMap.forEach(this.strategyMap::put);
+    }
+
+    public String getResource(MqttBaseVO mqttBaseVO) {
+        return strategyMap.get(mqttBaseVO.getDescribe()).disposeMessage(mqttBaseVO);
+    }
+}

+ 93 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/mqtt/alarm/Alarm.java

@@ -0,0 +1,93 @@
+package com.usky.alarm.service.mqtt.alarm;
+
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.usky.alarm.service.enums.TopListener;
+import com.usky.common.core.util.JsonUtils;
+import com.usky.alarm.domain.*;
+import com.usky.alarm.service.*;
+import com.usky.alarm.service.mqtt.MqttStrategy;
+import com.usky.alarm.service.vo.MqttBaseVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author yq
+ * @date 2022/3/4 15:07
+ */
+@Service("alarm")
+public class Alarm implements MqttStrategy {
+
+    @Autowired
+    private BaseAlarmService baseAlarmService;
+
+    @Autowired
+    private DmpDeviceService dmpDeviceService;
+
+    @Override
+    public String disposeMessage(MqttBaseVO mqttBaseVO) {
+        Map map = JsonUtils.fromJson(mqttBaseVO.getData().toString(), Map.class);
+        List<Map<String, Object>> alarams = (List) JSONObject.parseArray(JSONObject.toJSONString(map.get("alarams")));
+        List<Map<String, Object>> dp = (List) JSONObject.parseArray(JSONObject.toJSONString(alarams.get(0).get("dp")));
+        BaseAlarm baseAlarm = new BaseAlarm();
+        long timestamp = Long.valueOf(alarams.get(0).get("timeStamp").toString()).longValue();
+        LocalDateTime time2 = LocalDateTime.ofEpochSecond(timestamp, 0, ZoneOffset.ofHours(8));
+        String topic = mqttBaseVO.getTopic();
+        String[] code4 = TopListener.VIDEO_ALERT.getCode().split("/");
+        LambdaQueryWrapper<DmpDevice> queryWrapper1 = Wrappers.lambdaQuery();
+        queryWrapper1.select(DmpDevice::getProductCode)
+                .eq(DmpDevice::getDeviceId, map.get("devId").toString())
+                .eq(DmpDevice::getDeleteFlag, 0);
+        List<DmpDevice> list = dmpDeviceService.list(queryWrapper1);
+        baseAlarm.setDeviceId(alarams.get(0).get("devId").toString());
+        baseAlarm.setAlarmTime(time2);
+        baseAlarm.setAlarmType(dp.get(0).get("serial").toString());
+        baseAlarm.setAlarmObject(alarams.get(0).get("deviceName").toString());
+        baseAlarm.setAlarmData(dp.get(0).get("status").toString());
+        baseAlarm.setAlarmAttribute(dp.get(0).get("property").toString());
+        if (alarams.get(0).get("deviceType").equals("1")){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString()+","+dp.get(0).get("value").toString());
+            baseAlarm.setAlarmGrade(1);
+            baseAlarm.setAlarmAddress(dp.get(0).get("value").toString());
+        }else if (alarams.get(0).get("deviceType").equals("2")){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString()+","+dp.get(0).get("value").toString());
+            baseAlarm.setAlarmGrade(2);
+            baseAlarm.setAlarmAddress(alarams.get(0).get("deviceName").toString());
+        }else if (alarams.get(0).get("deviceType").equals("3")){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString());
+            baseAlarm.setAlarmGrade(2);
+            baseAlarm.setAlarmAddress(alarams.get(0).get("deviceName").toString());
+        }else if (alarams.get(0).get("deviceType").equals("6")){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString()+","+dp.get(0).get("stuname").toString());
+            baseAlarm.setAlarmGrade(2);
+            baseAlarm.setAlarmAddress(alarams.get(0).get("deviceName").toString());
+        }else if (alarams.get(0).get("deviceType").equals("7")){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString());
+            baseAlarm.setAlarmGrade(2);
+            baseAlarm.setAlarmAddress(alarams.get(0).get("deviceName").toString());
+        }else if (topic.indexOf(code4[2]) != -1 && topic.indexOf(code4[5]) != -1){
+            baseAlarm.setAlarmContent(dp.get(0).get("property").toString());
+            baseAlarm.setAlarmGrade(2);
+            baseAlarm.setAlarmAddress(alarams.get(0).get("deviceName").toString());
+            baseAlarm.setSitePhoto(dp.get(0).get("value").toString());
+        }
+        if (CollectionUtils.isNotEmpty(list)) {
+            baseAlarm.setProductCode(list.get(0).getProductCode());
+            try {
+                Boolean addFlag = baseAlarmService.add(baseAlarm);
+            }catch (Exception a){
+
+            }
+        }
+        return null;
+    }
+
+
+}

+ 136 - 0
service-alarm/service-alarm-biz/src/main/java/com/usky/alarm/service/util/HttpClientUtils.java

@@ -0,0 +1,136 @@
+package com.usky.alarm.service.util;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ 1. 跨域请求工具类
+ */
+public class HttpClientUtils {
+
+    public static String doGet(String url, Map<String, String> param) {
+
+        // 创建Httpclient对象
+        CloseableHttpClient httpclient = HttpClients.createDefault();
+
+        String resultString = "";
+        CloseableHttpResponse response = null;
+        try {
+            // 创建uri
+            URIBuilder builder = new URIBuilder(url);
+            if (param != null) {
+                for (String key : param.keySet()) {
+                    builder.addParameter(key, param.get(key));
+                }
+            }
+            URI uri = builder.build();
+
+            // 创建http GET请求
+            HttpGet httpGet = new HttpGet(uri);
+
+            // 执行请求
+            response = httpclient.execute(httpGet);
+            // 判断返回状态是否为200
+            if (response.getStatusLine().getStatusCode() == 200) {
+                resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                if (response != null) {
+                    response.close();
+                }
+                httpclient.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return resultString;
+    }
+
+    public static String doGet(String url) {
+        return doGet(url, null);
+    }
+
+    public static String doPost(String url, Map<String, String> param) {
+        // 创建Httpclient对象
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+        CloseableHttpResponse response = null;
+        String resultString = "";
+        try {
+            // 创建Http Post请求
+            HttpPost httpPost = new HttpPost(url);
+            // 创建参数列表
+            if (param != null) {
+                List<NameValuePair> paramList = new ArrayList<>();
+                for (String key : param.keySet()) {
+                    paramList.add(new BasicNameValuePair(key, param.get(key)));
+                }
+                // 模拟表单
+                UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList);
+                httpPost.setEntity(entity);
+            }
+            // 执行http请求
+            response = httpClient.execute(httpPost);
+            resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        return resultString;
+    }
+
+    public static String doPost(String url) {
+        return doPost(url, null);
+    }
+
+    public static String doPostJson(String url, String json) {
+        // 创建Httpclient对象
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+        CloseableHttpResponse response = null;
+        String resultString = "";
+        try {
+            // 创建Http Post请求
+            HttpPost httpPost = new HttpPost(url);
+            // 创建请求内容
+            StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+            httpPost.setEntity(entity);
+            // 执行http请求
+            response = httpClient.execute(httpPost);
+            resultString = EntityUtils.toString(response.getEntity(), "utf-8");
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        return resultString;
+    }
+}