Mqtt2MessageHandler.java 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package com.usky.mqtthandler;
  2. import com.usky.config.webScoket.WsSessionManager;
  3. import com.usky.constant.CommonConstant;
  4. import com.usky.service.mqtt.MqttService;
  5. import com.usky.utils.RedisUtil;
  6. import lombok.SneakyThrows;
  7. import lombok.extern.log4j.Log4j2;
  8. import net.sf.json.JSONObject;
  9. import org.apache.commons.lang3.StringUtils;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.integration.annotation.ServiceActivator;
  12. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  13. import org.springframework.messaging.Message;
  14. import org.springframework.messaging.MessageHandler;
  15. import org.springframework.messaging.MessagingException;
  16. import org.springframework.stereotype.Component;
  17. import org.springframework.web.socket.TextMessage;
  18. import javax.annotation.PostConstruct;
  19. @Log4j2
  20. @Component
  21. public class Mqtt2MessageHandler implements MessageHandler {
  22. @Autowired
  23. private MqttPahoMessageDrivenChannelAdapter channel1MqttChannelAdapter;
  24. @Autowired
  25. private MqttService mqttService;
  26. @Autowired
  27. private RedisUtil redisUtil;
  28. @PostConstruct
  29. private void addFailEvent() {
  30. channel1MqttChannelAdapter.setApplicationEventPublisher((event) -> {
  31. log.error("【MQTT】失败事件 {}", event);
  32. });
  33. }
  34. @SneakyThrows
  35. @ServiceActivator(inputChannel = "channel1")
  36. @Override
  37. public void handleMessage(Message<?> message) throws MessagingException {
  38. Object o = redisUtil.get("15365185591");
  39. String payload = (String) message.getPayload();
  40. if (isJsonObject(payload)) {
  41. JSONObject data = JSONObject.fromObject(payload);
  42. if (data.has("type")) {
  43. if (CommonConstant.MQTT_MESSAGE_TYPE_INFO.equals(data.get("type"))) {
  44. log.info("【MQTT】心跳数据{}解析】", message);
  45. mqttService.infoSava(data);
  46. } else if (CommonConstant.MQTT_MESSAGE_TYPE_ALARM.equals(data.get("type"))) {
  47. log.info("【MQTT】告警数据{}解析】", message);
  48. mqttService.alarmSendAndSava(data);
  49. } else {
  50. log.error("【MQTT】数据类型异常{}", message);
  51. }
  52. }
  53. } else {
  54. log.error("【MQTT】数据格式异常{}", message);
  55. }
  56. }
  57. public static boolean isJsonObject(String content) {
  58. if (StringUtils.isBlank(content)) {
  59. return false;
  60. }
  61. try {
  62. JSONObject data = JSONObject.fromObject(content);
  63. return true;
  64. } catch (Exception e) {
  65. return false;
  66. }
  67. }
  68. }