PinShenData86ServerHandler.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import javax.xml.bind.DatatypeConverter;
  3. import org.apache.commons.lang.ArrayUtils;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.context.annotation.Scope;
  7. import org.springframework.stereotype.Component;
  8. import com.tidecloud.dataacceptance.common.BitOperator;
  9. import com.tidecloud.dataacceptance.common.CRCUtil;
  10. import com.tidecloud.dataacceptance.common.PinShenWaterUtils;
  11. import com.tidecloud.dataacceptance.entity.PinShenWaterDevice;
  12. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  13. import io.netty.bootstrap.ServerBootstrap;
  14. import io.netty.buffer.ByteBuf;
  15. import io.netty.buffer.Unpooled;
  16. import io.netty.channel.Channel;
  17. import io.netty.channel.ChannelFuture;
  18. import io.netty.channel.ChannelHandler;
  19. import io.netty.channel.ChannelInitializer;
  20. import io.netty.channel.ChannelOption;
  21. import io.netty.channel.EventLoopGroup;
  22. import io.netty.channel.nio.NioEventLoopGroup;
  23. import io.netty.channel.socket.SocketChannel;
  24. import io.netty.channel.socket.nio.NioServerSocketChannel;
  25. import io.netty.util.concurrent.Future;
  26. import io.netty.util.concurrent.GenericFutureListener;
  27. /**
  28. * @author cdk
  29. */
  30. @Component
  31. @ChannelHandler.Sharable
  32. @Scope("prototype")
  33. public class PinShenData86ServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  34. private static final Logger logger = LoggerFactory.getLogger(PinShenData86ServerHandler.class);
  35. @Override
  36. protected void handle(ByteBuf in, Channel channel) throws Exception {
  37. PinShenWaterDevice device = PinShenWaterUtils.handle(in);
  38. if (device != null)
  39. this.handle(device, channel, in);
  40. }
  41. private void handle(PinShenWaterDevice device, Channel channel, ByteBuf in) throws Exception {
  42. byte typeByte = device.getTypeByte();
  43. byte[] packageArr = device.getPackageArr();
  44. // GPRS方式数据帧
  45. if (device.getTypeByte() == PinShenWaterUtils.data_gps_type) {
  46. String deviceId = device.getDeviceId();
  47. sendMsg2Kafka(getOriginalData(in), deviceId, channel);
  48. }
  49. // GPRS方式链路帧
  50. else if (typeByte == PinShenWaterUtils.link_gps_type) {
  51. if (packageArr.length == 1) {
  52. int link = packageArr[0] & 0xff;
  53. if (link == PinShenWaterUtils.link_req_code) {
  54. byte[] rspBody = { PinShenWaterUtils.link_rsp_code };
  55. reply(channel, device, rspBody);
  56. sendReq(channel, device, createReqDataArr(), PinShenWaterUtils.data_gps_type);
  57. }
  58. }
  59. }
  60. }
  61. private byte[] createReqDataArr() {
  62. byte[] req = new byte[] { 0x02, 0x04, 0x75, 0x31, 0x00, 0x05 };
  63. byte[] crcCode = CRCUtil.getCrc16(req);
  64. byte[] reqBytes = ArrayUtils.addAll(req, crcCode);
  65. return reqBytes;
  66. }
  67. private void reply(Channel channel, PinShenWaterDevice device, byte[] rspBody) {
  68. ByteBuf buffer = Unpooled.buffer();
  69. byte[] rsp = new byte[] {};
  70. rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
  71. rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
  72. rsp = ArrayUtils.add(rsp, device.getPackageNo());
  73. rsp = ArrayUtils.add(rsp, device.getTypeByte());
  74. rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
  75. rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
  76. rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
  77. rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
  78. rsp = ArrayUtils.addAll(rsp, rspBody);
  79. byte code = PinShenWaterUtils.getXor(rsp);
  80. rsp = ArrayUtils.add(rsp, code);
  81. buffer.writeBytes(rsp);
  82. String result = DatatypeConverter.printHexBinary(rsp);
  83. ChannelFuture channelFuture = channel.write(buffer);
  84. channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
  85. @Override
  86. public void operationComplete(Future<? super Void> future) throws Exception {
  87. logger.info("server reply [{}] to client success", result);
  88. }
  89. });
  90. }
  91. private void sendReq(Channel channel, PinShenWaterDevice device, byte[] rspBody, Byte gpsType) {
  92. ByteBuf buffer = Unpooled.buffer();
  93. byte[] rsp = new byte[] {};
  94. rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
  95. rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
  96. rsp = ArrayUtils.add(rsp, device.getPackageNo());
  97. rsp = ArrayUtils.add(rsp, gpsType);
  98. rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
  99. rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
  100. rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
  101. rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
  102. rsp = ArrayUtils.addAll(rsp, rspBody);
  103. byte code = PinShenWaterUtils.getXor(rsp);
  104. rsp = ArrayUtils.add(rsp, code);
  105. buffer.writeBytes(rsp);
  106. String result = DatatypeConverter.printHexBinary(rsp);
  107. ChannelFuture channelFuture = channel.write(buffer);
  108. channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
  109. @Override
  110. public void operationComplete(Future<? super Void> future) throws Exception {
  111. logger.info("server send request [{}] to client success", result);
  112. }
  113. });
  114. }
  115. private byte[] getOriginalData(ByteBuf in) {
  116. in.resetReaderIndex();
  117. int length = in.readableBytes();
  118. byte[] dataByteArray = new byte[length];
  119. in.readBytes(dataByteArray, 0, in.readableBytes());
  120. return dataByteArray;
  121. }
  122. public void startAcceptor() {
  123. EventLoopGroup bossGroup = new NioEventLoopGroup();
  124. EventLoopGroup workerGroup = new NioEventLoopGroup();
  125. try {
  126. ServerBootstrap b = new ServerBootstrap();
  127. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  128. .childHandler(new ChannelInitializer<SocketChannel>() {
  129. @Override
  130. protected void initChannel(SocketChannel ch) throws Exception {
  131. // ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
  132. // Unpooled.copiedBuffer(tailSplitBytes),
  133. // Unpooled.copiedBuffer(headerSplitBytes),
  134. // Unpooled.copiedBuffer(headerSplitBytes1)));
  135. ch.pipeline().addLast(PinShenData86ServerHandler.this);
  136. }
  137. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  138. ChannelFuture f = b.bind(port).sync();
  139. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  140. this.getPort());
  141. f.channel().closeFuture().sync();
  142. } catch (InterruptedException e) {
  143. logger.error(e.getMessage());
  144. } finally {
  145. cleanRedisLinkData();
  146. workerGroup.shutdownGracefully();
  147. bossGroup.shutdownGracefully();
  148. }
  149. }
  150. }