package com.tidecloud.dataacceptance.service.impl; import java.util.Date; import javax.xml.bind.DatatypeConverter; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder; import com.tidecloud.dataacceptance.common.CRCUtil; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.common.NumUtil; import com.tidecloud.dataacceptance.entity.YiTongGPSDevice; import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import com.tidecloud.dataacceptance.util.FileUtils; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * @author cdk */ @Component @ChannelHandler.Sharable @Scope("prototype") public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class); private static final Integer START_BITS = 2; private static final Byte START_BIT = 0x78; private static final Byte START_BIT2 = 0X79; private static final byte LOGIN_MSG = 0x01; private static final byte LOCATION_MSG = 0x22; private static final byte STATUS_MSG = 0x13; private static final byte WARNING_MSG = 0x26; private static final byte CORRECT_TIME_MSG = (byte) 0x8A; private static final byte VOLTAGE_MSG = (byte) 0x94; private static final byte VOLTAGE_SUB_MSG = (byte) 0x00; private static final byte COMMAND_COPY_MSG = 0x15; private static final Integer DATA_SIZE = 6; @Override protected void handle(ByteBuf in, Channel channel) throws Exception { if (in.isReadable()) { in.markReaderIndex(); try { int index = 0; byte b = 0; while (index < START_BITS) { b = in.readByte(); if (START_BIT != b && START_BIT2 != b) { channel.close(); } index++; } int length = 0; if (START_BIT == b) { length = in.readByte() & 0xff; } else { length = in.readShort() & 0xffff; } handle(in, length, channel); } catch (Exception e) { logger.error(e.getMessage(), e); } } } private byte[] getOriginalData(ByteBuf in, String deviceId) { if (StringUtils.isNotBlank(deviceId)) { in.resetReaderIndex(); byte[] deviceArr = deviceId.getBytes(); int length = in.readableBytes(); byte[] dataByteArray = new byte[length + deviceArr.length]; in.readBytes(dataByteArray,0,in.readableBytes()); System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length); return dataByteArray; } else return null; } private void handle(ByteBuf in, int length, Channel channel) throws Exception { if (in.isReadable()) { String deviceId = channelDeviceMap.get(channel); if (deviceId!=null) { MDC.put(MDC_DEVICEID, deviceId); } byte msgType = in.readByte(); if (LOGIN_MSG == msgType) { resolveLoginMSG(in, channel); } else if (LOCATION_MSG == msgType) { logger.info("GPS 定位包"); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); // resolveLocationMSG(in, deviceId); } else if (STATUS_MSG == msgType) { reply(channel, STATUS_MSG); } else if (WARNING_MSG == msgType) { logger.info("报警数据(UTC)"); reply(channel, WARNING_MSG); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); } else if (CORRECT_TIME_MSG == msgType) { reply(channel, CORRECT_TIME_MSG); } else if (VOLTAGE_MSG == msgType) { logger.info("信息传输通用包"); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); // resolveVoltageMSG(in, channel); } else if (COMMAND_COPY_MSG == msgType) { // resolveCommandCopyMSG(in, channel, length); } else { logger.info("client send data without handle type ..."); } } } private void resolveLoginMSG(ByteBuf in, Channel channel) { byte[] deviceIdBytes = new byte[8]; in.readBytes(deviceIdBytes); String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes); MDC.put(MDC_DEVICEID, deviceId); // 回复和链接管理 String deviceIdInMap = channelDeviceMap.get(channel); if (!deviceId.equals(deviceIdInMap)) { manageChannel(channel, deviceId); } reply(channel, LOGIN_MSG); } private void reply(Channel channel, byte msgType) { ByteBuf buffer = Unpooled.buffer(); byte[] crcBytes = new byte[] { 0x05, msgType, 0x00, 0x05 }; int doCrc = CRCUtil.do_crc(65535, crcBytes); byte[] intToByte = NumUtil.intToByte(doCrc, 2); byte[] bytes = new byte[] { 0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A }; buffer.writeBytes(bytes); ChannelFuture channelFuture = channel.write(buffer); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { String deviceId = channelDeviceMap.get(channel); String type = ""; switch (msgType) { case STATUS_MSG: type = "心跳包"; break; case WARNING_MSG: type = "报警数据(UTC)"; break; case CORRECT_TIME_MSG: type = "校时包"; break; case LOGIN_MSG: type = "登录包"; break; default: break; } logger.info("server reply [{}] to client device [{}] success:[{}] ",type,deviceId, DatatypeConverter.printHexBinary(bytes)); } }); } public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); byte[] headerSplitBytes = new byte[] { 0x78, 0x78 }; byte[] headerSplitBytes1 = new byte[] { 0x79, 0x79 }; byte[] tailSplitBytes = new byte[] { 0x0D, 0x0A }; try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false, Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes), Unpooled.copiedBuffer(headerSplitBytes1))); ch.pipeline().addLast(YiTongGpsServerHandler.this); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error(e.getMessage()); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }