123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package com.usky.utils.mqtt;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.integration.support.MessageBuilder;
- import org.springframework.messaging.Message;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map;
- /**
- * mqtt工具类,可以根据通道名称发送消息
- */
- @Log4j2
- public class MqttUtils {
- /**
- * qos 0
- */
- public static final int QOS_0 = 0;
- /**
- * qos 1
- */
- public static final int QOS_1 = 1;
- /**
- * qos 2
- */
- public static final int QOS_2 = 2;
- private final static Map<String, MqttPahoMessageHandler> HANDLER_MAP = new HashMap<>(16);
- public final static String CHANNEL_NAME_SUFFIX = "MqttPahoMessageHandler";
- /**
- * 存放handler
- *
- * @param channelName
- * @param handler
- */
- public static void put(String channelName, MqttPahoMessageHandler handler) {
- HANDLER_MAP.put(channelName + CHANNEL_NAME_SUFFIX, handler);
- }
- /**
- * 发送消息
- *
- * @param topic 要发送的主题
- * @param message 消息内容
- * @param qos qos级别
- * @param channelName 发送到指定的通道
- */
- public static void sendMessage(String topic, String message, int qos, String channelName) {
- MqttPahoMessageHandler handler = getHandler(channelName);
- Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, qos).build();
- handler.handleMessage(mqttMessage);
- }
- /**
- * 发送消息,默认qos级别为1
- *
- * @param topic 要发送的主题
- * @param message 消息内容
- * @param channelName 发送到指定的通道
- */
- public static void sendMessage(String topic, String message, String channelName) {
- MqttPahoMessageHandler handler = getHandler(channelName);
- Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, QOS_1).build();
- handler.handleMessage(mqttMessage);
- }
- /**
- * 发送消息
- *
- * @param mqttMessage 消息
- * @param channelName 发送到指定的通道
- */
- public static void sendMessage(Message<String> mqttMessage, String channelName) {
- MqttPahoMessageHandler handler = getHandler(channelName);
- handler.handleMessage(mqttMessage);
- }
- /**
- * 如果只有一个通道将使用该通道发送消息
- *
- * @param topic
- * @param message
- * @param qos
- */
- public static void sendMessage(String topic, String message, int qos) {
- MqttPahoMessageHandler handler = getDefaultHeadler();
- Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, qos).build();
- handler.handleMessage(mqttMessage);
- }
- /**
- * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
- *
- * @param topic
- * @param message
- */
- public static void sendMessage(String topic, String message) {
- MqttPahoMessageHandler handler = getDefaultHeadler();
- Message<String> mqttMessage = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, QOS_1).build();
- handler.handleMessage(mqttMessage);
- }
- /**
- * 如果只有一个通道将使用该通道发送消息,默认qos级别为1
- *
- * @param mqttMessage 消息信息
- */
- public static void sendMessage(Message<String> mqttMessage) {
- MqttPahoMessageHandler handler = getDefaultHeadler();
- handler.handleMessage(mqttMessage);
- }
- /**
- * 获取默认的handler
- *
- * @return
- */
- private static MqttPahoMessageHandler getDefaultHeadler() {
- Collection<MqttPahoMessageHandler> values = HANDLER_MAP.values();
- Iterator<MqttPahoMessageHandler> iterator = values.iterator();
- MqttPahoMessageHandler handler = iterator.next();
- if (handler == null) {
- log.error("发送消息失败,无可用的headler");
- throw new RuntimeException("发送消息失败,无可用的headler");
- }
- return handler;
- }
- /**
- * 根据通道获取handler
- *
- * @param channelName
- * @return
- */
- private static MqttPahoMessageHandler getHandler(String channelName) {
- MqttPahoMessageHandler handler = HANDLER_MAP.get(channelName + CHANNEL_NAME_SUFFIX);
- if (handler == null) {
- log.error("未查询到相应通道{}的handler,存在的通道名称{}", channelName, HANDLER_MAP.keySet());
- throw new IllegalArgumentException("未查询到相应通道" + channelName + "的handler");
- }
- return handler;
- }
- }
|