YiTongGpsServerHandler.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package com.tidecloud.dataacceptance.service.handle;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import javax.xml.bind.DatatypeConverter;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import com.smartsanitation.common.util.StringUtil;
  11. import com.tidecloud.dataacceptance.codec.MsgDecoder;
  12. import com.tidecloud.dataacceptance.common.Constants;
  13. import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
  14. import com.tidecloud.dataacceptance.entity.ConnectMsg;
  15. import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg;
  16. import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg;
  17. import com.tidecloud.dataacceptance.entity.PackageData;
  18. import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
  19. import com.tidecloud.dataacceptance.entity.Session;
  20. import com.tidecloud.dataacceptance.entity.SessionManager;
  21. import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
  22. import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
  23. import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
  24. import io.netty.buffer.ByteBuf;
  25. import io.netty.channel.Channel;
  26. import io.netty.channel.ChannelHandler;
  27. import io.netty.channel.ChannelHandlerContext;
  28. import io.netty.channel.ChannelInboundHandlerAdapter;
  29. import io.netty.handler.timeout.IdleState;
  30. import io.netty.handler.timeout.IdleStateEvent;
  31. import redis.clients.jedis.Jedis;
  32. import redis.clients.jedis.JedisPool;
  33. /**
  34. * @author cdk
  35. */
  36. @Component
  37. @ChannelHandler.Sharable
  38. public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter {
  private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);

  /** Redis key prefix of the set that collects the socket ids registered for a server address. */
  public static String PREFIX_LINK = "s.";

  /** Redis key prefix of the backup set that collects the device ids seen by a server address. */
  public static String PREFIX_LINK_BACK = "s.b.";

  /** Redis key prefix of the per-device entry that stores the serialized {@link ConnectMsg}. */
  public static String PREFIX_DEVICE = "d.";

  /** Index of the Redis database selected for all link bookkeeping. */
  public static final Integer REDIS_INDEX_LINK = 15;

  /*
   * The registries below are static and this handler is @Sharable, so one instance serves every
   * channel and the maps may be mutated from several event-loop threads at once. A bare HashMap
   * can be corrupted by concurrent structural modification, hence the synchronized views.
   * A synchronized HashMap (rather than ConcurrentHashMap) is used on purpose: it keeps accepting
   * null keys/values exactly like the original maps did.
   * NOTE: callers that iterate over one of these maps must synchronize on the map while doing so.
   */

  /** Channel id (long text form) mapped to its live channel. */
  public static Map<String, Channel> socketyChannelMap =
      java.util.Collections.synchronizedMap(new HashMap<String, Channel>());

  /** Live channel mapped to the device id (terminal phone) that last sent a heartbeat on it. */
  public static Map<Channel, String> channelDeviceMap =
      java.util.Collections.synchronizedMap(new HashMap<Channel, String>());

  /** Not used inside this class; presumably a command cache shared with other components. */
  public static Map<String, String> commandCopy =
      java.util.Collections.synchronizedMap(new HashMap<String, String>());

  /** Registry of the sessions created in {@code channelActive}. */
  private final SessionManager sessionManager;

  /** Turns un-escaped frames into {@link PackageData} and typed messages. */
  private final MsgDecoder decoder;

  /** Business handling for every decoded terminal message. */
  private TerminalMsgProcessService msgProcessService;

  /** Pool the Redis connections for link bookkeeping are borrowed from; injected by Spring. */
  @Autowired
  private JedisPool jedisPool;

  /** Local address of this server instance, read from {@code server.localaddress}. */
  @Value("${server.localaddress}")
  private String address;

  /**
   * Initializes the session manager, the message decoder and the message processing service.
   */
  public YiTongGpsServerHandler() {
    this.sessionManager = SessionManager.getInstance();
    this.decoder = new MsgDecoder();
    this.msgProcessService = new TerminalMsgProcessService();
  }
  64. @Override
  65. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  66. ByteBuf dataByteBuf = (ByteBuf) msg;
  67. // print acceptance data
  68. printAcceptanceData(dataByteBuf);
  69. try {
  70. if (!dataByteBuf.isReadable()) {
  71. return;
  72. }
  73. byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
  74. dataByteBuf.readBytes(dataByteArray);
  75. byte[] dataByteArrayDoEscape = JT808ProtocolUtils.
  76. doEscape4Receive(dataByteArray, 0, dataByteArray.length);
  77. //let dataByteArray transfer 808DataEntity
  78. PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
  79. //link manage
  80. packageData.setChannel(ctx.channel());
  81. this.processPackageData(packageData);
  82. } catch (Exception e) {
  83. logger.error(e.getLocalizedMessage());
  84. }
  85. ctx.flush();
  86. }
  87. private void manageChannel(Channel channel, String deviceId) {
  88. String socketkey = channel.id().asLongText();
  89. Channel channelInMap = socketyChannelMap.get(socketkey);
  90. String deviceIdInMap = channelDeviceMap.get(channel);
  91. if (channelInMap != null && deviceIdInMap != null) {
  92. logger.debug("device [{}] has link [{}]", deviceId, socketkey);
  93. return;
  94. }
  95. socketyChannelMap.put(socketkey, channel);
  96. channelDeviceMap.put(channel, deviceId);
  97. String addressStr = ConnectMsg.ipToLong(address);
  98. ConnectMsg cMsg = new ConnectMsg(address, socketkey);
  99. Jedis jedis = jedisPool.getResource();
  100. try {
  101. jedis.select(REDIS_INDEX_LINK);
  102. String insertKey = PREFIX_LINK + addressStr;
  103. String selectKey = PREFIX_DEVICE + deviceId;
  104. String insertBackupKey = PREFIX_LINK_BACK + addressStr;
  105. jedis.sadd(insertKey, socketkey);
  106. jedis.sadd(insertBackupKey, deviceId);
  107. jedis.set(selectKey, StringUtil.convert2String(cMsg));
  108. } catch (Exception e) {
  109. e.printStackTrace();
  110. } finally {
  111. jedis.close();
  112. }
  113. }
  114. private void deleteLinkFromRedis(String deviceId) {
  115. String deleteKey = PREFIX_DEVICE + deviceId;
  116. try(Jedis jedis = jedisPool.getResource()) {
  117. jedis.select(REDIS_INDEX_LINK);
  118. String connectMsg = jedis.get(deleteKey);
  119. if (connectMsg != null) {
  120. ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
  121. String socketId = cmsg.getSocketId();
  122. socketyChannelMap.remove(socketId);
  123. String socketQueryKey = PREFIX_LINK + address;
  124. jedis.srem(socketQueryKey, socketId);
  125. jedis.del(deleteKey);
  126. logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
  127. }
  128. } catch (Exception e) {
  129. logger.error(e.getLocalizedMessage());
  130. }
  131. }
  132. private void processPackageData(PackageData packageData) {
  133. final MsgHeader header = packageData.getMsgHeader();
  134. // 1. 终端心跳-消息体为空 ==> 平台通用应答
  135. if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
  136. logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  137. try {
  138. this.msgProcessService.processTerminalHeartBeatMsg(packageData);
  139. manageChannel(packageData.getChannel(), header.getTerminalPhone());
  140. logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  141. } catch (Exception e) {
  142. logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  143. e.getMessage());
  144. }
  145. }
  146. // 5. 终端鉴权 ==> 平台通用应答
  147. else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
  148. logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  149. try {
  150. TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
  151. this.msgProcessService.processAuthMsg(authenticationMsg);
  152. logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  153. } catch (Exception e) {
  154. logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  155. e.getMessage());
  156. }
  157. }
  158. // 6. 终端注册 ==> 终端注册应答
  159. else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
  160. logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  161. try {
  162. TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
  163. this.msgProcessService.processRegisterMsg(msg);
  164. logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  165. } catch (Exception e) {
  166. logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  167. e.getMessage());
  168. }
  169. }
  170. // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
  171. else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
  172. logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  173. try {
  174. this.msgProcessService.processTerminalLogoutMsg(packageData);
  175. logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  176. } catch (Exception e) {
  177. logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  178. e.getMessage());
  179. }
  180. }
  181. // 3. 自定义位置信息汇报 ==> 平台通用应答
  182. else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  183. logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  184. try {
  185. LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData);
  186. this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg);
  187. logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  188. } catch (Exception e) {
  189. logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  190. e.getMessage(), e);
  191. }
  192. }
  193. // 4. 位置信息汇报 ==> 平台通用应答
  194. else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  195. logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  196. try {
  197. LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
  198. this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
  199. logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  200. } catch (Exception e) {
  201. logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  202. e.getMessage());
  203. }
  204. }
  205. // 5. 车辆控制回复
  206. else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) {
  207. logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  208. }
  209. // 其他情况
  210. else {
  211. logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
  212. packageData);
  213. }
  214. }
  215. private void printAcceptanceData(ByteBuf dataByteBuf) {
  216. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  217. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  218. dataByteBufCopy.readBytes(dataByteArray);
  219. String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
  220. logger.info("acceptance original data [{}]", printHexBinary);
  221. }
  222. @Override
  223. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  224. cause.printStackTrace();
  225. ctx.close();
  226. }
  227. @Override
  228. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  229. Channel channel = ctx.channel();
  230. if (!channel.isActive()) {
  231. String deviceId = channelDeviceMap.get(channel);
  232. if (deviceId != null) {
  233. channelDeviceMap.remove(channel);
  234. deleteLinkFromRedis(deviceId);
  235. }
  236. }
  237. super.channelInactive(ctx);
  238. ctx.close();
  239. final String sessionId = ctx.channel().id().asLongText();
  240. Session session = sessionManager.findBySessionId(sessionId);
  241. this.sessionManager.removeBySessionId(sessionId);
  242. logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
  243. }
  244. @Override
  245. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  246. Session session = Session.buildSession(ctx.channel());
  247. sessionManager.put(session.getId(), session);
  248. logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
  249. }
  250. @Override
  251. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  252. if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
  253. IdleStateEvent event = (IdleStateEvent) evt;
  254. if (event.state() == IdleState.READER_IDLE) {
  255. Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
  256. logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
  257. ctx.close();
  258. }
  259. }
  260. }
  261. }