package com.tidecloud.dataacceptance.service.impl; import javax.xml.bind.DatatypeConverter; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.tidecloud.dataacceptance.common.BitOperator; import com.tidecloud.dataacceptance.common.CRCUtil; import com.tidecloud.dataacceptance.common.PinShenWaterUtils; import com.tidecloud.dataacceptance.entity.PinShenWaterDevice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * @author cdk */ @Component @ChannelHandler.Sharable @Scope("prototype") public class PinShenData86ServerHandler extends HexBinaryAcceptanceHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(PinShenData86ServerHandler.class); @Override protected void handle(ByteBuf in, Channel channel) throws Exception { PinShenWaterDevice device = PinShenWaterUtils.handle(in); if (device != null) this.handle(device, channel, in); } private void handle(PinShenWaterDevice device, Channel channel, ByteBuf in) throws Exception { byte typeByte = device.getTypeByte(); byte[] packageArr = device.getPackageArr(); // GPRS方式数据帧 if (device.getTypeByte() == PinShenWaterUtils.data_gps_type) { String deviceId = device.getDeviceId(); sendMsg2Kafka(getOriginalData(in), deviceId, channel); } // GPRS方式链路帧 else if (typeByte == PinShenWaterUtils.link_gps_type) { if (packageArr.length == 1) { int link = packageArr[0] & 0xff; if (link == PinShenWaterUtils.link_req_code) { byte[] rspBody = { PinShenWaterUtils.link_rsp_code }; reply(channel, device, rspBody); sendReq(channel, device, createReqDataArr(), PinShenWaterUtils.data_gps_type); } } } } private byte[] createReqDataArr() { byte[] req = new byte[] { 0x02, 0x04, 0x75, 0x31, 0x00, 0x05 }; byte[] crcCode = CRCUtil.getCrc16(req); byte[] reqBytes = ArrayUtils.addAll(req, crcCode); return reqBytes; } private void reply(Channel channel, PinShenWaterDevice device, byte[] rspBody) { ByteBuf buffer = Unpooled.buffer(); byte[] rsp = new byte[] {}; rsp = ArrayUtils.addAll(rsp, device.getIdentityArr()); rsp = ArrayUtils.addAll(rsp, device.getLengthArr()); rsp = ArrayUtils.add(rsp, device.getPackageNo()); rsp = ArrayUtils.add(rsp, device.getTypeByte()); rsp = ArrayUtils.add(rsp, device.getDstAddrLength()); rsp = ArrayUtils.addAll(rsp, device.getDstAddr()); rsp = ArrayUtils.add(rsp, device.getSrcAddrLength()); rsp = ArrayUtils.addAll(rsp, device.getSrcAddr()); rsp = ArrayUtils.addAll(rsp, rspBody); byte code = PinShenWaterUtils.getXor(rsp); rsp = ArrayUtils.add(rsp, code); buffer.writeBytes(rsp); String result = DatatypeConverter.printHexBinary(rsp); ChannelFuture channelFuture = channel.write(buffer); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { logger.info("server reply [{}] to client success", result); } }); } private void sendReq(Channel channel, PinShenWaterDevice device, byte[] rspBody, Byte gpsType) { ByteBuf buffer = Unpooled.buffer(); byte[] rsp = new byte[] {}; rsp = ArrayUtils.addAll(rsp, device.getIdentityArr()); rsp = ArrayUtils.addAll(rsp, device.getLengthArr()); rsp = ArrayUtils.add(rsp, device.getPackageNo()); rsp = ArrayUtils.add(rsp, gpsType); rsp = ArrayUtils.add(rsp, device.getDstAddrLength()); rsp = ArrayUtils.addAll(rsp, device.getDstAddr()); rsp = ArrayUtils.add(rsp, device.getSrcAddrLength()); rsp = ArrayUtils.addAll(rsp, device.getSrcAddr()); rsp = ArrayUtils.addAll(rsp, rspBody); byte code = PinShenWaterUtils.getXor(rsp); rsp = ArrayUtils.add(rsp, code); buffer.writeBytes(rsp); String result = DatatypeConverter.printHexBinary(rsp); ChannelFuture channelFuture = channel.write(buffer); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { logger.info("server send request [{}] to client success", result); } }); } private byte[] getOriginalData(ByteBuf in) { in.resetReaderIndex(); int length = in.readableBytes(); byte[] dataByteArray = new byte[length]; in.readBytes(dataByteArray, 0, in.readableBytes()); return dataByteArray; } public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { // ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false, // Unpooled.copiedBuffer(tailSplitBytes), // Unpooled.copiedBuffer(headerSplitBytes), // Unpooled.copiedBuffer(headerSplitBytes1))); ch.pipeline().addLast(PinShenData86ServerHandler.this); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error(e.getMessage()); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }