Browse Source

集成mqtt

yq 3 years ago
parent
commit
ce48f5fe3b

+ 4 - 0
mhfire-controller/src/main/resources/application-dev.properties

@@ -76,6 +76,10 @@ mqtt.sub-topics=mh/water/info,mh/water/alert,mh/water/statistics
 mqtt.url=tcp://47.98.201.73:1883
 
 
+# ¶Ô½ÓãÉÐеÄÍøÂç
+mhwater.path=http://180.169.15.121:1680
+
+
 # eureka
 #eureka.client.service-url.defaultZone=http://172.31.101.251:8099/eureka/,http://172.31.101.252:8099/eureka/
 #eureka.client.service-url.defaultZone=http://localhost:8088/eureka/

+ 3 - 0
mhfire-controller/src/main/resources/application-prod.properties

@@ -71,6 +71,9 @@ mqtt.password=usky
 mqtt.sub-topics=/usky/ytDP0007/+/+/info
 mqtt.url=tcp://47.98.201.73:1883
 
+# ¶Ô½ÓãÉÐеÄÍøÂç
+mhwater.path=http://32.0.15.62:1680
+
 # eureka
 #eureka.client.service-url.defaultZone=http://172.31.101.251:8099/eureka/,http://172.31.101.252:8099/eureka/
 #eureka.client.service-url.defaultZone=http://localhost:8088/eureka/

+ 35 - 12
mhfire-controller/src/main/resources/application-test.properties

@@ -2,7 +2,7 @@ debug=true
 spring.main.lazy-initialization=false
 spring.main.allow-bean-definition-overriding=true
 # application
-server.port=8082
+server.port=8083
 # mybatis-plus
 mybatis-plus.mapper-locations=classpath*:mapper/**/*.xml
 mybatis-plus.configuration.lazy-loading-enabled=true
@@ -15,15 +15,15 @@ mybatis.refresh.delay-seconds=10
 mybatis.refresh.sleep-seconds=20
 # datasource
 spring.autoconfigure.exclude=com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
-spring.datasource.dynamic.primary=product
-spring.datasource.dynamic.datasource.product.url=jdbc:mysql://dev1.shuqian.com:3306/product?allowMultiQueries=true&createDatabaseIfNotExist=true&autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&useCompression=true
-spring.datasource.dynamic.datasource.product.username=dev
-spring.datasource.dynamic.datasource.product.password=Coozo0628
-#老库
-spring.datasource.dynamic.datasource.old.url=jdbc:mysql://dev1.shuqian.com:3306/amazonold?allowMultiQueries=true&createDatabaseIfNotExist=true&autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true
-spring.datasource.dynamic.datasource.old.username=dev
-spring.datasource.dynamic.datasource.old.password=Coozo0628
-spring.datasource.dynamic.druid.initial-size=5
+spring.datasource.dynamic.primary=mhfire
+spring.datasource.dynamic.datasource.mhfire.url=jdbc:mysql://101.133.214.75:3306/mhfire?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&allowMultiQueries=true
+spring.datasource.dynamic.datasource.mhfire.username=root
+spring.datasource.dynamic.datasource.mhfire.password=123456
+#蓝小帮
+spring.datasource.dynamic.datasource.bulehelp.url=jdbc:mysql://101.133.214.75:3306/bulehelp?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC&characterEncoding=utf8
+spring.datasource.dynamic.datasource.bulehelp.username=root
+spring.datasource.dynamic.datasource.bulehelp.password=123456
+spring.datasource.dynamic.druid.initial-size=5                                                                       
 spring.datasource.dynamic.druid.min-idle=5
 spring.datasource.dynamic.druid.max-active=30
 spring.datasource.dynamic.druid.max-wait=60000
