|
@@ -0,0 +1,162 @@
|
|
|
|
|
+package com.usky.sas.common.global;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import javax.websocket.OnClose;
|
|
|
|
|
+import javax.websocket.OnError;
|
|
|
|
|
+import javax.websocket.OnMessage;
|
|
|
|
|
+import javax.websocket.OnOpen;
|
|
|
|
|
+import javax.websocket.Session;
|
|
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * SAS WebSocket 服务
|
|
|
|
|
+ * 参考 service-alarm 模块实现
|
|
|
|
|
+ * 优化:去除路径参数userId,支持参数传递或默认sessionId
|
|
|
|
|
+ */
|
|
|
|
|
+@ServerEndpoint(value = "/websocket")
|
|
|
|
|
+@Component
|
|
|
|
|
+public class SasWebSocket {
|
|
|
|
|
+
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(SasWebSocket.class);
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 存放所有在线的客户端
|
|
|
|
|
+ */
|
|
|
|
|
+ private static ConcurrentHashMap<String, SasWebSocket> webSocketMap = new ConcurrentHashMap<>();
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
|
|
+ */
|
|
|
|
|
+ private Session session;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 接收 userId 或 sessionId
|
|
|
|
|
+ */
|
|
|
|
|
+ private String userId;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 连接建立成功调用的方法
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnOpen
|
|
|
|
|
+ public void onOpen(Session session) {
|
|
|
|
|
+ this.session = session;
|
|
|
|
|
+ // 尝试从请求参数中获取 userId
|
|
|
|
|
+ Map<String, List<String>> params = session.getRequestParameterMap();
|
|
|
|
|
+ if (params != null && params.containsKey("userId")) {
|
|
|
|
|
+ List<String> userIds = params.get("userId");
|
|
|
|
|
+ if (userIds != null && !userIds.isEmpty()) {
|
|
|
|
|
+ this.userId = userIds.get(0);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果没有 userId,则使用 sessionId
|
|
|
|
|
+ if (this.userId == null) {
|
|
|
|
|
+ this.userId = session.getId();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 无论是否存在,直接覆盖(put操作本身就是覆盖)
|
|
|
|
|
+ webSocketMap.put(this.userId, this);
|
|
|
|
|
+
|
|
|
|
|
+ log.info("【websocket消息】有新的连接, ID={}, 当前在线人数={}", this.userId, webSocketMap.size());
|
|
|
|
|
+ sendMessage("CONNECT_SUCCESS");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 连接关闭调用的方法
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnClose
|
|
|
|
|
+ public void onClose() {
|
|
|
|
|
+ if (this.userId != null && webSocketMap.containsKey(this.userId)) {
|
|
|
|
|
+ // 只有当 map 中的对象是当前对象时才移除?
|
|
|
|
|
+ // 简单起见,直接移除 key。如果是同一个 key 的新连接,可能会被错误移除吗?
|
|
|
|
|
+ // 如果新连接已经 put 了,这里 remove 会把新连接移除。
|
|
|
|
|
+ // 应该判断 value 是否是 this
|
|
|
|
|
+ SasWebSocket stored = webSocketMap.get(this.userId);
|
|
|
|
|
+ if (stored == this) {
|
|
|
|
|
+ webSocketMap.remove(this.userId);
|
|
|
|
|
+ log.info("【websocket消息】连接断开, ID={}, 当前在线人数={}", this.userId, webSocketMap.size());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.info("【websocket消息】连接断开 (已被新连接覆盖), ID={}", this.userId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 收到客户端消息后调用的方法
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param message 客户端发送过来的消息
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnMessage
|
|
|
|
|
+ public void onMessage(String message, Session session) {
|
|
|
|
|
+ log.info("【websocket消息】收到用户={}的消息: {}", userId, message);
|
|
|
|
|
+ if ("ping".equals(message)) {
|
|
|
|
|
+ sendMessage("pong");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发生错误时调用
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnError
|
|
|
|
|
+ public void onError(Session session, Throwable error) {
|
|
|
|
|
+ log.error("【websocket消息】用户={}发生错误", userId, error);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 实现服务器主动推送
|
|
|
|
|
+ */
|
|
|
|
|
+ public void sendMessage(String message) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ this.session.getBasicRemote().sendText(message);
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error("【websocket消息】发送消息失败, ID={}", userId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 向指定用户发送消息
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param message 消息内容
|
|
|
|
|
+ * @param userId 用户ID
|
|
|
|
|
+ */
|
|
|
|
|
+ public static void sendInfo(Object message, String userId) {
|
|
|
|
|
+ SasWebSocket sasWebSocket = webSocketMap.get(userId);
|
|
|
|
|
+ if (sasWebSocket != null) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String msgStr = (message instanceof String) ? (String) message : JSONUtil.toJsonStr(message);
|
|
|
|
|
+ sasWebSocket.session.getBasicRemote().sendText(msgStr);
|
|
|
|
|
+ log.info("【websocket消息】发送消息成功, ID={}, 消息内容={}", userId, msgStr);
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error("【websocket消息】发送消息失败, ID={}", userId, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.warn("【websocket消息】用户未连接, 无法发送消息, ID={}", userId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 广播消息
|
|
|
|
|
+ */
|
|
|
|
|
+ public static void sendAll(Object message) {
|
|
|
|
|
+ String msgStr = (message instanceof String) ? (String) message : JSONUtil.toJsonStr(message);
|
|
|
|
|
+ for (Map.Entry<String, SasWebSocket> entry : webSocketMap.entrySet()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ entry.getValue().session.getBasicRemote().sendText(msgStr);
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error("【websocket消息】广播消息失败, ID={}", entry.getKey(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static ConcurrentHashMap<String, SasWebSocket> getWebSocketMap() {
|
|
|
|
|
+ return webSocketMap;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|