123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package com.tidecloud.dataacceptance.service.impl;
- import java.util.Date;
- import javax.xml.bind.DatatypeConverter;
- import org.apache.commons.lang.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder;
- import com.tidecloud.dataacceptance.common.CRCUtil;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.common.NumUtil;
- import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
- import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import com.tidecloud.dataacceptance.util.FileUtils;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.util.concurrent.Future;
- import io.netty.util.concurrent.GenericFutureListener;
- /**
- * @author cdk
- */
- @Component
- @ChannelHandler.Sharable
- @Scope("prototype")
- public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
- private static final Integer START_BITS = 2;
- private static final Byte START_BIT = 0x78;
- private static final Byte START_BIT2 = 0X79;
- private static final byte LOGIN_MSG = 0x01;
- private static final byte LOCATION_MSG = 0x22;
- private static final byte STATUS_MSG = 0x13;
- private static final byte WARNING_MSG = 0x26;
- private static final byte CORRECT_TIME_MSG = (byte) 0x8A;
- private static final byte VOLTAGE_MSG = (byte) 0x94;
- private static final byte VOLTAGE_SUB_MSG = (byte) 0x00;
- private static final byte COMMAND_COPY_MSG = 0x15;
- private static final Integer DATA_SIZE = 6;
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- if (in.isReadable()) {
- in.markReaderIndex();
- try {
- int index = 0;
- byte b = 0;
- while (index < START_BITS) {
- b = in.readByte();
- if (START_BIT != b && START_BIT2 != b) {
- channel.close();
- }
- index++;
- }
- int length = 0;
- if (START_BIT == b) {
- length = in.readByte() & 0xff;
- } else {
- length = in.readShort() & 0xffff;
- }
- handle(in, length, channel);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- private byte[] getOriginalData(ByteBuf in, String deviceId) {
- if (StringUtils.isNotBlank(deviceId)) {
- in.resetReaderIndex();
- byte[] deviceArr = deviceId.getBytes();
- int length = in.readableBytes();
- byte[] dataByteArray = new byte[length + deviceArr.length];
- in.readBytes(dataByteArray,0,in.readableBytes());
- System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
- return dataByteArray;
- } else
- return null;
- }
- private void handle(ByteBuf in, int length, Channel channel) throws Exception {
- if (in.isReadable()) {
- String deviceId = channelDeviceMap.get(channel);
- if (deviceId!=null) {
- MDC.put(MDC_DEVICEID, deviceId);
- }
- byte msgType = in.readByte();
- if (LOGIN_MSG == msgType) {
- resolveLoginMSG(in, channel);
- } else if (LOCATION_MSG == msgType) {
- logger.info("GPS 定位包");
- sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
- // resolveLocationMSG(in, deviceId);
- } else if (STATUS_MSG == msgType) {
-
- reply(channel, STATUS_MSG);
- } else if (WARNING_MSG == msgType) {
- logger.info("报警数据(UTC)");
- reply(channel, WARNING_MSG);
- sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
- } else if (CORRECT_TIME_MSG == msgType) {
-
- reply(channel, CORRECT_TIME_MSG);
- } else if (VOLTAGE_MSG == msgType) {
- logger.info("信息传输通用包");
- sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
- // resolveVoltageMSG(in, channel);
- } else if (COMMAND_COPY_MSG == msgType) {
- // resolveCommandCopyMSG(in, channel, length);
- } else {
- logger.info("client send data without handle type ...");
- }
- }
- }
- private void resolveLoginMSG(ByteBuf in, Channel channel) {
- byte[] deviceIdBytes = new byte[8];
- in.readBytes(deviceIdBytes);
- String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
- MDC.put(MDC_DEVICEID, deviceId);
- // 回复和链接管理
- String deviceIdInMap = channelDeviceMap.get(channel);
- if (!deviceId.equals(deviceIdInMap)) {
- manageChannel(channel, deviceId);
- }
- reply(channel, LOGIN_MSG);
- }
- private void reply(Channel channel, byte msgType) {
-
- ByteBuf buffer = Unpooled.buffer();
- byte[] crcBytes = new byte[] { 0x05, msgType, 0x00, 0x05 };
- int doCrc = CRCUtil.do_crc(65535, crcBytes);
- byte[] intToByte = NumUtil.intToByte(doCrc, 2);
- byte[] bytes = new byte[] { 0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A };
- buffer.writeBytes(bytes);
- ChannelFuture channelFuture = channel.write(buffer);
- channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- String deviceId = channelDeviceMap.get(channel);
- String type = "";
- switch (msgType) {
- case STATUS_MSG:
- type = "心跳包";
- break;
- case WARNING_MSG:
- type = "报警数据(UTC)";
- break;
- case CORRECT_TIME_MSG:
- type = "校时包";
- break;
- case LOGIN_MSG:
- type = "登录包";
- break;
- default:
-
- break;
- }
- logger.info("server reply [{}] to client device [{}] success:[{}] ",type,deviceId, DatatypeConverter.printHexBinary(bytes));
- }
- });
- }
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- byte[] headerSplitBytes = new byte[] { 0x78, 0x78 };
- byte[] headerSplitBytes1 = new byte[] { 0x79, 0x79 };
- byte[] tailSplitBytes = new byte[] { 0x0D, 0x0A };
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
- Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
- Unpooled.copiedBuffer(headerSplitBytes1)));
- ch.pipeline().addLast(YiTongGpsServerHandler.this);
- }
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture f = b.bind(port).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
|