@@ -36,6 +36,7 @@ spring.datasource.dynamic.druid.max-pool-prepared-statement-per-connection-size=
 spring.datasource.dynamic.druid.time-between-eviction-runs-millis=60000
 spring.datasource.dynamic.druid.min-evictable-idle-time-millis=300000
 spring.datasource.dynamic.druid.filters=wall,stat,log4j2
+spring.datasource.dynamic.druid.wall.multi-statement-allow=true
 # druid \u76D1\u63A7 WebStatFilter\u914D\u7F6E
 spring.datasource.druid.web-stat-filter.enabled=true
 spring.datasource.druid.web-stat-filter.url-pattern=/*
@@ -54,8 +55,8 @@ spring.datasource.druid.filter.slf4j.statement-close-after-log-enabled=false
 spring.datasource.druid.filter.slf4j.result-set-open-after-log-enabled=false
 spring.datasource.druid.filter.slf4j.result-set-close-after-log-enabled=false
 # jackson
-spring.jackson.date-format=yyyy-MM-dd'T'HH:mm:ssZ
-spring.jackson.time-zone=GMT+0
+spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
+spring.jackson.time-zone=GMT+8
 spring.jackson.default-property-inclusion=always
 #spring.jackson.serialization.indent_output=true
 spring.jackson.serialization.fail-on-empty-beans=false
@@ -65,3 +66,25 @@ spring.jackson.parser.allow-single-quotes=true
 # gzip
 server.compression.enabled=true
 server.compression.mime-types=application/javascript,text/css,application/json,application/xml,text/html,text/xml,text/plain
+
+#mqttt
+mqtt.completionTimeout=5000
+mqtt.keep-alive-interval=60
+mqtt.username=usky
+mqtt.password=usky
+mqtt.sub-topics=mh/water/info,mh/water/alert,mh/water/statistics
+mqtt.url=tcp://47.98.201.73:1883
+
+# 对接闵行的网络
+mhwater.path=http://180.169.15.121:1680
+
+
+# eureka
+#eureka.client.service-url.defaultZone=http://172.31.101.251:8099/eureka/,http://172.31.101.252:8099/eureka/
+#eureka.client.service-url.defaultZone=http://localhost:8088/eureka/
+##eureka.client.healthcheck.enabled=true
+#eureka.instance.prefer-ip-address=true
+#eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ip-address}:${server.port}
+#eureka.instance.lease-renewal-interval-in-seconds=30
+#eureka.instance.lease-expiration-duration-in-seconds=90
+#eureka.client.registryFetchIntervalSeconds=30

+ 0 - 19
mhfire-service/src/main/java/com/bizmatics/mhfire/service/api/OneCardApi.java

@@ -1,19 +0,0 @@
-package com.bizmatics.mhfire.service.api;
-
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * 一网通卡api
- * @author yq
- * @date 2021/11/4 11:24
- */
-@Slf4j
-public class OneCardApi {
-    private static final String URL = "http://32.1.7.42";
-
-    /**
-     * 推送地址
-     */
-    private static final String LOGIN_URL = String.format("%s%s",URL,"/mhxfzd/mhapi/bsUser/user/login");
-
-}

+ 128 - 0
mhfire-service/src/main/java/com/bizmatics/mhfire/service/api/mhWater/OneCardApi.java

