package com.tidecloud.dataacceptance.service.handle; import java.util.HashMap; import java.util.Map; import javax.xml.bind.DatatypeConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.smartsanitation.common.util.StringUtil; import com.tidecloud.dataacceptance.codec.MsgDecoder; import com.tidecloud.dataacceptance.common.Constants; import com.tidecloud.dataacceptance.common.JT808ProtocolUtils; import com.tidecloud.dataacceptance.entity.ConnectMsg; import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg; import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg; import com.tidecloud.dataacceptance.entity.PackageData; import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader; import com.tidecloud.dataacceptance.entity.Session; import com.tidecloud.dataacceptance.entity.SessionManager; import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg; import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg; import com.tidecloud.dataacceptance.service.TerminalMsgProcessService; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * @author cdk */ @Component @ChannelHandler.Sharable public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class); public static String PREFIX_LINK = "s."; public static String PREFIX_LINK_BACK = "s.b."; public static String PREFIX_DEVICE = "d."; public static final Integer REDIS_INDEX_LINK = 15; public static Map socketyChannelMap = new HashMap<>(); public static Map channelDeviceMap = new HashMap<>(); public static Map commandCopy = new HashMap<>(); private final SessionManager sessionManager; private final MsgDecoder decoder; private TerminalMsgProcessService msgProcessService; /** * * @Title: YiTongGpsServerHandler * @Description: initialzation sessionManager and msgDecoder */ public YiTongGpsServerHandler() { this.sessionManager = SessionManager.getInstance(); this.decoder = new MsgDecoder(); this.msgProcessService = new TerminalMsgProcessService(); } @Autowired private JedisPool jedisPool; @Value("${server.localaddress}") private String address; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf dataByteBuf = (ByteBuf) msg; // print acceptance data printAcceptanceData(dataByteBuf); try { if (!dataByteBuf.isReadable()) { return; } byte[] dataByteArray = new byte[dataByteBuf.readableBytes()]; dataByteBuf.readBytes(dataByteArray); byte[] dataByteArrayDoEscape = JT808ProtocolUtils. doEscape4Receive(dataByteArray, 0, dataByteArray.length); //let dataByteArray transfer 808DataEntity PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape); //link manage packageData.setChannel(ctx.channel()); this.processPackageData(packageData); } catch (Exception e) { logger.error(e.getLocalizedMessage()); } ctx.flush(); } private void manageChannel(Channel channel, String deviceId) { String socketkey = channel.id().asLongText(); Channel channelInMap = socketyChannelMap.get(socketkey); String deviceIdInMap = channelDeviceMap.get(channel); if (channelInMap != null && deviceIdInMap != null) { logger.debug("device [{}] has link [{}]", deviceId, socketkey); return; } socketyChannelMap.put(socketkey, channel); channelDeviceMap.put(channel, deviceId); String addressStr = ConnectMsg.ipToLong(address); ConnectMsg cMsg = new ConnectMsg(address, socketkey); Jedis jedis = jedisPool.getResource(); try { jedis.select(REDIS_INDEX_LINK); String insertKey = PREFIX_LINK + addressStr; String selectKey = PREFIX_DEVICE + deviceId; String insertBackupKey = PREFIX_LINK_BACK + addressStr; jedis.sadd(insertKey, socketkey); jedis.sadd(insertBackupKey, deviceId); jedis.set(selectKey, StringUtil.convert2String(cMsg)); } catch (Exception e) { e.printStackTrace(); } finally { jedis.close(); } } private void deleteLinkFromRedis(String deviceId) { String deleteKey = PREFIX_DEVICE + deviceId; try(Jedis jedis = jedisPool.getResource()) { jedis.select(REDIS_INDEX_LINK); String connectMsg = jedis.get(deleteKey); if (connectMsg != null) { ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class); String socketId = cmsg.getSocketId(); socketyChannelMap.remove(socketId); String socketQueryKey = PREFIX_LINK + address; jedis.srem(socketQueryKey, socketId); jedis.del(deleteKey); logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId); } } catch (Exception e) { logger.error(e.getLocalizedMessage()); } } private void processPackageData(PackageData packageData) { final MsgHeader header = packageData.getMsgHeader(); // 1. 终端心跳-消息体为空 ==> 平台通用应答 if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) { logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { this.msgProcessService.processTerminalHeartBeatMsg(packageData); manageChannel(packageData.getChannel(), header.getTerminalPhone()); logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 5. 终端鉴权 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) { logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData); this.msgProcessService.processAuthMsg(authenticationMsg); logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 6. 终端注册 ==> 终端注册应答 else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) { logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData); this.msgProcessService.processRegisterMsg(msg); logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答 else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) { logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { this.msgProcessService.processTerminalLogoutMsg(packageData); logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 3. 自定义位置信息汇报 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) { logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData); this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg); logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage(), e); } } // 4. 位置信息汇报 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) { logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData); this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg); logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 5. 车辆控制回复 else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) { logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } // 其他情况 else { logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(), packageData); } } private void printAcceptanceData(ByteBuf dataByteBuf) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray); logger.info("acceptance original data [{}]", printHexBinary); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); if (!channel.isActive()) { String deviceId = channelDeviceMap.get(channel); if (deviceId != null) { channelDeviceMap.remove(channel); deleteLinkFromRedis(deviceId); } } super.channelInactive(ctx); ctx.close(); final String sessionId = ctx.channel().id().asLongText(); Session session = sessionManager.findBySessionId(sessionId); this.sessionManager.removeBySessionId(sessionId); logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Session session = Session.buildSession(ctx.channel()); sessionManager.put(session.getId(), session); logger.debug("client linking server session : [{}]", StringUtil.convert2String(session)); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel())); logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session)); ctx.close(); } } } }