MqttUtils.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package com.usky.utils.mqtt;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  4. import org.springframework.integration.mqtt.support.MqttHeaders;
  5. import org.springframework.integration.support.MessageBuilder;
  6. import org.springframework.messaging.Message;
  7. import java.util.Collection;
  8. import java.util.HashMap;
  9. import java.util.Iterator;
  10. import java.util.Map;
  11. /**
  12. * mqtt工具类,可以根据通道名称发送消息
  13. */
  14. @Log4j2
  15. public class MqttUtils {
  16. /**
  17. * qos 0
  18. */
  19. public static final int QOS_0 = 0;
  20. /**
  21. * qos 1
  22. */
  23. public static final int QOS_1 = 1;
  24. /**
  25. * qos 2
  26. */
  27. public static final int QOS_2 = 2;
  28. private final static Map<String, MqttPahoMessageHandler> HANDLER_MAP = new HashMap<>(16);
  29. public final static String CHANNEL_NAME_SUFFIX = "MqttPahoMessageHandler";
  30. /**
  31. * 存放handler
  32. *
  33. * @param channelName
  34. * @param handler
  35. */
  36. public static void put(String channelName, MqttPahoMessageHandler handler) {
  37. HANDLER_MAP.put(channelName + CHANNEL_NAME_SUFFIX, handler);
  38. }
  39. /**
  40. * 发送消息
  41. *
  42. * @param topic 要发送的主题
  43. * @param message 消息内容
  44. * @param qos qos级别
  45. * @param channelName 发送到指定的通道
  46. */
  47. public static void sendMessage(String topic, String message, int qos, String channelName) {
  48. MqttPahoMessageHandler handler = getHandler(channelName);
  49. Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
  50. .setHeader(MqttHeaders.QOS, qos).build();
  51. handler.handleMessage(mqttMessage);
  52. }
  53. /**
  54. * 发送消息,默认qos级别为1
  55. *
  56. * @param topic 要发送的主题
  57. * @param message 消息内容
  58. * @param channelName 发送到指定的通道
  59. */
  60. public static void sendMessage(String topic, String message, String channelName) {
  61. MqttPahoMessageHandler handler = getHandler(channelName);
  62. Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
  63. .setHeader(MqttHeaders.QOS, QOS_1).build();
  64. handler.handleMessage(mqttMessage);
  65. }
  66. /**
  67. * 发送消息
  68. *
  69. * @param mqttMessage 消息
  70. * @param channelName 发送到指定的通道
  71. */
  72. public static void sendMessage(Message<String> mqttMessage, String channelName) {
  73. MqttPahoMessageHandler handler = getHandler(channelName);
  74. handler.handleMessage(mqttMessage);
  75. }
  76. /**
  77. * 如果只有一个通道将使用该通道发送消息
  78. *
  79. * @param topic
  80. * @param message
  81. * @param qos
  82. */
  83. public static void sendMessage(String topic, String message, int qos) {
  84. MqttPahoMessageHandler handler = getDefaultHeadler();
  85. Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
  86. .setHeader(MqttHeaders.QOS, qos).build();
  87. handler.handleMessage(mqttMessage);
  88. }
  89. /**
  90. * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
  91. *
  92. * @param topic
  93. * @param message
  94. */
  95. public static void sendMessage(String topic, String message) {
  96. MqttPahoMessageHandler handler = getDefaultHeadler();
  97. Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
  98. .setHeader(MqttHeaders.QOS, QOS_1).build();
  99. handler.handleMessage(mqttMessage);
  100. }
  101. /**
  102. * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
  103. *
  104. * @param mqttMessage 消息信息
  105. */
  106. public static void sendMessage(Message<String> mqttMessage) {
  107. MqttPahoMessageHandler handler = getDefaultHeadler();
  108. handler.handleMessage(mqttMessage);
  109. }
  110. /**
  111. * 获取默认的handler
  112. *
  113. * @return
  114. */
  115. private static MqttPahoMessageHandler getDefaultHeadler() {
  116. Collection<MqttPahoMessageHandler> values = HANDLER_MAP.values();
  117. Iterator<MqttPahoMessageHandler> iterator = values.iterator();
  118. MqttPahoMessageHandler handler = iterator.next();
  119. if (handler == null) {
  120. log.error("发送消息失败,无可用的headler");
  121. throw new RuntimeException("发送消息失败,无可用的headler");
  122. }
  123. return handler;
  124. }
  125. /**
  126. * 根据通道获取handler
  127. *
  128. * @param channelName
  129. * @return
  130. */
  131. private static MqttPahoMessageHandler getHandler(String channelName) {
  132. MqttPahoMessageHandler handler = HANDLER_MAP.get(channelName + CHANNEL_NAME_SUFFIX);
  133. if (handler == null) {
  134. log.error("未查询到相应通道{}的handler,存在的通道名称{}", channelName, HANDLER_MAP.keySet());
  135. throw new IllegalArgumentException("未查询到相应通道" + channelName + "的handler");
  136. }
  137. return handler;
  138. }
  139. }