@@ -0,0 +1,128 @@
+package com.bizmatics.mhfire.service.api.mhWater;
+
+import com.bizmatics.common.core.util.HttpUtils;
+import com.bizmatics.common.core.util.StringUtils;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * 一网通卡api
+ * @author yq
+ * @date 2021/11/4 11:24
+ */
+@Component
+@ConfigurationProperties(prefix = "mhwater")
+@Slf4j
+public class OneCardApi {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * token
+     */
+    private static final String ACCESS_TOKEN = "";
+
+    /**
+     * 内网地址
+     */
+    @Value("${mhwater.path}")
+    private String path = "http://32.0.15.62:1680";
+
+    /**
+     *  SHA256 加密方法
+     * @param str 参数:明文密码
+     * @return 密文
+     */
+    public static String getSHA256StrJava(String str) {
+        try {
+            if(StringUtils.isBlank(str)){
+                return null;
+            }
+
+            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+            messageDigest.update(str.getBytes("UTF-8"));
+            byte[] bytes = messageDigest.digest();
+
+            StringBuffer stringBuffer = new StringBuffer();
+            String temp;
+            for (int i = 0; i < bytes.length; i++) {
+                temp = Integer.toHexString(bytes[i] & 0xFF);
+                if (temp.length() == 1) {
+                    // 1得到一位的进行补0操作
+                    stringBuffer.append("0");
+                }
+                stringBuffer.append(temp);
+            }
+            return stringBuffer.toString();
+        }catch (Exception e){
+            return null;
+        }
+    }
+
+    /**
+     * 获取token
+     * @return
+     */
+    public Map<String,String> getAccessToken(){
+        //系统当前时间戳
+        Long timestamp = System.currentTimeMillis();
+        //进行明文加密
+        String token = getSHA256StrJava(ACCESS_TOKEN + timestamp);
+        token = token + "&" + timestamp;
+        Map<String,String> headMaps = new HashMap<>();
+        headMaps.put("AccessToken",token);
+        return headMaps;
+    }
+
+    /**
+     * 统一的解析数据
+     * @param url
+     * @param param
+     * @return
+     */
+    public void sendApiBase(String url, Object param, Consumer<String> consumer){
+        try {
+            String result = HttpUtils.postJson(url, param, getAccessToken());
+            JsonNode arrNode = MAPPER.readTree(result);
+            if ("0".equals(arrNode.get("status").asText())){
+                JsonNode data = arrNode.get("data");
+                consumer.accept(data.asText());
+                log.info("闵行水系统接口-----调用成功");
+            }else {
+                log.info("闵行水系统接口-----调用异常:"+arrNode.get("msg").asText());
+            }
+        } catch (IOException e) {
+            log.info("系统异常:"+e.getMessage());
+        }
+    }
+
+    /**
+     * 调用心跳接口
+     */
+    public void callInfoApi(String param){
+        log.info("闵行水系统心跳---开始");
+        sendApiBase(String.format("%s%s",path,"/iot/bomb/{XX}"),param,data -> log.info("获取到的数据"+ data));
+        log.info("闵行水系统心跳---结束");
+    }
+
+
+    /**
+     * 调用告警接口
+     */
+    public void callAlarmApi(String param){
+        log.info("闵行水系统告警---开始");
+        sendApiBase(String.format("%s%s",path,"/iot/alarm/{XX}"),param,data -> log.info("获取到的数据"+ data));
+        log.info("闵行水系统告警---结束");
+    }
+
+}

+ 13 - 1
mhfire-service/src/main/java/com/bizmatics/mhfire/service/listener/MqttListener.java

@@ -1,7 +1,9 @@
 package com.bizmatics.mhfire.service.listener;
 
+import com.bizmatics.mhfire.service.api.mhWater.OneCardApi;
 import com.bizmatics.mhfire.service.config.mqtt.MqttInConfig;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.messaging.MessageHandler;
@@ -14,6 +16,8 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class MqttListener {
+    @Autowired
+    private OneCardApi oneCardApi;
 
     public static final String MESSAGE_NAME = "messageInput";
 
@@ -28,7 +32,15 @@ public class MqttListener {
             log.info("mqtt-----接受到的消息"+message);
             String payload = message.getPayload().toString();
             //进行接口推送
-
+            Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
+            if (null != mqttReceivedTopic){
+                String topic = mqttReceivedTopic.toString();
+                if (topic.equals("mh/water/info")){
+                    oneCardApi.callInfoApi(payload);
+                }else if (topic.equals("mh/water/alert")){
+                    oneCardApi.callAlarmApi(payload);
+                }
+            }
         };
     }
 }