package jnpf.message.util; import cn.dev33.satoken.stp.StpUtil; import com.alibaba.fastjson.JSONObject; import jnpf.consts.AuthConsts; import jnpf.util.UserProvider; import lombok.extern.slf4j.Slf4j; import org.springframework.util.ObjectUtils; import jakarta.websocket.Session; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import static jnpf.consts.AuthConsts.TOKEN_PREFIX; /** * * @author JNPF开发平台组 * @version V3.1.0 * @copyright 引迈信息技术有限公司 * @date 2021/3/16 10:51 */ @Slf4j public class OnlineUserProvider { /** * 在线用户 */ private static final List onlineUserList = new ArrayList<>(); public static List getOnlineUserList() { return OnlineUserProvider.onlineUserList; } public static void addModel(OnlineUserModel model){ synchronized (onlineUserList) { OnlineUserProvider.onlineUserList.add(model); } } public static void removeModel(OnlineUserModel onlineUserModel){ synchronized (onlineUserList) { onlineUserList.remove(onlineUserModel); } } // =================== Websocket相关操作 =================== /** * 根据Token精准推送Websocket 登出消息 * @param token */ public static void removeWebSocketByToken(String... token) { List tokens = Arrays.stream(token).map(t -> t.contains(AuthConsts.TOKEN_PREFIX) ? t : TOKEN_PREFIX + " " + t).collect(Collectors.toList()); //清除websocket登录状态 List users = OnlineUserProvider.getOnlineUserList().stream().filter(t -> tokens.contains(t.getToken())).collect(Collectors.toList()); if (!ObjectUtils.isEmpty(users)) { for (OnlineUserModel user : users) { OnlineUserProvider.logoutWS(user, null); //先移除对象, 并推送下线信息, 避免网络原因导致就用户未断开 新用户连不上WebSocket OnlineUserProvider.removeModel(user); //通知所有在线,有用户离线 //功能已删除 // for (OnlineUserModel item : OnlineUserProvider.getOnlineUserList().stream().filter(t -> !Objects.equals(user.getUserId(), t.getUserId()) && Objects.equals(user.getTenantId(),t.getTenantId())).collect(Collectors.toList())) { // if (!item.getUserId().equals(user.getUserId())) { // JSONObject obj = new JSONObject(); // obj.put("method", "Offline"); // //推送给前端 // OnlineUserProvider.sendMessage(item, obj); // // } // } } } } /** * 根据用户ID 推送全部Websocket 登出消息 * @param userId */ public static void removeWebSocketByUser(String userId) { List tokens = StpUtil.getTokenValueListByLoginId(UserProvider.splicingLoginId(userId)); removeWebSocketByToken(tokens.toArray(new String[tokens.size()])); } /** * 发送用户退出消息 * @param session */ public static void logoutWS(OnlineUserModel onlineUserModel, Session session) { JSONObject obj = new JSONObject(); obj.put("method", "logout"); obj.put("token", onlineUserModel.getToken()); if(onlineUserModel != null) { sendMessage(onlineUserModel, obj); }else{ sendMessage(session, obj); } } /** * 发送关闭WebSocket消息, 前端不在重连 * @param session */ public static void closeFrontWs(OnlineUserModel onlineUserModel, Session session) { JSONObject obj = new JSONObject(); obj.put("method", "closeSocket"); if(onlineUserModel != null) { sendMessage(onlineUserModel, obj); }else{ sendMessage(session, obj); } } public static void sendMessage(OnlineUserModel onlineUserModel, Object message){ Session session = onlineUserModel.getWebSocket(); synchronized (session) { try { if (session.isOpen()) { session.getAsyncRemote().sendText(JSONObject.toJSONString(message)); }else{ log.debug("WS未打开: {}, {}, {}, {}, {}", onlineUserModel.getTenantId(), session.getId(), onlineUserModel.getUserId(), onlineUserModel.getToken(), message); try{ session.close(); }catch (Exception ee){ } finally { OnlineUserProvider.removeModel(onlineUserModel); } } }catch (Exception e){ log.debug(String.format("WS消息发送失败: %s, %s, %s, %s, %s", onlineUserModel.getTenantId(), session.getId(), onlineUserModel.getUserId(), onlineUserModel.getToken(), message), e); } } } public static void sendMessage(Session session, Object message){ OnlineUserModel onlineUserModel = OnlineUserProvider.getOnlineUserList().stream().filter(t -> t.getConnectionId().equals(session.getId())).findFirst().orElse(null); if(onlineUserModel == null){ onlineUserModel = new OnlineUserModel(); onlineUserModel.setWebSocket(session); } synchronized (session) { sendMessage(onlineUserModel, message); } } }