BSJGpsServerHandler.java 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import java.util.concurrent.TimeUnit;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.slf4j.MDC;
  6. import org.springframework.context.annotation.Scope;
  7. import org.springframework.stereotype.Component;
  8. import com.tidecloud.dataacceptance.codec.MsgDecoder;
  9. import com.tidecloud.dataacceptance.common.Constants;
  10. import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
  11. import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg;
  12. import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg;
  13. import com.tidecloud.dataacceptance.entity.PackageData;
  14. import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
  15. import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
  16. import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
  17. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  18. import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
  19. import io.netty.bootstrap.ServerBootstrap;
  20. import io.netty.buffer.ByteBuf;
  21. import io.netty.buffer.Unpooled;
  22. import io.netty.channel.Channel;
  23. import io.netty.channel.ChannelFuture;
  24. import io.netty.channel.ChannelHandler;
  25. import io.netty.channel.ChannelInitializer;
  26. import io.netty.channel.ChannelOption;
  27. import io.netty.channel.EventLoopGroup;
  28. import io.netty.channel.nio.NioEventLoopGroup;
  29. import io.netty.channel.socket.SocketChannel;
  30. import io.netty.channel.socket.nio.NioServerSocketChannel;
  31. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  32. import io.netty.handler.timeout.IdleStateHandler;
  33. /**
  34. * @author cdk
  35. */
  36. @Component
  37. @Scope("prototype")
  38. @ChannelHandler.Sharable
  39. public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  40. private static final Logger logger = LoggerFactory.getLogger(BSJGpsServerHandler.class);
  41. private final MsgDecoder decoder = new MsgDecoder();
  42. private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
  43. @Override
  44. public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
  45. try {
  46. if (!dataByteBuf.isReadable()) {
  47. return;
  48. }
  49. byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
  50. dataByteBuf.readBytes(dataByteArray);
  51. byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
  52. // let dataByteArray transfer 808DataEntity
  53. PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
  54. // link manage
  55. packageData.setChannel(channel);
  56. String deviceId = packageData.getMsgHeader().getTerminalPhone();
  57. MDC.put(MDC_DEVICEID, deviceId);
  58. packageData.setDeviceId(deviceId);
  59. //发送数据到kafka
  60. final MsgHeader header = packageData.getMsgHeader();
  61. if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  62. sendMsg2Kafka(dataByteArray, packageData.getDeviceId(), channel);
  63. }
  64. this.processPackageData(packageData);
  65. } catch (Exception e) {
  66. logger.error(e.getLocalizedMessage());
  67. }
  68. }
  69. private void processPackageData(PackageData packageData) {
  70. final MsgHeader header = packageData.getMsgHeader();
  71. // 1. 终端心跳-消息体为空 ==> 平台通用应答
  72. if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
  73. logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  74. try {
  75. this.msgProcessService.processTerminalHeartBeatMsg(packageData);
  76. manageChannel(packageData.getChannel(), header.getTerminalPhone());
  77. logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  78. } catch (Exception e) {
  79. logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  80. e.getMessage());
  81. }
  82. }
  83. // 5. 终端鉴权 ==> 平台通用应答
  84. else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
  85. logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  86. try {
  87. TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
  88. this.msgProcessService.processAuthMsg(authenticationMsg);
  89. manageChannel(packageData.getChannel(), header.getTerminalPhone());
  90. logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  91. } catch (Exception e) {
  92. logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  93. e.getMessage());
  94. }
  95. }
  96. // 6. 终端注册 ==> 终端注册应答
  97. else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
  98. logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  99. try {
  100. TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
  101. this.msgProcessService.processRegisterMsg(msg);
  102. logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  103. } catch (Exception e) {
  104. logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  105. e.getMessage());
  106. }
  107. }
  108. // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
  109. else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
  110. logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  111. try {
  112. this.msgProcessService.processTerminalLogoutMsg(packageData);
  113. logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  114. } catch (Exception e) {
  115. logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  116. e.getMessage());
  117. }
  118. }
  119. // 3. 自定义位置信息汇报 ==> 平台通用应答
  120. else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  121. logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  122. try {
  123. LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData);
  124. this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg);
  125. logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  126. } catch (Exception e) {
  127. logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
  128. header.getFlowId(), e.getMessage(), e);
  129. }
  130. }
  131. // 4. 位置信息汇报 ==> 平台通用应答
  132. else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  133. logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  134. try {
  135. LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
  136. this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
  137. logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  138. } catch (Exception e) {
  139. logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  140. e.getMessage());
  141. }
  142. }
  143. // 5. 车辆控制回复
  144. else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) {
  145. logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  146. }
  147. // 其他情况
  148. else {
  149. logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
  150. packageData);
  151. }
  152. }
  153. public void startAcceptor() {
  154. EventLoopGroup bossGroup = new NioEventLoopGroup();
  155. EventLoopGroup workerGroup = new NioEventLoopGroup();
  156. byte[] splitBytes1 = new byte[] { 0x7e };
  157. byte[] splitBytes2 = new byte[] { 0x7e, 0x7e };
  158. try {
  159. ServerBootstrap b = new ServerBootstrap();
  160. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  161. .childHandler(new ChannelInitializer<SocketChannel>() {
  162. @Override
  163. protected void initChannel(SocketChannel ch) throws Exception {
  164. ch.pipeline().addLast("idleStateHandler",
  165. new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
  166. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
  167. Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
  168. ch.pipeline().addLast(BSJGpsServerHandler.this);
  169. }
  170. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  171. ChannelFuture f = b.bind(this.getPort()).sync();
  172. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  173. this.getPort());
  174. f.channel().closeFuture().sync();
  175. } catch (Exception e) {
  176. logger.error(e.getMessage());
  177. } finally {
  178. cleanRedisLinkData();
  179. workerGroup.shutdownGracefully();
  180. bossGroup.shutdownGracefully();
  181. }
  182. }
  183. }