Ver Fonte

康凯斯接口调整

jianghouwei há 6 anos atrás
pai
commit
0e5a3cb7e0

+ 213 - 201
src/main/java/com/tidecloud/dataacceptance/service/impl/GK309GpsServerHandler.java

@@ -29,228 +29,240 @@ import java.util.Date;
 
 /**
  * @author jhw
- * GK309 卡牌数据
+ * GK309 卡牌数据 康凯斯
  */
 @Component
 @ChannelHandler.Sharable
 @Scope("prototype")
 public class GK309GpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-    /**
-     * 协议信息数据
-     * [ 起始位|包长度|协议号|信息内容|信息序列号|错误校验|停止位 ]
-     */
-    private static final Logger logger = LoggerFactory.getLogger(GK309GpsServerHandler.class);
+	/**
+	 * 协议信息数据
+	 * [ 起始位|包长度|协议号|信息内容|信息序列号|错误校验|停止位 ]
+	 */
+	private static final Logger logger = LoggerFactory.getLogger(GK309GpsServerHandler.class);
 
-    // 起始位
-    private static final Integer START_BITS = 2;// 起始位长度
-    private static final Byte START_BIT = 0x78; //单字节长度数据包
-    private static final Byte START_BIT2 = 0X79;// 双字节长度数据包
+	// 起始位
+	private static final Integer START_BITS = 2;// 起始位长度
+	private static final Byte START_BIT = 0x78; //单字节长度数据包
+	private static final Byte START_BIT2 = 0X79;// 双字节长度数据包
 
 //    private static final int START_BIT = 120; //单字节长度数据包
 //    private static final int START_BIT2 = 121;// 双字节长度数据包
 
-    // 协议号
-    private static final byte LOGIN_MSG = 0x01;// 登录 (必须回复)
-    private static final byte STATUS_MSG = 0x13;// 心跳,状态信息包  (必须回复)
-    private static final byte LOCATION_MSG = 0x10;// 一般GPS 位置信息上传
-    private static final byte CORRECT_TIME_MSG = 0x1F;// 校验时间
+	// 协议号
+	private static final byte LOGIN_MSG = 0x01;// 登录 (必须回复)
+	private static final byte STATUS_MSG = 0x13;// 心跳,状态信息包  (必须回复)
+	private static final byte LOCATION_MSG = 0x10;// 一般GPS 位置信息上传
+	private static final byte CORRECT_TIME_MSG = 0x1F;// 校验时间
 
-    private static final Integer DATA_SIZE = 6;
+	private static final Integer DATA_SIZE = 6;
 
-    @Override
-    protected void handle(ByteBuf in, Channel channel) throws Exception {
-        if (in.isReadable()) {
-            in.markReaderIndex();
-            try {
-                byte b = 0;
-                int index = 0;
-                while (index < START_BITS) {
-                    b = in.readByte();
-                    if (START_BIT2 != b && START_BIT != b) {
-                        channel.close();
-                    }
-                    index++;
-                }
-                /*  单字节长度或者双字节长度 */
-                int length = 0;
-                if (START_BIT == b) {
-                    length = in.readByte() & 0xff; // 一个字节
-                } else  {
-                    length = in.readShort() & 0xffff;// 两个字节
-                }
-                handle(in, length, channel);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		if (in.isReadable()) {
+			in.markReaderIndex();
+			try {
+				byte b = 0;
+				int index = 0;
+				while (index < START_BITS) {
+					b = in.readByte();
+					if (START_BIT2 != b && START_BIT != b) {
+						channel.close();
+					}
+					index++;
+				}
+				/*  单字节长度或者双字节长度 */
+				int length = 0;
+				if (START_BIT == b) {
+					length = in.readByte() & 0xff; // 一个字节
+				} else {
+					length = in.readShort() & 0xffff;// 两个字节
+				}
+				handle(in, length, channel);
+			} catch (Exception e) {
+				logger.error(e.getMessage(), e);
+			}
+		}
+	}
 
-    private void handle(ByteBuf in, int length, Channel channel) throws Exception {
-        if (in.isReadable()) {
-            String deviceId = channelDeviceMap.get(channel);
-            if (deviceId != null) {
-                MDC.put(MDC_DEVICEID, deviceId);
-            }
-            byte msgType = in.readByte();// 协议号 1个字节
-            if (LOGIN_MSG == msgType) {
-                resolveLoginMSG(in, channel);
-            } else if (STATUS_MSG == msgType) {
-                reply(channel, STATUS_MSG, "心跳包");
-            } else if (LOCATION_MSG == msgType) {
-                logger.info("GPS 定位包");
-               // resolveLocationMSG(in,deviceId);
-                sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
-            } else if (CORRECT_TIME_MSG == msgType) {
-                reply(channel, CORRECT_TIME_MSG, "校时包");
-            } else {
-                logger.info("client send data without handle type ...");
-            }
-        }
+	private void handle(ByteBuf in, int length, Channel channel) throws Exception {
+		if (in.isReadable()) {
+			String deviceId = channelDeviceMap.get(channel);
+			if (deviceId != null) {
+				MDC.put(MDC_DEVICEID, deviceId);
+			}else{
+				channel.close();
+			}
+			byte msgType = in.readByte();// 协议号 1个字节
+			if (LOGIN_MSG == msgType) {
+				resolveLoginMSG(in, channel);
+			} else if (STATUS_MSG == msgType) {
+				reply(channel, STATUS_MSG, "心跳包");
+				byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
+				sendMsg2Kafka(bytes, deviceId, channel);// 心跳数据对接
+			} else if (LOCATION_MSG == msgType) {
+				logger.info("GPS 定位包");
+				// resolveLocationMSG(in,deviceId);
+				byte[] bytes = byteMerger(getOriginalData(in, deviceId), DateUtil.formatDate2String(new Date()).getBytes());
+				sendMsg2Kafka(bytes, deviceId, channel);
+			} else if (CORRECT_TIME_MSG == msgType) {
+				reply(channel, CORRECT_TIME_MSG, "校时包");
+			} else {
+				logger.info("client send data without handle type ...");
+			}
+		}
 
-    }
+	}
 
-    /**
-     * 登录管理
-     *
-     * @param in
-     * @param channel
-     */
-    private void resolveLoginMSG(ByteBuf in, Channel channel) {
-        /**  终端ID|类型识别码|扩展位*/
-        byte[] deviceIdBytes = new byte[8];// 终端ID [8位]
-        in.readBytes(deviceIdBytes);
-        String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
-        String deviceIdInMap = channelDeviceMap.get(channel);
-        MDC.put(MDC_DEVICEID, deviceId);
-        if (!deviceId.equals(deviceIdInMap)) {
-            manageChannel(channel, deviceId);
-        }
-        reply(channel, LOGIN_MSG, "登录包");
-    }
+	/**
+	 * 登录管理
+	 *
+	 * @param in
+	 * @param channel
+	 */
+	private void resolveLoginMSG(ByteBuf in, Channel channel) {
+		/**  终端ID|类型识别码|扩展位*/
+		byte[] deviceIdBytes = new byte[8];// 终端ID [8位]
+		in.readBytes(deviceIdBytes);
+		String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
+		String deviceIdInMap = channelDeviceMap.get(channel);
+		MDC.put(MDC_DEVICEID, deviceId);
+		if (!deviceId.equals(deviceIdInMap)) {
+			manageChannel(channel, deviceId);
+		}
+		reply(channel, LOGIN_MSG, "登录包");
+	}
 
+	public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
+		byte[] byte_3 = new byte[byte_1.length + byte_2.length];
+		System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
+		System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
+		return byte_3;
+	}
 
-    /**
-     * 回复包
-     *
-     * @param channel
-     * @param msgType
-     */
-    private void reply(Channel channel, byte msgType, String msg) {
 
-        /**
-         * 起始位[]长度[] 协议号[]	序列号[]CRC校验[]停止位[2]
-         */
-        ByteBuf buffer = Unpooled.buffer();
-        byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05}; // CRC校验数据
-        int doCrc = CRCUtil.do_crc(65535, crcBytes);//CRC-ITU
-        byte[] intToByte = NumUtil.intToByte(doCrc, 2);
-        byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
-        buffer.writeBytes(bytes);
-        ChannelFuture channelFuture = channel.write(buffer);
-        channelFuture.addListener(future -> {
-            String deviceId = channelDeviceMap.get(channel);
-            logger.info("server reply [{}] to client device [{}] success:[{}] ", msg, deviceId, DatatypeConverter.printHexBinary(bytes));
-        });
-    }
+	/**
+	 * 回复包
+	 *
+	 * @param channel
+	 * @param msgType
+	 */
+	private void reply(Channel channel, byte msgType, String msg) {
 
-    /**
-     * 原始数据重新组装
-     *
-     * @param in
-     * @param deviceId
-     * @return
-     */
-    private byte[] getOriginalData(ByteBuf in, String deviceId) {
-        if (StringUtils.isNotBlank(deviceId)) {
-            in.resetReaderIndex();
-            byte[] deviceArr = deviceId.getBytes();
-            int length = in.readableBytes();
-            byte[] dataByteArray = new byte[length + deviceArr.length];
-            in.readBytes(dataByteArray, 0, in.readableBytes());
-            System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
-            return dataByteArray;
-        } else
-            return null;
-    }
+		/**
+		 * 起始位[]长度[] 协议号[]	序列号[]CRC校验[]停止位[2]
+		 */
+		ByteBuf buffer = Unpooled.buffer();
+		byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05}; // CRC校验数据
+		int doCrc = CRCUtil.do_crc(65535, crcBytes);//CRC-ITU
+		byte[] intToByte = NumUtil.intToByte(doCrc, 2);
+		byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
+		buffer.writeBytes(bytes);
+		ChannelFuture channelFuture = channel.write(buffer);
+		channelFuture.addListener(future -> {
+			String deviceId = channelDeviceMap.get(channel);
+			logger.info("server reply [{}] to client device [{}] success:[{}] ", msg, deviceId, DatatypeConverter.printHexBinary(bytes));
+		});
+	}
 
-    private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
-        YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
-        StringBuffer dateTimeStrBuf = new StringBuffer();
-        int indexOfDateTime = 0;
-        while (indexOfDateTime < DATA_SIZE) {
-            byte b = in.readByte();
-            dateTimeStrBuf.append(NumUtil.byte2String(b));
-            indexOfDateTime++;
-        }
-        // 日期
-        yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
-        // gps信息卫星数
-        yiTongGPSDevice.setGpsCount(in.readByte());
-        // 维度
-        yiTongGPSDevice.setLat(in.readInt());
-        // 经度
-        yiTongGPSDevice.setLng(in.readInt());
-        // 速度
-        yiTongGPSDevice.setSpeedbyte(in.readByte());
-        // 航向
-        yiTongGPSDevice.setCourseStatus(in.readShort());
-        // 国家代号
-        yiTongGPSDevice.setMcc(in.readShort());
-        // 移动网号码
-        yiTongGPSDevice.setMnc(in.readByte());
-        // 位置区码
-        yiTongGPSDevice.setLac(in.readShort());
-        // 移动基站Cell Tower ID
-        yiTongGPSDevice.setCellId(in.readMedium());
-        yiTongGPSDevice.setAcc(in.readByte());
-        // 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
-        yiTongGPSDevice.setReportModel(in.readByte());
-        // 0x01:实时 0x00:补传
-        yiTongGPSDevice.setIsmendMsg(in.readByte());
-        double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
-        // 里程设备默认是关闭的,需要指令,设备端才发送
-        yiTongGPSDevice.setDeviceId(deviceId);
-        yiTongGPSDevice.setMileage(mileage);
-        yiTongGPSDevice.setDataType(1);
-        // 写文件操作
-        String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-        FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
-    }
+	/**
+	 * 原始数据重新组装
+	 *
+	 * @param in
+	 * @param deviceId
+	 * @return
+	 */
+	private byte[] getOriginalData(ByteBuf in, String deviceId) {
+		if (StringUtils.isNotBlank(deviceId)) {
+			in.resetReaderIndex();
+			byte[] deviceArr = deviceId.getBytes();
+			int length = in.readableBytes();
+			byte[] dataByteArray = new byte[length + deviceArr.length];
+			in.readBytes(dataByteArray, 0, in.readableBytes());
+			System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
+			return dataByteArray;
+		} else
+			return null;
+	}
 
-    /**
-     * 按照开始,结束标记位读取数据
-     */
-    @Override
-    public void startAcceptor() {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        byte[] tailSplitBytes = new byte[]{0x0D, 0x0A};
-        byte[] headerSplitBytes = new byte[]{0x78, 0x78};
-        byte[] headerSplitBytes1 = new byte[]{0x79, 0x79};
-        try {
-            ServerBootstrap b = new ServerBootstrap();
-            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        protected void initChannel(SocketChannel ch) throws Exception {
-                            ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
-                                    Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
-                                    Unpooled.copiedBuffer(headerSplitBytes1)));
-                            ch.pipeline().addLast(GK309GpsServerHandler.this);
-                        }
-                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-            ChannelFuture f = b.bind(port).sync();
-            f.channel().closeFuture().sync();
-            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
-                    this.getPort());
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
-        } finally {
-            cleanRedisLinkData();
-            workerGroup.shutdownGracefully();
-            bossGroup.shutdownGracefully();
-        }
-    }
+	private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
+		YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
+		StringBuffer dateTimeStrBuf = new StringBuffer();
+		int indexOfDateTime = 0;
+		while (indexOfDateTime < DATA_SIZE) {
+			byte b = in.readByte();
+			dateTimeStrBuf.append(NumUtil.byte2String(b));
+			indexOfDateTime++;
+		}
+		// 日期
+		yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
+		// gps信息卫星数
+		yiTongGPSDevice.setGpsCount(in.readByte());
+		// 维度
+		yiTongGPSDevice.setLat(in.readInt());
+		// 经度
+		yiTongGPSDevice.setLng(in.readInt());
+		// 速度
+		yiTongGPSDevice.setSpeedbyte(in.readByte());
+		// 航向
+		yiTongGPSDevice.setCourseStatus(in.readShort());
+		// 国家代号
+		yiTongGPSDevice.setMcc(in.readShort());
+		// 移动网号码
+		yiTongGPSDevice.setMnc(in.readByte());
+		// 位置区码
+		yiTongGPSDevice.setLac(in.readShort());
+		// 移动基站Cell Tower ID
+		yiTongGPSDevice.setCellId(in.readMedium());
+		yiTongGPSDevice.setAcc(in.readByte());
+		// 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
+		yiTongGPSDevice.setReportModel(in.readByte());
+		// 0x01:实时 0x00:补传
+		yiTongGPSDevice.setIsmendMsg(in.readByte());
+		double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
+		// 里程设备默认是关闭的,需要指令,设备端才发送
+		yiTongGPSDevice.setDeviceId(deviceId);
+		yiTongGPSDevice.setMileage(mileage);
+		yiTongGPSDevice.setDataType(1);
+		// 写文件操作
+		String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
+		FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
+	}
+
+	/**
+	 * 按照开始,结束标记位读取数据
+	 */
+	@Override
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		byte[] tailSplitBytes = new byte[]{0x0D, 0x0A};
+		byte[] headerSplitBytes = new byte[]{0x78, 0x78};
+		byte[] headerSplitBytes1 = new byte[]{0x79, 0x79};
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
+									Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
+									Unpooled.copiedBuffer(headerSplitBytes1)));
+							ch.pipeline().addLast(GK309GpsServerHandler.this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+			ChannelFuture f = b.bind(port).sync();
+			f.channel().closeFuture().sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+		} catch (InterruptedException e) {
+			logger.error(e.getMessage());
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
 
 }