package com.tidecloud.dataacceptance.service.impl; import java.util.Date; import javax.xml.bind.DatatypeConverter; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder; import com.tidecloud.dataacceptance.common.CRCUtil; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.common.NumUtil; import com.tidecloud.dataacceptance.entity.YiTongGPSDevice; import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import com.tidecloud.dataacceptance.util.FileUtils; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * @author cdk */ @Component @ChannelHandler.Sharable @Scope("prototype") public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class); private static final Integer START_BITS = 2; private static final Byte START_BIT = 0x78; private static final Byte START_BIT2 = 0X79; private static final byte LOGIN_MSG = 0x01; private static final byte LOCATION_MSG = 0x22; private static final byte STATUS_MSG = 0x13; private static final byte WARNING_MSG = 0x26; private static final byte CORRECT_TIME_MSG = (byte) 0x8A; private static final byte VOLTAGE_MSG = (byte) 0x94; private static final byte VOLTAGE_SUB_MSG = (byte) 0x00; private static final byte COMMAND_COPY_MSG = 0x15; private static final Integer DATA_SIZE = 6; @Override protected void handle(ByteBuf in, Channel channel) throws Exception { if (in.isReadable()) { in.markReaderIndex(); try { int index = 0; byte b = 0; while (index < START_BITS) { b = in.readByte(); if (START_BIT != b && START_BIT2 != b) { channel.close(); } index++; } int length = 0; if (START_BIT == b) { length = in.readByte() & 0xff; } else { length = in.readShort() & 0xffff; } handle(in, length, channel); } catch (Exception e) { logger.error(e.getMessage(), e); } } } private byte[] getOriginalData(ByteBuf in, String deviceId) { if (StringUtils.isNotBlank(deviceId)) { in.resetReaderIndex(); byte[] deviceArr = deviceId.getBytes(); int length = in.readableBytes(); byte[] dataByteArray = new byte[length + deviceArr.length]; in.readBytes(dataByteArray,0,in.readableBytes()); System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length); return dataByteArray; } else return null; } private void handle(ByteBuf in, int length, Channel channel) throws Exception { if (in.isReadable()) { String deviceId = channelDeviceMap.get(channel); if (deviceId!=null) { MDC.put(MDC_DEVICEID, deviceId); } byte msgType = in.readByte(); if (LOGIN_MSG == msgType) { resolveLoginMSG(in, channel); } else if (LOCATION_MSG == msgType) { logger.info("GPS 定位包"); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); // resolveLocationMSG(in, deviceId); } else if (STATUS_MSG == msgType) { reply(channel, STATUS_MSG); } else if (WARNING_MSG == msgType) { logger.info("报警数据(UTC)"); reply(channel, WARNING_MSG); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); } else if (CORRECT_TIME_MSG == msgType) { reply(channel, CORRECT_TIME_MSG); } else if (VOLTAGE_MSG == msgType) { logger.info("信息传输通用包"); sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel); // resolveVoltageMSG(in, channel); } else if (COMMAND_COPY_MSG == msgType) { // resolveCommandCopyMSG(in, channel, length); } else { logger.info("client send data without handle type ..."); } } } private void resolveCommandCopyMSG(ByteBuf in, Channel channel, Integer length) { byte readByte = in.readByte(); int checkCode = in.readInt(); if (checkCode != 0) { logger.error("illegal checkcode [{}]", checkCode); } else { // byte codingType = in.readByte(); byte[] coypByes = new byte[length - 14]; in.readBytes(coypByes); String copyStr = new String(coypByes); logger.info("设备指令回复 [{}]", copyStr); } } private void resolveWarningMsg(ByteBuf in, String deviceId) { YiTongGpsForWarnDevice yiTongGPSDevice = new YiTongGpsForWarnDevice(); StringBuffer dateTimeStrBuf = new StringBuffer(); int indexOfDateTime = 0; while (indexOfDateTime < DATA_SIZE) { byte b = in.readByte(); dateTimeStrBuf.append(NumUtil.byte2String(b)); indexOfDateTime++; } // 日期 yiTongGPSDevice.setDate(dateTimeStrBuf.toString()); // gps信息卫星数 yiTongGPSDevice.setGpsCount(in.readByte()); // 维度 yiTongGPSDevice.setLat(in.readInt()); // 经度 yiTongGPSDevice.setLng(in.readInt()); // 速度 // yiTongGPSDevice.setSpeedbyte(in.readByte()); // 航向 yiTongGPSDevice.setCourseStatus(in.readShort()); in.readByte(); // 国家代号 yiTongGPSDevice.setMcc(in.readShort()); // 移动网号码 yiTongGPSDevice.setMnc(in.readByte()); // 位置区码 yiTongGPSDevice.setLac(in.readShort()); in.readMedium(); yiTongGPSDevice.setTerminalMsg((int) in.readByte()); yiTongGPSDevice.setElectric((int) in.readByte()); yiTongGPSDevice.setGmsSign((int) in.readByte()); yiTongGPSDevice.setWarningReason((int) in.readByte()); in.readByte(); yiTongGPSDevice.setDeviceId(deviceId); // 写文件操作 String deviceStr = yiTongGPSDevice.buildDeviceStr(); FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName); } private void resolveVoltageMSG(ByteBuf in, Channel channel) { String deviceId = channelDeviceMap.get(channel); byte subMsgType = in.readByte(); if (VOLTAGE_SUB_MSG == subMsgType) { short voltage = in.readShort(); Double voltageDouble = NumUtil.toFixed2Place((double) voltage); String date = DateUtil.formatDate2String(DateUtil.calculateByHour(new Date(), -8)); YiTongGPSDevice yiTongGPSDevice = buildYiTongGpsDevcie(voltageDouble, deviceId, date); String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice); FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName); } } private YiTongGPSDevice buildYiTongGpsDevcie(Double voltageDouble, String deviceId, String date) { return new YiTongGPSDevice(deviceId, date, null, null, null, null, null, null, null, null, null, null, null, null, null, null, voltageDouble, 0); } private void resolveLoginMSG(ByteBuf in, Channel channel) { byte[] deviceIdBytes = new byte[8]; in.readBytes(deviceIdBytes); String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes); MDC.put(MDC_DEVICEID, deviceId); // 回复和链接管理 String deviceIdInMap = channelDeviceMap.get(channel); if (!deviceId.equals(deviceIdInMap)) { manageChannel(channel, deviceId); } reply(channel, LOGIN_MSG); } private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception { YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice(); StringBuffer dateTimeStrBuf = new StringBuffer(); int indexOfDateTime = 0; while (indexOfDateTime < DATA_SIZE) { byte b = in.readByte(); dateTimeStrBuf.append(NumUtil.byte2String(b)); indexOfDateTime++; } // 日期 yiTongGPSDevice.setDate(dateTimeStrBuf.toString()); // gps信息卫星数 yiTongGPSDevice.setGpsCount(in.readByte()); // 维度 yiTongGPSDevice.setLat(in.readInt()); // 经度 yiTongGPSDevice.setLng(in.readInt()); // 速度 yiTongGPSDevice.setSpeedbyte(in.readByte()); // 航向 yiTongGPSDevice.setCourseStatus(in.readShort()); // 国家代号 yiTongGPSDevice.setMcc(in.readShort()); // 移动网号码 yiTongGPSDevice.setMnc(in.readByte()); // 位置区码 yiTongGPSDevice.setLac(in.readShort()); // 移动基站Cell Tower ID yiTongGPSDevice.setCellId(in.readMedium()); yiTongGPSDevice.setAcc(in.readByte()); // 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息 yiTongGPSDevice.setReportModel(in.readByte()); // 0x01:实时 0x00:补传 yiTongGPSDevice.setIsmendMsg(in.readByte()); double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000); // 里程设备默认是关闭的,需要指令,设备端才发送 yiTongGPSDevice.setDeviceId(deviceId); yiTongGPSDevice.setMileage(mileage); yiTongGPSDevice.setDataType(1); // 写文件操作 String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice); FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName); } private void reply(Channel channel, byte msgType) { ByteBuf buffer = Unpooled.buffer(); byte[] crcBytes = new byte[] { 0x05, msgType, 0x00, 0x05 }; int doCrc = CRCUtil.do_crc(65535, crcBytes); byte[] intToByte = NumUtil.intToByte(doCrc, 2); byte[] bytes = new byte[] { 0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A }; buffer.writeBytes(bytes); ChannelFuture channelFuture = channel.write(buffer); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { String deviceId = channelDeviceMap.get(channel); String type = ""; switch (msgType) { case STATUS_MSG: type = "心跳包"; break; case WARNING_MSG: type = "报警数据(UTC)"; break; case CORRECT_TIME_MSG: type = "校时包"; break; case LOGIN_MSG: type = "登录包"; break; default: break; } logger.info("server reply [{}] to client device [{}] success:[{}] ",type,deviceId, DatatypeConverter.printHexBinary(bytes)); } }); } public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); byte[] headerSplitBytes = new byte[] { 0x78, 0x78 }; byte[] headerSplitBytes1 = new byte[] { 0x79, 0x79 }; byte[] tailSplitBytes = new byte[] { 0x0D, 0x0A }; try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false, Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes), Unpooled.copiedBuffer(headerSplitBytes1))); ch.pipeline().addLast(YiTongGpsServerHandler.this); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error(e.getMessage()); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }