Browse Source

康凯斯- 开发 增加 .release() 回收

jianghouwei 6 years ago
parent
commit
8632604e4c

+ 215 - 213
src/main/java/com/tidecloud/dataacceptance/service/impl/GK309GpsServerHandler.java

@@ -36,235 +36,237 @@ import java.util.Date;
 @Scope("prototype")
 public class GK309GpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-	/**
-	 * 协议信息数据
-	 * [ 起始位|包长度|协议号|信息内容|信息序列号|错误校验|停止位 ]
-	 */
-	private static final Logger logger = LoggerFactory.getLogger(GK309GpsServerHandler.class);
+    /**
+     * 协议信息数据
+     * [ 起始位|包长度|协议号|信息内容|信息序列号|错误校验|停止位 ]
+     */
+    private static final Logger logger = LoggerFactory.getLogger(GK309GpsServerHandler.class);
 
-	// 起始位
-	private static final Integer START_BITS = 2;// 起始位长度
-	private static final Byte START_BIT = 0x78; //单字节长度数据包
-	private static final Byte START_BIT2 = 0X79;// 双字节长度数据包
+    // 起始位
+    private static final Integer START_BITS = 2;// 起始位长度
+    private static final Byte START_BIT = 0x78; //单字节长度数据包
+    private static final Byte START_BIT2 = 0X79;// 双字节长度数据包
 
 //    private static final int START_BIT = 120; //单字节长度数据包
 //    private static final int START_BIT2 = 121;// 双字节长度数据包
 
-	// 协议号
-	private static final byte LOGIN_MSG = 0x01;// 登录 (必须回复)
-	private static final byte STATUS_MSG = 0x13;// 心跳,状态信息包  (必须回复)
-	private static final byte LOCATION_MSG = 0x10;// 一般GPS 位置信息上传
-	private static final byte CORRECT_TIME_MSG = 0x1F;// 校验时间
+    // 协议号
+    private static final byte LOGIN_MSG = 0x01;// 登录 (必须回复)
+    private static final byte STATUS_MSG = 0x13;// 心跳,状态信息包  (必须回复)
+    private static final byte LOCATION_MSG = 0x10;// 一般GPS 位置信息上传
+    private static final byte CORRECT_TIME_MSG = 0x1F;// 校验时间
 
-	private static final Integer DATA_SIZE = 6;
+    private static final Integer DATA_SIZE = 6;
 
-	@Override
-	protected void handle(ByteBuf in, Channel channel) throws Exception {
-		if (in.isReadable()) {
-			in.markReaderIndex();
-			try {
-				byte b = 0;
-				int index = 0;
-				while (index < START_BITS) {
-					b = in.readByte();
-					if (START_BIT2 != b && START_BIT != b) {
-						channel.close();
-					}
-					index++;
-				}
-				/*  单字节长度或者双字节长度 */
-				int length = 0;
-				if (START_BIT == b) {
-					length = in.readByte() & 0xff; // 一个字节
-				} else {
-					length = in.readShort() & 0xffff;// 两个字节
-				}
-				handle(in, length, channel);
-			} catch (Exception e) {
-				logger.error(e.getMessage(), e);
-			}
-		}
-	}
+    @Override
+    protected void handle(ByteBuf in, Channel channel) throws Exception {
+        if (in.isReadable()) {
+            in.markReaderIndex();
+            try {
+                byte b = 0;
+                int index = 0;
+                while (index < START_BITS) {
+                    b = in.readByte();
+                    if (START_BIT2 != b && START_BIT != b) {
+                        channel.close();
+                    }
+                    index++;
+                }
+                /*  单字节长度或者双字节长度 */
+                int length = 0;
+                if (START_BIT == b) {
+                    length = in.readByte() & 0xff; // 一个字节
+                } else {
+                    length = in.readShort() & 0xffff;// 两个字节
+                }
+                handle(in, length, channel);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            } finally {
+                in.release();
+            }
+        }
+    }
 
-	private void handle(ByteBuf in, int length, Channel channel) throws Exception {
-		if (in.isReadable()) {
-			byte msgType = in.readByte();// 协议号 1个字节
-			String deviceId = channelDeviceMap.get(channel);
-			if (deviceId != null) {
-				MDC.put(MDC_DEVICEID, deviceId);
-			} else {
-				if (LOGIN_MSG != msgType) {
-					channel.close();
-				}
-			}
-			if (LOGIN_MSG == msgType) {
-				resolveLoginMSG(in, channel);
-			} else if (STATUS_MSG == msgType) {
-				reply(channel, STATUS_MSG, "心跳包");
-				byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
-				sendMsg2Kafka(bytes, deviceId, channel);// 心跳数据对接
-			} else if (LOCATION_MSG == msgType) {
-				logger.info("GPS 定位包");
-				// resolveLocationMSG(in,deviceId);
-				byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
-				sendMsg2Kafka(bytes, deviceId, channel);
-			} else if (CORRECT_TIME_MSG == msgType) {
-				reply(channel, CORRECT_TIME_MSG, "校时包");
-			} else {
-				logger.info("client send data without handle type ...");
-			}
-		}
+    private void handle(ByteBuf in, int length, Channel channel) throws Exception {
+        if (in.isReadable()) {
+            byte msgType = in.readByte();// 协议号 1个字节
+            String deviceId = channelDeviceMap.get(channel);
+            if (deviceId != null) {
+                MDC.put(MDC_DEVICEID, deviceId);
+            } else {
+                if (LOGIN_MSG != msgType) {
+                    channel.close();
+                }
+            }
+            if (LOGIN_MSG == msgType) {
+                resolveLoginMSG(in, channel);
+            } else if (STATUS_MSG == msgType) {
+                reply(channel, STATUS_MSG, "心跳包");
+                byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
+                sendMsg2Kafka(bytes, deviceId, channel);// 心跳数据对接
+            } else if (LOCATION_MSG == msgType) {
+                logger.info("GPS 定位包");
+                // resolveLocationMSG(in,deviceId);
+                byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
+                sendMsg2Kafka(bytes, deviceId, channel);
+            } else if (CORRECT_TIME_MSG == msgType) {
+                reply(channel, CORRECT_TIME_MSG, "校时包");
+            } else {
+                logger.info("client send data without handle type ...");
+            }
+        }
 
-	}
+    }
 
-	/**
-	 * 登录管理
-	 *
-	 * @param in
-	 * @param channel
-	 */
-	private void resolveLoginMSG(ByteBuf in, Channel channel) {
-		/**  终端ID|类型识别码|扩展位*/
-		byte[] deviceIdBytes = new byte[8];// 终端ID [8位]
-		in.readBytes(deviceIdBytes);
-		String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
-		String deviceIdInMap = channelDeviceMap.get(channel);
-		MDC.put(MDC_DEVICEID, deviceId);
-		if (!deviceId.equals(deviceIdInMap)) {
-			manageChannel(channel, deviceId);
-		}
-		reply(channel, LOGIN_MSG, "登录包");
-	}
+    /**
+     * 登录管理
+     *
+     * @param in
+     * @param channel
+     */
+    private void resolveLoginMSG(ByteBuf in, Channel channel) {
+        /**  终端ID|类型识别码|扩展位*/
+        byte[] deviceIdBytes = new byte[8];// 终端ID [8位]
+        in.readBytes(deviceIdBytes);
+        String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
+        String deviceIdInMap = channelDeviceMap.get(channel);
+        MDC.put(MDC_DEVICEID, deviceId);
+        if (!deviceId.equals(deviceIdInMap)) {
+            manageChannel(channel, deviceId);
+        }
+        reply(channel, LOGIN_MSG, "登录包");
+    }
 
-	public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
-		byte[] byte_3 = new byte[byte_1.length + byte_2.length];
-		System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
-		System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
-		return byte_3;
-	}
+    public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
+        byte[] byte_3 = new byte[byte_1.length + byte_2.length];
+        System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
+        System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
+        return byte_3;
+    }
 
 
-	/**
-	 * 回复包
-	 *
-	 * @param channel
-	 * @param msgType
-	 */
-	private void reply(Channel channel, byte msgType, String msg) {
+    /**
+     * 回复包
+     *
+     * @param channel
+     * @param msgType
+     */
+    private void reply(Channel channel, byte msgType, String msg) {
 
-		/**
-		 * 起始位[]长度[] 协议号[]	序列号[]CRC校验[]停止位[2]
-		 */
-		ByteBuf buffer = Unpooled.buffer();
-		byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05}; // CRC校验数据
-		int doCrc = CRCUtil.do_crc(65535, crcBytes);//CRC-ITU
-		byte[] intToByte = NumUtil.intToByte(doCrc, 2);
-		byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
-		buffer.writeBytes(bytes);
-		ChannelFuture channelFuture = channel.write(buffer);
-		channelFuture.addListener(future -> {
-			String deviceId = channelDeviceMap.get(channel);
-			logger.info("server reply [{}] to client device [{}] success:[{}] ", msg, deviceId, DatatypeConverter.printHexBinary(bytes));
-		});
-	}
+        /**
+         * 起始位[]长度[] 协议号[]	序列号[]CRC校验[]停止位[2]
+         */
+        ByteBuf buffer = Unpooled.buffer();
+        byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05}; // CRC校验数据
+        int doCrc = CRCUtil.do_crc(65535, crcBytes);//CRC-ITU
+        byte[] intToByte = NumUtil.intToByte(doCrc, 2);
+        byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
+        buffer.writeBytes(bytes);
+        ChannelFuture channelFuture = channel.write(buffer);
+        channelFuture.addListener(future -> {
+            String deviceId = channelDeviceMap.get(channel);
+            logger.info("server reply [{}] to client device [{}] success:[{}] ", msg, deviceId, DatatypeConverter.printHexBinary(bytes));
+        });
+    }
 
-	/**
-	 * 原始数据重新组装
-	 *
-	 * @param in
-	 * @param deviceId
-	 * @return
-	 */
-	private byte[] getOriginalData(ByteBuf in, String deviceId) {
-		if (StringUtils.isNotBlank(deviceId)) {
-			in.resetReaderIndex();
-			byte[] deviceArr = deviceId.getBytes();
-			int length = in.readableBytes();
-			byte[] dataByteArray = new byte[length + deviceArr.length];
-			in.readBytes(dataByteArray, 0, in.readableBytes());
-			System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
-			return dataByteArray;
-		} else
-			return null;
-	}
+    /**
+     * 原始数据重新组装
+     *
+     * @param in
+     * @param deviceId
+     * @return
+     */
+    private byte[] getOriginalData(ByteBuf in, String deviceId) {
+        if (StringUtils.isNotBlank(deviceId)) {
+            in.resetReaderIndex();
+            byte[] deviceArr = deviceId.getBytes();
+            int length = in.readableBytes();
+            byte[] dataByteArray = new byte[length + deviceArr.length];
+            in.readBytes(dataByteArray, 0, in.readableBytes());
+            System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
+            return dataByteArray;
+        } else
+            return null;
+    }
 
-	private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
-		YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
-		StringBuffer dateTimeStrBuf = new StringBuffer();
-		int indexOfDateTime = 0;
-		while (indexOfDateTime < DATA_SIZE) {
-			byte b = in.readByte();
-			dateTimeStrBuf.append(NumUtil.byte2String(b));
-			indexOfDateTime++;
-		}
-		// 日期
-		yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
-		// gps信息卫星数
-		yiTongGPSDevice.setGpsCount(in.readByte());
-		// 维度
-		yiTongGPSDevice.setLat(in.readInt());
-		// 经度
-		yiTongGPSDevice.setLng(in.readInt());
-		// 速度
-		yiTongGPSDevice.setSpeedbyte(in.readByte());
-		// 航向
-		yiTongGPSDevice.setCourseStatus(in.readShort());
-		// 国家代号
-		yiTongGPSDevice.setMcc(in.readShort());
-		// 移动网号码
-		yiTongGPSDevice.setMnc(in.readByte());
-		// 位置区码
-		yiTongGPSDevice.setLac(in.readShort());
-		// 移动基站Cell Tower ID
-		yiTongGPSDevice.setCellId(in.readMedium());
-		yiTongGPSDevice.setAcc(in.readByte());
-		// 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
-		yiTongGPSDevice.setReportModel(in.readByte());
-		// 0x01:实时 0x00:补传
-		yiTongGPSDevice.setIsmendMsg(in.readByte());
-		double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
-		// 里程设备默认是关闭的,需要指令,设备端才发送
-		yiTongGPSDevice.setDeviceId(deviceId);
-		yiTongGPSDevice.setMileage(mileage);
-		yiTongGPSDevice.setDataType(1);
-		// 写文件操作
-		String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-		FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
-	}
+    private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
+        YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
+        StringBuffer dateTimeStrBuf = new StringBuffer();
+        int indexOfDateTime = 0;
+        while (indexOfDateTime < DATA_SIZE) {
+            byte b = in.readByte();
+            dateTimeStrBuf.append(NumUtil.byte2String(b));
+            indexOfDateTime++;
+        }
+        // 日期
+        yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
+        // gps信息卫星数
+        yiTongGPSDevice.setGpsCount(in.readByte());
+        // 维度
+        yiTongGPSDevice.setLat(in.readInt());
+        // 经度
+        yiTongGPSDevice.setLng(in.readInt());
+        // 速度
+        yiTongGPSDevice.setSpeedbyte(in.readByte());
+        // 航向
+        yiTongGPSDevice.setCourseStatus(in.readShort());
+        // 国家代号
+        yiTongGPSDevice.setMcc(in.readShort());
+        // 移动网号码
+        yiTongGPSDevice.setMnc(in.readByte());
+        // 位置区码
+        yiTongGPSDevice.setLac(in.readShort());
+        // 移动基站Cell Tower ID
+        yiTongGPSDevice.setCellId(in.readMedium());
+        yiTongGPSDevice.setAcc(in.readByte());
+        // 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
+        yiTongGPSDevice.setReportModel(in.readByte());
+        // 0x01:实时 0x00:补传
+        yiTongGPSDevice.setIsmendMsg(in.readByte());
+        double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
+        // 里程设备默认是关闭的,需要指令,设备端才发送
+        yiTongGPSDevice.setDeviceId(deviceId);
+        yiTongGPSDevice.setMileage(mileage);
+        yiTongGPSDevice.setDataType(1);
+        // 写文件操作
+        String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
+        FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
+    }
 
-	/**
-	 * 按照开始,结束标记位读取数据
-	 */
-	@Override
-	public void startAcceptor() {
-		EventLoopGroup bossGroup = new NioEventLoopGroup();
-		EventLoopGroup workerGroup = new NioEventLoopGroup();
-		byte[] tailSplitBytes = new byte[]{0x0D, 0x0A};
-		byte[] headerSplitBytes = new byte[]{0x78, 0x78};
-		byte[] headerSplitBytes1 = new byte[]{0x79, 0x79};
-		try {
-			ServerBootstrap b = new ServerBootstrap();
-			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-					.childHandler(new ChannelInitializer<SocketChannel>() {
-						@Override
-						protected void initChannel(SocketChannel ch) throws Exception {
-							ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
-									Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
-									Unpooled.copiedBuffer(headerSplitBytes1)));
-							ch.pipeline().addLast(GK309GpsServerHandler.this);
-						}
-					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-			ChannelFuture f = b.bind(port).sync();
-			f.channel().closeFuture().sync();
-			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
-					this.getPort());
-		} catch (InterruptedException e) {
-			logger.error(e.getMessage());
-		} finally {
-			cleanRedisLinkData();
-			workerGroup.shutdownGracefully();
-			bossGroup.shutdownGracefully();
-		}
-	}
+    /**
+     * 按照开始,结束标记位读取数据
+     */
+    @Override
+    public void startAcceptor() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        byte[] tailSplitBytes = new byte[]{0x0D, 0x0A};
+        byte[] headerSplitBytes = new byte[]{0x78, 0x78};
+        byte[] headerSplitBytes1 = new byte[]{0x79, 0x79};
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
+                                    Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
+                                    Unpooled.copiedBuffer(headerSplitBytes1)));
+                            ch.pipeline().addLast(GK309GpsServerHandler.this);
+                        }
+                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+            ChannelFuture f = b.bind(port).sync();
+            f.channel().closeFuture().sync();
+            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+                    this.getPort());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            cleanRedisLinkData();
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
 
 }