1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package com.usky.mqtthandler;
- import com.usky.config.webScoket.WsSessionManager;
- import com.usky.constant.CommonConstant;
- import com.usky.service.mqtt.MqttService;
- import com.usky.utils.RedisUtil;
- import lombok.SneakyThrows;
- import lombok.extern.log4j.Log4j2;
- import net.sf.json.JSONObject;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageHandler;
- import org.springframework.messaging.MessagingException;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.TextMessage;
- import javax.annotation.PostConstruct;
- @Log4j2
- @Component
- public class Mqtt2MessageHandler implements MessageHandler {
- @Autowired
- private MqttPahoMessageDrivenChannelAdapter channel1MqttChannelAdapter;
- @Autowired
- private MqttService mqttService;
- @Autowired
- private RedisUtil redisUtil;
- @PostConstruct
- private void addFailEvent() {
- channel1MqttChannelAdapter.setApplicationEventPublisher((event) -> {
- log.error("【MQTT】失败事件 {}", event);
- });
- }
- @SneakyThrows
- @ServiceActivator(inputChannel = "channel1")
- @Override
- public void handleMessage(Message<?> message) throws MessagingException {
- Object o = redisUtil.get("15365185591");
- String payload = (String) message.getPayload();
- if (isJsonObject(payload)) {
- JSONObject data = JSONObject.fromObject(payload);
- if (data.has("type")) {
- if (CommonConstant.MQTT_MESSAGE_TYPE_INFO.equals(data.get("type"))) {
- log.info("【MQTT】心跳数据{}解析】", message);
- mqttService.infoSava(data);
- } else if (CommonConstant.MQTT_MESSAGE_TYPE_ALARM.equals(data.get("type"))) {
- log.info("【MQTT】告警数据{}解析】", message);
- mqttService.alarmSendAndSava(data);
- } else {
- log.error("【MQTT】数据类型异常{}", message);
- }
- }
- } else {
- log.error("【MQTT】数据格式异常{}", message);
- }
- }
- public static boolean isJsonObject(String content) {
- if (StringUtils.isBlank(content)) {
- return false;
- }
- try {
- JSONObject data = JSONObject.fromObject(content);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
- }
|