123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- package com.tidecloud.dataacceptance.service.impl;
- import javax.xml.bind.DatatypeConverter;
- import org.apache.commons.lang.ArrayUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import com.tidecloud.dataacceptance.common.BitOperator;
- import com.tidecloud.dataacceptance.common.CRCUtil;
- import com.tidecloud.dataacceptance.common.PinShenWaterUtils;
- import com.tidecloud.dataacceptance.entity.PinShenWaterDevice;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.util.concurrent.Future;
- import io.netty.util.concurrent.GenericFutureListener;
- /**
- * @author cdk
- */
- @Component
- @ChannelHandler.Sharable
- @Scope("prototype")
- public class PinShenData86ServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(PinShenData86ServerHandler.class);
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- PinShenWaterDevice device = PinShenWaterUtils.handle(in);
- if (device != null)
- this.handle(device, channel, in);
- }
- private void handle(PinShenWaterDevice device, Channel channel, ByteBuf in) throws Exception {
- byte typeByte = device.getTypeByte();
- byte[] packageArr = device.getPackageArr();
- // GPRS方式数据帧
- if (device.getTypeByte() == PinShenWaterUtils.data_gps_type) {
- String deviceId = device.getDeviceId();
- sendMsg2Kafka(getOriginalData(in), deviceId, channel);
- }
- // GPRS方式链路帧
- else if (typeByte == PinShenWaterUtils.link_gps_type) {
- if (packageArr.length == 1) {
- int link = packageArr[0] & 0xff;
- if (link == PinShenWaterUtils.link_req_code) {
- byte[] rspBody = { PinShenWaterUtils.link_rsp_code };
- reply(channel, device, rspBody);
- sendReq(channel, device, createReqDataArr(), PinShenWaterUtils.data_gps_type);
- }
- }
- }
- }
- private byte[] createReqDataArr() {
- byte[] req = new byte[] { 0x02, 0x04, 0x75, 0x31, 0x00, 0x05 };
- byte[] crcCode = CRCUtil.getCrc16(req);
- byte[] reqBytes = ArrayUtils.addAll(req, crcCode);
- return reqBytes;
- }
- private void reply(Channel channel, PinShenWaterDevice device, byte[] rspBody) {
- ByteBuf buffer = Unpooled.buffer();
- byte[] rsp = new byte[] {};
- rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
- rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
- rsp = ArrayUtils.add(rsp, device.getPackageNo());
- rsp = ArrayUtils.add(rsp, device.getTypeByte());
- rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
- rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
- rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
- rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
- rsp = ArrayUtils.addAll(rsp, rspBody);
- byte code = PinShenWaterUtils.getXor(rsp);
- rsp = ArrayUtils.add(rsp, code);
- buffer.writeBytes(rsp);
- String result = DatatypeConverter.printHexBinary(rsp);
- ChannelFuture channelFuture = channel.write(buffer);
- channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- logger.info("server reply [{}] to client success", result);
- }
- });
- }
- private void sendReq(Channel channel, PinShenWaterDevice device, byte[] rspBody, Byte gpsType) {
- ByteBuf buffer = Unpooled.buffer();
- byte[] rsp = new byte[] {};
- rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
- rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
- rsp = ArrayUtils.add(rsp, device.getPackageNo());
- rsp = ArrayUtils.add(rsp, gpsType);
- rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
- rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
- rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
- rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
- rsp = ArrayUtils.addAll(rsp, rspBody);
- byte code = PinShenWaterUtils.getXor(rsp);
- rsp = ArrayUtils.add(rsp, code);
- buffer.writeBytes(rsp);
- String result = DatatypeConverter.printHexBinary(rsp);
- ChannelFuture channelFuture = channel.write(buffer);
- channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- logger.info("server send request [{}] to client success", result);
- }
- });
- }
- private byte[] getOriginalData(ByteBuf in) {
- in.resetReaderIndex();
- int length = in.readableBytes();
- byte[] dataByteArray = new byte[length];
- in.readBytes(dataByteArray, 0, in.readableBytes());
- return dataByteArray;
- }
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
- // Unpooled.copiedBuffer(tailSplitBytes),
- // Unpooled.copiedBuffer(headerSplitBytes),
- // Unpooled.copiedBuffer(headerSplitBytes1)));
- ch.pipeline().addLast(PinShenData86ServerHandler.this);
- }
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture f = b.bind(port).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
|