package com.usky.mqtthandler; import com.usky.config.webScoket.WsSessionManager; import com.usky.constant.CommonConstant; import com.usky.service.mqtt.MqttService; import com.usky.utils.RedisUtil; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import net.sf.json.JSONObject; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; import javax.annotation.PostConstruct; @Log4j2 @Component public class Mqtt2MessageHandler implements MessageHandler { @Autowired private MqttPahoMessageDrivenChannelAdapter channel1MqttChannelAdapter; @Autowired private MqttService mqttService; @Autowired private RedisUtil redisUtil; @PostConstruct private void addFailEvent() { channel1MqttChannelAdapter.setApplicationEventPublisher((event) -> { log.error("【MQTT】失败事件 {}", event); }); } @SneakyThrows @ServiceActivator(inputChannel = "channel1") @Override public void handleMessage(Message message) throws MessagingException { Object o = redisUtil.get("15365185591"); String payload = (String) message.getPayload(); if (isJsonObject(payload)) { JSONObject data = JSONObject.fromObject(payload); if (data.has("type")) { if (CommonConstant.MQTT_MESSAGE_TYPE_INFO.equals(data.get("type"))) { log.info("【MQTT】心跳数据{}解析】", message); mqttService.infoSava(data); } else if (CommonConstant.MQTT_MESSAGE_TYPE_ALARM.equals(data.get("type"))) { log.info("【MQTT】告警数据{}解析】", message); mqttService.alarmSendAndSava(data); } else { log.error("【MQTT】数据类型异常{}", message); } } } else { log.error("【MQTT】数据格式异常{}", message); } } public static boolean isJsonObject(String content) { if (StringUtils.isBlank(content)) { return false; } try { JSONObject data = JSONObject.fromObject(content); return true; } catch (Exception e) { return false; } } }