|
@@ -1,11 +1,9 @@
|
|
|
package com.bizmatics.mhfire.service.listener;
|
|
|
|
|
|
-import com.bizmatics.common.spring.util.JsonUtils;
|
|
|
-import com.bizmatics.mhfire.model.*;
|
|
|
-import com.bizmatics.mhfire.service.*;
|
|
|
-import com.bizmatics.mhfire.service.api.mhWater.OneCardApi;
|
|
|
import com.bizmatics.mhfire.service.config.mqtt.MqttInConfig;
|
|
|
-import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
+import com.bizmatics.mhfire.service.enums.TopListener;
|
|
|
+import com.bizmatics.mhfire.service.mqtt.SimpleContext;
|
|
|
+import com.bizmatics.mhfire.service.vo.MqttBaseVO;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
@@ -14,9 +12,6 @@ import org.springframework.integration.annotation.ServiceActivator;
|
|
|
import org.springframework.messaging.MessageHandler;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
/**
|
|
|
* @author yq
|
|
|
* @date 2021/11/3 8:13
|
|
@@ -25,21 +20,11 @@ import java.util.Map;
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
public class MqttListener {
|
|
|
- @Autowired
|
|
|
- private OneCardApi oneCardApi;
|
|
|
- @Autowired
|
|
|
- private WaterAjService waterAjService;
|
|
|
|
|
|
public static final String MESSAGE_NAME = "messageInput";
|
|
|
- @Autowired
|
|
|
- private DeviceService deviceService;
|
|
|
- @Autowired
|
|
|
- private DeviceInfoService deviceInfoService;
|
|
|
- @Autowired
|
|
|
- private DeviceAlertService deviceAlertService;
|
|
|
- @Autowired
|
|
|
- private DeviceAjService deviceAjService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private SimpleContext simpleContext;
|
|
|
/**
|
|
|
* 处理消息-消费者
|
|
|
* @return
|
|
@@ -48,35 +33,40 @@ public class MqttListener {
|
|
|
@ServiceActivator(inputChannel = MqttInConfig.CHANNEL_NAME_INPUT)
|
|
|
public MessageHandler handler() {
|
|
|
return message -> {
|
|
|
- log.info("mqtt-----接受到的消息"+message);
|
|
|
String payload = message.getPayload().toString();
|
|
|
//进行接口推送
|
|
|
Object mqttReceivedTopic = message.getHeaders().get("mqtt_receivedTopic");
|
|
|
if (null != mqttReceivedTopic){
|
|
|
String topic = mqttReceivedTopic.toString();
|
|
|
- Map map = JsonUtils.fromJson(payload, Map.class);
|
|
|
- if ("mh/water/info".equals(topic)){
|
|
|
- oneCardApi.callInfoApi(map);
|
|
|
- }else if ("mh/water/alert".equals(topic)){
|
|
|
- oneCardApi.callAlarmApi(map);
|
|
|
- }else if ("mh/water/statistics".equals(topic)){
|
|
|
- WaterAj waterAj = JsonUtils.fromJson(payload, WaterAj.class);
|
|
|
- waterAjService.saveOrUpdate(waterAj);
|
|
|
- }else if ("device/detail".equals(topic)){
|
|
|
- //设备录入
|
|
|
- List<Device> devices = JsonUtils.fromJson(payload, new TypeReference<List<Device>>() {
|
|
|
- });
|
|
|
- deviceService.saveOrUpdateBatch(devices);
|
|
|
- }else if ("device/info".equals(topic)){
|
|
|
- DeviceInfo deviceInfo = JsonUtils.fromJson(payload, DeviceInfo.class);
|
|
|
- deviceInfoService.save(deviceInfo);
|
|
|
- }else if ("device/alert".equals(topic)){
|
|
|
- DeviceAlert deviceAlert = JsonUtils.fromJson(payload, DeviceAlert.class);
|
|
|
- deviceAlertService.save(deviceAlert);
|
|
|
- }else if ("device/aj".equals(topic)){
|
|
|
- DeviceAj deviceAj = JsonUtils.fromJson(payload, DeviceAj.class);
|
|
|
- deviceAjService.saveOrUpdate(deviceAj);
|
|
|
+ MqttBaseVO mqttBaseVO = new MqttBaseVO();
|
|
|
+ mqttBaseVO.setTopic(topic);
|
|
|
+ if (TopListener.MH_WATER_INFO.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("cy");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.MH_WATER_ALERT.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("cy");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.MH_WATER_STATISTICS.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("cy");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.DEVICE_DETAIL.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("mhwater");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.DEVICE_INFO.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("mhwater");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.DEVICE_ALERT.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("mhwater");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else if (TopListener.DEVICE_AJ.getCode().equals(topic)){
|
|
|
+ mqttBaseVO.setDescribe("mhwater");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
+ }else {
|
|
|
+ mqttBaseVO.setDescribe("fireInfoAndAlarm");
|
|
|
+ mqttBaseVO.setData(payload);
|
|
|
}
|
|
|
+ //统一处理数据
|
|
|
+ simpleContext.getResource(mqttBaseVO);
|
|
|
}
|
|
|
};
|
|
|
}
|