JSTGPServerHandler.java 9.4 KB


  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.accept.client.DeviceMsgClient;
  3. import com.alibaba.fastjson.JSON;
  4. import com.tidecloud.dataacceptance.codec.MsgDecoder;
  5. import com.tidecloud.dataacceptance.common.Constants;
  6. import com.tidecloud.dataacceptance.common.DateUtil;
  7. import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
  8. import com.tidecloud.dataacceptance.entity.*;
  9. import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
  10. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  11. import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
  12. import io.netty.bootstrap.ServerBootstrap;
  13. import io.netty.buffer.ByteBuf;
  14. import io.netty.buffer.Unpooled;
  15. import io.netty.channel.*;
  16. import io.netty.channel.nio.NioEventLoopGroup;
  17. import io.netty.channel.socket.SocketChannel;
  18. import io.netty.channel.socket.nio.NioServerSocketChannel;
  19. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  20. import io.netty.handler.timeout.IdleStateHandler;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import org.slf4j.MDC;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.context.annotation.Scope;
  26. import org.springframework.data.redis.util.ByteUtils;
  27. import org.springframework.stereotype.Component;
  28. import java.util.Date;
  29. import java.util.concurrent.TimeUnit;
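  /**
   * Handler for JT808-style terminal frames. Each inbound frame is unescaped and decoded into a
   * {@code PackageData}, location-upload and battery frames are forwarded to Kafka, and the
   * package is then dispatched by message id to {@code TerminalMsgProcessService}. The class also
   * hosts the Netty acceptor ({@code startAcceptor}) that installs this handler on every accepted
   * connection.
   *
   * @author cdk
   */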
  33. @Component
  34. @Scope("prototype")
  35. @ChannelHandler.Sharable
  36. public class JSTGPServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  /** Logger shared by all instances of this handler. */
  37. private static final Logger logger = LoggerFactory.getLogger(JSTGPServerHandler.class);
  /** Turns unescaped frame bytes into PackageData and into the typed register/location messages. */
  38. private final MsgDecoder decoder = new MsgDecoder();
  /** Per-message-type processing routines; instantiated directly here rather than injected. */
  39. private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
  /** Injected client that receives the JSON form of heartbeat and location packages. */
  40. @Autowired
  41. private DeviceMsgClient deviceMsgClient;
  42. @Override
  43. public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
  44. try {
  45. if (!dataByteBuf.isReadable()) {
  46. return;
  47. }
  48. byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
  49. dataByteBuf.readBytes(dataByteArray);
  50. byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
  51. // let dataByteArray transfer 808DataEntity
  52. PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
  53. // link manage
  54. packageData.setChannel(channel);
  55. String deviceId = packageData.getMsgHeader().getTerminalPhone();
  56. MDC.put(MDC_DEVICEID, deviceId);
  57. packageData.setDeviceId(deviceId);
  58. //发送数据到kafka
  59. final MsgHeader header = packageData.getMsgHeader();
  60. if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId() || Constants.JT808_BATTERY == header.getMsgId()) {
  61. sendMsg2Kafka(byteMerger(dataByteArray, DateUtil.formatDate2String(new Date()).getBytes()), packageData.getDeviceId(), channel);
  62. }
  63. this.processPackageData(packageData);
  64. } catch (Exception e) {
  65. logger.error(e.getLocalizedMessage());
  66. }
  67. }
  68. public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
  69. byte[] byte_3 = new byte[byte_1.length + byte_2.length];
  70. System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
  71. System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
  72. return byte_3;
  73. }
  74. private void processPackageData(PackageData packageData) {
  75. final MsgHeader header = packageData.getMsgHeader();
  76. // 1. 终端心跳-消息体为空 ==> 平台通用应答
  77. if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
  78. logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  79. try {
  80. this.msgProcessService.processTerminalHeartBeatMsg(packageData);
  81. manageChannel(packageData.getChannel(), header.getTerminalPhone());
  82. logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  83. deviceMsgClient.acceptDeviceMsgParam("jinshatian",
  84. packageData.getDeviceId(),
  85. 1, JSON.toJSONString(packageData, true), System.currentTimeMillis());
  86. } catch (Exception e) {
  87. logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  88. e.getMessage());
  89. }
  90. }
  91. // 5. 终端鉴权 ==> 平台通用应答
  92. else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
  93. logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  94. try {
  95. TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
  96. this.msgProcessService.processAuthMsg(authenticationMsg);
  97. manageChannel(packageData.getChannel(), header.getTerminalPhone());
  98. logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  99. } catch (Exception e) {
  100. logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  101. e.getMessage());
  102. }
  103. }
  104. // 6. 终端注册 ==> 终端注册应答
  105. else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
  106. logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  107. try {
  108. TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
  109. this.msgProcessService.processRegisterMsg(msg);
  110. logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  111. } catch (Exception e) {
  112. logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  113. e.getMessage());
  114. }
  115. }
  116. // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
  117. else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
  118. logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  119. try {
  120. this.msgProcessService.processTerminalLogoutMsg(packageData);
  121. logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  122. } catch (Exception e) {
  123. logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  124. e.getMessage());
  125. }
  126. }
  127. // 4. 位置信息汇报 ==> 平台通用应答
  128. else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
  129. logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  130. try {
  131. LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
  132. this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
  133. logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  134. deviceMsgClient.acceptDeviceMsgParam("jinshatian",
  135. packageData.getDeviceId(),
  136. 2, JSON.toJSONString(packageData, true), System.currentTimeMillis());
  137. } catch (Exception e) {
  138. logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
  139. e.getMessage());
  140. }
  141. }
  142. // 3. 自定义电量信息汇报 ==> 平台通用应答
  143. else if (Constants.JT808_BATTERY == header.getMsgId()) {
  144. logger.info(">>>>>[自定义电量信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  145. try {
  146. this.msgProcessService.processTerminaMsg(packageData);
  147. } catch (Exception e) {
  148. logger.error("<<<<<[自定义电量信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
  149. header.getFlowId(), e.getMessage(), e);
  150. }
  151. }
  152. // 3. 自定义卡片登入汇报 ==> 平台通用应答
  153. else if (Constants.JT808_KP_LOGIN == header.getMsgId()) {
  154. logger.info(">>>>>[自定义卡片登录信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
  155. try {
  156. this.msgProcessService.processTerminaMsg(packageData);
  157. } catch (Exception e) {
  158. logger.error("<<<<<[自定义卡片登录信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
  159. header.getFlowId(), e.getMessage(), e);
  160. }
  161. }
  162. // 其他情况
  163. else {
  164. logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
  165. packageData);
  166. }
  167. }
  168. public void startAcceptor() {
  169. EventLoopGroup bossGroup = new NioEventLoopGroup();
  170. EventLoopGroup workerGroup = new NioEventLoopGroup();
  171. byte[] splitBytes1 = new byte[]{0x7e};
  172. byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
  173. try {
  174. ServerBootstrap b = new ServerBootstrap();
  175. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  176. .childHandler(new ChannelInitializer<SocketChannel>() {
  177. @Override
  178. protected void initChannel(SocketChannel ch) throws Exception {
  179. ch.pipeline().addLast("idleStateHandler",
  180. new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
  181. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
  182. Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
  183. ch.pipeline().addLast(JSTGPServerHandler.this);
  184. }
  185. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  186. ChannelFuture f = b.bind(this.getPort()).sync();
  187. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  188. this.getPort());
  189. f.channel().closeFuture().sync();
  190. } catch (Exception e) {
  191. logger.error(e.getMessage());
  192. } finally {
  193. cleanRedisLinkData();
  194. workerGroup.shutdownGracefully();
  195. bossGroup.shutdownGracefully();
  196. }
  197. }
  198. }