YiTongGpsServerHandler.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import java.util.Date;
  3. import javax.xml.bind.DatatypeConverter;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.slf4j.MDC;
  8. import org.springframework.context.annotation.Scope;
  9. import org.springframework.stereotype.Component;
  10. import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder;
  11. import com.tidecloud.dataacceptance.common.CRCUtil;
  12. import com.tidecloud.dataacceptance.common.DateUtil;
  13. import com.tidecloud.dataacceptance.common.NumUtil;
  14. import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
  15. import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice;
  16. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  17. import com.tidecloud.dataacceptance.util.FileUtils;
  18. import io.netty.bootstrap.ServerBootstrap;
  19. import io.netty.buffer.ByteBuf;
  20. import io.netty.buffer.Unpooled;
  21. import io.netty.channel.Channel;
  22. import io.netty.channel.ChannelFuture;
  23. import io.netty.channel.ChannelHandler;
  24. import io.netty.channel.ChannelInitializer;
  25. import io.netty.channel.ChannelOption;
  26. import io.netty.channel.EventLoopGroup;
  27. import io.netty.channel.nio.NioEventLoopGroup;
  28. import io.netty.channel.socket.SocketChannel;
  29. import io.netty.channel.socket.nio.NioServerSocketChannel;
  30. import io.netty.util.concurrent.Future;
  31. import io.netty.util.concurrent.GenericFutureListener;
  32. /**
  33. * @author cdk
  34. */
  35. @Component
  36. @ChannelHandler.Sharable
  37. @Scope("prototype")
  38. public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  39. private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
  40. private static final Integer START_BITS = 2;
  41. private static final Byte START_BIT = 0x78;
  42. private static final Byte START_BIT2 = 0X79;
  43. private static final byte LOGIN_MSG = 0x01;
  44. private static final byte LOCATION_MSG = 0x22;
  45. private static final byte STATUS_MSG = 0x13;
  46. private static final byte WARNING_MSG = 0x26;
  47. private static final byte CORRECT_TIME_MSG = (byte) 0x8A;
  48. private static final byte VOLTAGE_MSG = (byte) 0x94;
  49. private static final byte VOLTAGE_SUB_MSG = (byte) 0x00;
  50. private static final byte COMMAND_COPY_MSG = 0x15;
  51. private static final Integer DATA_SIZE = 6;
  52. @Override
  53. protected void handle(ByteBuf in, Channel channel) throws Exception {
  54. if (in.isReadable()) {
  55. in.markReaderIndex();
  56. try {
  57. int index = 0;
  58. byte b = 0;
  59. while (index < START_BITS) {
  60. b = in.readByte();
  61. if (START_BIT != b && START_BIT2 != b) {
  62. channel.close();
  63. }
  64. index++;
  65. }
  66. int length = 0;
  67. if (START_BIT == b) {
  68. length = in.readByte() & 0xff;
  69. } else {
  70. length = in.readShort() & 0xffff;
  71. }
  72. handle(in, length, channel);
  73. } catch (Exception e) {
  74. logger.error(e.getMessage(), e);
  75. }
  76. }
  77. }
  78. private byte[] getOriginalData(ByteBuf in, String deviceId) {
  79. if (StringUtils.isNotBlank(deviceId)) {
  80. in.resetReaderIndex();
  81. byte[] deviceArr = deviceId.getBytes();
  82. int length = in.readableBytes();
  83. byte[] dataByteArray = new byte[length + deviceArr.length];
  84. in.readBytes(dataByteArray,0,in.readableBytes());
  85. System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
  86. return dataByteArray;
  87. } else
  88. return null;
  89. }
  90. private void handle(ByteBuf in, int length, Channel channel) throws Exception {
  91. if (in.isReadable()) {
  92. String deviceId = channelDeviceMap.get(channel);
  93. if (deviceId!=null) {
  94. MDC.put(MDC_DEVICEID, deviceId);
  95. }
  96. byte msgType = in.readByte();
  97. if (LOGIN_MSG == msgType) {
  98. resolveLoginMSG(in, channel);
  99. } else if (LOCATION_MSG == msgType) {
  100. logger.info("GPS 定位包");
  101. sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
  102. // resolveLocationMSG(in, deviceId);
  103. } else if (STATUS_MSG == msgType) {
  104. reply(channel, STATUS_MSG);
  105. } else if (WARNING_MSG == msgType) {
  106. logger.info("报警数据(UTC)");
  107. reply(channel, WARNING_MSG);
  108. sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
  109. } else if (CORRECT_TIME_MSG == msgType) {
  110. reply(channel, CORRECT_TIME_MSG);
  111. } else if (VOLTAGE_MSG == msgType) {
  112. logger.info("信息传输通用包");
  113. sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
  114. // resolveVoltageMSG(in, channel);
  115. } else if (COMMAND_COPY_MSG == msgType) {
  116. // resolveCommandCopyMSG(in, channel, length);
  117. } else {
  118. logger.info("client send data without handle type ...");
  119. }
  120. }
  121. }
  122. private void resolveCommandCopyMSG(ByteBuf in, Channel channel, Integer length) {
  123. byte readByte = in.readByte();
  124. int checkCode = in.readInt();
  125. if (checkCode != 0) {
  126. logger.error("illegal checkcode [{}]", checkCode);
  127. } else {
  128. // byte codingType = in.readByte();
  129. byte[] coypByes = new byte[length - 14];
  130. in.readBytes(coypByes);
  131. String copyStr = new String(coypByes);
  132. logger.info("设备指令回复 [{}]", copyStr);
  133. }
  134. }
  135. private void resolveWarningMsg(ByteBuf in, String deviceId) {
  136. YiTongGpsForWarnDevice yiTongGPSDevice = new YiTongGpsForWarnDevice();
  137. StringBuffer dateTimeStrBuf = new StringBuffer();
  138. int indexOfDateTime = 0;
  139. while (indexOfDateTime < DATA_SIZE) {
  140. byte b = in.readByte();
  141. dateTimeStrBuf.append(NumUtil.byte2String(b));
  142. indexOfDateTime++;
  143. }
  144. // 日期
  145. yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
  146. // gps信息卫星数
  147. yiTongGPSDevice.setGpsCount(in.readByte());
  148. // 维度
  149. yiTongGPSDevice.setLat(in.readInt());
  150. // 经度
  151. yiTongGPSDevice.setLng(in.readInt());
  152. // 速度
  153. // yiTongGPSDevice.setSpeedbyte(in.readByte());
  154. // 航向
  155. yiTongGPSDevice.setCourseStatus(in.readShort());
  156. in.readByte();
  157. // 国家代号
  158. yiTongGPSDevice.setMcc(in.readShort());
  159. // 移动网号码
  160. yiTongGPSDevice.setMnc(in.readByte());
  161. // 位置区码
  162. yiTongGPSDevice.setLac(in.readShort());
  163. in.readMedium();
  164. yiTongGPSDevice.setTerminalMsg((int) in.readByte());
  165. yiTongGPSDevice.setElectric((int) in.readByte());
  166. yiTongGPSDevice.setGmsSign((int) in.readByte());
  167. yiTongGPSDevice.setWarningReason((int) in.readByte());
  168. in.readByte();
  169. yiTongGPSDevice.setDeviceId(deviceId);
  170. // 写文件操作
  171. String deviceStr = yiTongGPSDevice.buildDeviceStr();
  172. FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
  173. }
  174. private void resolveVoltageMSG(ByteBuf in, Channel channel) {
  175. String deviceId = channelDeviceMap.get(channel);
  176. byte subMsgType = in.readByte();
  177. if (VOLTAGE_SUB_MSG == subMsgType) {
  178. short voltage = in.readShort();
  179. Double voltageDouble = NumUtil.toFixed2Place((double) voltage);
  180. String date = DateUtil.formatDate2String(DateUtil.calculateByHour(new Date(), -8));
  181. YiTongGPSDevice yiTongGPSDevice = buildYiTongGpsDevcie(voltageDouble, deviceId, date);
  182. String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
  183. FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
  184. }
  185. }
  186. private YiTongGPSDevice buildYiTongGpsDevcie(Double voltageDouble, String deviceId, String date) {
  187. return new YiTongGPSDevice(deviceId, date, null, null, null, null, null, null, null, null, null, null, null,
  188. null, null, null, voltageDouble, 0);
  189. }
  190. private void resolveLoginMSG(ByteBuf in, Channel channel) {
  191. byte[] deviceIdBytes = new byte[8];
  192. in.readBytes(deviceIdBytes);
  193. String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
  194. MDC.put(MDC_DEVICEID, deviceId);
  195. // 回复和链接管理
  196. String deviceIdInMap = channelDeviceMap.get(channel);
  197. if (!deviceId.equals(deviceIdInMap)) {
  198. manageChannel(channel, deviceId);
  199. }
  200. reply(channel, LOGIN_MSG);
  201. }
  202. private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
  203. YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
  204. StringBuffer dateTimeStrBuf = new StringBuffer();
  205. int indexOfDateTime = 0;
  206. while (indexOfDateTime < DATA_SIZE) {
  207. byte b = in.readByte();
  208. dateTimeStrBuf.append(NumUtil.byte2String(b));
  209. indexOfDateTime++;
  210. }
  211. // 日期
  212. yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
  213. // gps信息卫星数
  214. yiTongGPSDevice.setGpsCount(in.readByte());
  215. // 维度
  216. yiTongGPSDevice.setLat(in.readInt());
  217. // 经度
  218. yiTongGPSDevice.setLng(in.readInt());
  219. // 速度
  220. yiTongGPSDevice.setSpeedbyte(in.readByte());
  221. // 航向
  222. yiTongGPSDevice.setCourseStatus(in.readShort());
  223. // 国家代号
  224. yiTongGPSDevice.setMcc(in.readShort());
  225. // 移动网号码
  226. yiTongGPSDevice.setMnc(in.readByte());
  227. // 位置区码
  228. yiTongGPSDevice.setLac(in.readShort());
  229. // 移动基站Cell Tower ID
  230. yiTongGPSDevice.setCellId(in.readMedium());
  231. yiTongGPSDevice.setAcc(in.readByte());
  232. // 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
  233. yiTongGPSDevice.setReportModel(in.readByte());
  234. // 0x01:实时 0x00:补传
  235. yiTongGPSDevice.setIsmendMsg(in.readByte());
  236. double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
  237. // 里程设备默认是关闭的,需要指令,设备端才发送
  238. yiTongGPSDevice.setDeviceId(deviceId);
  239. yiTongGPSDevice.setMileage(mileage);
  240. yiTongGPSDevice.setDataType(1);
  241. // 写文件操作
  242. String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
  243. FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
  244. }
  245. private void reply(Channel channel, byte msgType) {
  246. ByteBuf buffer = Unpooled.buffer();
  247. byte[] crcBytes = new byte[] { 0x05, msgType, 0x00, 0x05 };
  248. int doCrc = CRCUtil.do_crc(65535, crcBytes);
  249. byte[] intToByte = NumUtil.intToByte(doCrc, 2);
  250. byte[] bytes = new byte[] { 0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A };
  251. buffer.writeBytes(bytes);
  252. ChannelFuture channelFuture = channel.write(buffer);
  253. channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
  254. @Override
  255. public void operationComplete(Future<? super Void> future) throws Exception {
  256. String deviceId = channelDeviceMap.get(channel);
  257. String type = "";
  258. switch (msgType) {
  259. case STATUS_MSG:
  260. type = "心跳包";
  261. break;
  262. case WARNING_MSG:
  263. type = "报警数据(UTC)";
  264. break;
  265. case CORRECT_TIME_MSG:
  266. type = "校时包";
  267. break;
  268. case LOGIN_MSG:
  269. type = "登录包";
  270. break;
  271. default:
  272. break;
  273. }
  274. logger.info("server reply [{}] to client device [{}] success:[{}] ",type,deviceId, DatatypeConverter.printHexBinary(bytes));
  275. }
  276. });
  277. }
  278. public void startAcceptor() {
  279. EventLoopGroup bossGroup = new NioEventLoopGroup();
  280. EventLoopGroup workerGroup = new NioEventLoopGroup();
  281. byte[] headerSplitBytes = new byte[] { 0x78, 0x78 };
  282. byte[] headerSplitBytes1 = new byte[] { 0x79, 0x79 };
  283. byte[] tailSplitBytes = new byte[] { 0x0D, 0x0A };
  284. try {
  285. ServerBootstrap b = new ServerBootstrap();
  286. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  287. .childHandler(new ChannelInitializer<SocketChannel>() {
  288. @Override
  289. protected void initChannel(SocketChannel ch) throws Exception {
  290. ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
  291. Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
  292. Unpooled.copiedBuffer(headerSplitBytes1)));
  293. ch.pipeline().addLast(YiTongGpsServerHandler.this);
  294. }
  295. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  296. ChannelFuture f = b.bind(port).sync();
  297. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  298. this.getPort());
  299. f.channel().closeFuture().sync();
  300. } catch (InterruptedException e) {
  301. logger.error(e.getMessage());
  302. } finally {
  303. cleanRedisLinkData();
  304. workerGroup.shutdownGracefully();
  305. bossGroup.shutdownGracefully();
  306. }
  307. }
  308. }