123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 |
- package com.tidecloud.dataacceptance.service;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import io.netty.handler.codec.LineBasedFrameDecoder;
- import io.netty.handler.codec.TooLongFrameException;
- import io.netty.util.ByteProcessor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.List;
- /**
- * 基于经纬特许处理
- */
- public class DelimiterJingWeiFrameDecoder extends ByteToMessageDecoder {
- private final ByteBuf TAG_AP07 = Unpooled.copiedBuffer("AP07".getBytes());
- private final ByteBuf TAG_COMMA = Unpooled.copiedBuffer(",".getBytes());
- private static final Logger logger = LoggerFactory.getLogger(DelimiterJingWeiFrameDecoder.class);
- private final ByteBuf[] delimiters;
- private final int maxFrameLength;
- private final boolean stripDelimiter;
- private final boolean failFast;
- private boolean discardingTooLongFrame;
- private int tooLongFrameLength;
- private static int minFrameLength = Integer.MAX_VALUE;
- /**
- * Set only when decoding with "\n" and "\r\n" as the delimiter.
- */
- private final LineBasedFrameDecoder lineBasedDecoder;
- private static ByteBuf buf = Unpooled.buffer();
- private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
- @Override
- protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- int index = indexOf(in, delimiters);
- if (index <= 0) {
- return;
- }
- ByteBuf[] tag_AP07 = new ByteBuf[]{TAG_AP07.slice(TAG_AP07.readerIndex(), TAG_AP07.readableBytes())};// 语音标记符号
- int frameLength = indexOf(in, tag_AP07);
- if (frameLength > 0) { // 语音包
- synchronized (this) {
- in.markReaderIndex();
- // 按照逗号切割
- int cutLength = 0;// 存储最后一次查询条件
- for (int i = 0; i < 5; i++) {
- int commaLength = in.forEachByte(in.readerIndex(), 30, FIND_COMMA);
- if (commaLength <= 0) {
- in.resetReaderIndex();
- return;
- } else {
- cutLength = commaLength;
- in.markReaderIndex();
- }
- }
- in.resetReaderIndex();
- // 标记数据
- if (in.readableBytes() < cutLength + 1) {
- return;
- } else {
- byte[] req = new byte[cutLength + 1];
- in.readBytes(req);
- String msg = new String(req, "UTF-8");
- String[] msgArr = msg.split(",");
- Integer length = Integer.valueOf(msgArr[4]);
- if (in.readableBytes() < length) {
- in.resetReaderIndex();
- return;
- }
- // 语音数据
- in.resetReaderIndex();
- int readLength = cutLength + 1 + length + 1;
- Object decoded = decode(readLength, ctx, in);
- out.add(decoded);
- }
- }
- } else {// 是其他包
- Object decoded = decode(ctx, in);
- if (decoded != null) {
- out.add(decoded);
- }
- }
- }
- protected Object decode(int readLength, ChannelHandlerContext ctx, ByteBuf in) throws Exception {
- if (in.readableBytes() < readLength) {
- return null;
- } else {
- return in.readRetainedSlice(readLength);
- }
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param delimiter the delimiter
- */
- public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
- this(maxFrameLength, true, delimiter);
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param stripDelimiter whether the decoded frame should strip out the
- * delimiter or not
- * @param delimiter the delimiter
- */
- public DelimiterJingWeiFrameDecoder(
- int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
- this(maxFrameLength, stripDelimiter, true, delimiter);
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param stripDelimiter whether the decoded frame should strip out the
- * delimiter or not
- * @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
- * thrown as soon as the decoder notices the length of the
- * frame will exceed <tt>maxFrameLength</tt> regardless of
- * whether the entire frame has been read.
- * If <tt>false</tt>, a {@link TooLongFrameException} is
- * thrown after the entire frame that exceeds
- * <tt>maxFrameLength</tt> has been read.
- * @param delimiter the delimiter
- */
- public DelimiterJingWeiFrameDecoder(
- int maxFrameLength, boolean stripDelimiter, boolean failFast,
- ByteBuf delimiter) {
- this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[]{
- delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param delimiters the delimiters
- */
- public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
- this(maxFrameLength, true, delimiters);
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param stripDelimiter whether the decoded frame should strip out the
- * delimiter or not
- * @param delimiters the delimiters
- */
- public DelimiterJingWeiFrameDecoder(
- int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
- this(maxFrameLength, stripDelimiter, true, delimiters);
- }
- /**
- * Creates a new instance.
- *
- * @param maxFrameLength the maximum length of the decoded frame.
- * A {@link TooLongFrameException} is thrown if
- * the length of the frame exceeds this value.
- * @param stripDelimiter whether the decoded frame should strip out the
- * delimiter or not
- * @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
- * thrown as soon as the decoder notices the length of the
- * frame will exceed <tt>maxFrameLength</tt> regardless of
- * whether the entire frame has been read.
- * If <tt>false</tt>, a {@link TooLongFrameException} is
- * thrown after the entire frame that exceeds
- * <tt>maxFrameLength</tt> has been read.
- * @param delimiters the delimiters
- */
- public DelimiterJingWeiFrameDecoder(
- int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
- validateMaxFrameLength(maxFrameLength);
- if (delimiters == null) {
- throw new NullPointerException("delimiters");
- }
- if (delimiters.length == 0) {
- throw new IllegalArgumentException("empty delimiters");
- }
- if (isLineBased(delimiters) && !isSubclass()) {
- lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
- this.delimiters = null;
- } else {
- this.delimiters = new ByteBuf[delimiters.length];
- for (int i = 0; i < delimiters.length; i++) {
- ByteBuf d = delimiters[i];
- validateDelimiter(d);
- this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
- }
- lineBasedDecoder = null;
- }
- this.maxFrameLength = maxFrameLength;
- this.stripDelimiter = stripDelimiter;
- this.failFast = failFast;
- }
- /**
- * Returns true if the delimiters are "\n" and "\r\n".
- */
- private static boolean isLineBased(final ByteBuf[] delimiters) {
- if (delimiters.length != 2) {
- return false;
- }
- ByteBuf a = delimiters[0];
- ByteBuf b = delimiters[1];
- if (a.capacity() < b.capacity()) {
- a = delimiters[1];
- b = delimiters[0];
- }
- return a.capacity() == 2 && b.capacity() == 1
- && a.getByte(0) == '\r' && a.getByte(1) == '\n'
- && b.getByte(0) == '\n';
- }
- /**
- * Return {@code true} if the current instance is a subclass of DelimiterJingWeiFrameDecoder
- */
- private boolean isSubclass() {
- return getClass() != DelimiterJingWeiFrameDecoder.class;
- }
- /**
- * 匹配
- *
- * @param in
- * @param delimiters
- * @return
- */
- protected Integer indexOf(ByteBuf in, ByteBuf[] delimiters) {
- Integer indexLength = 0;
- for (ByteBuf delim : delimiters) {
- int frameLength = indexOf(in, delim);
- if (frameLength >= 0 && frameLength < minFrameLength) {
- indexLength = frameLength;
- }
- }
- return indexLength;
- }
- /**
- * Create a frame out of the {@link ByteBuf} and return it.
- *
- * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
- * @param buffer the {@link ByteBuf} from which to read data
- * @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
- * be created.
- */
- protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
- // Try all delimiters and choose the delimiter which yields the shortest frame.
- int minFrameLength = Integer.MAX_VALUE;
- ByteBuf minDelim = null;
- for (ByteBuf delim : delimiters) {
- int frameLength = indexOf(buffer, delim);
- if (frameLength >= 0 && frameLength < minFrameLength) {
- minFrameLength = frameLength;
- minDelim = delim;
- }
- }
- if (minDelim != null) {
- int minDelimLength = minDelim.capacity();
- ByteBuf frame;
- if (discardingTooLongFrame) {
- // We've just finished discarding a very large frame.
- // Go back to the initial state.
- discardingTooLongFrame = false;
- buffer.skipBytes(minFrameLength + minDelimLength);
- int tooLongFrameLength = this.tooLongFrameLength;
- this.tooLongFrameLength = 0;
- if (!failFast) {
- fail(tooLongFrameLength);
- }
- return null;
- }
- if (minFrameLength > maxFrameLength) {
- // Discard read frame.
- buffer.skipBytes(minFrameLength + minDelimLength);
- fail(minFrameLength);
- return null;
- }
- if (stripDelimiter) {
- frame = buffer.readRetainedSlice(minFrameLength);
- buffer.skipBytes(minDelimLength);
- } else {
- frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
- }
- return frame;
- } else {
- if (!discardingTooLongFrame) {
- if (buffer.readableBytes() > maxFrameLength) {
- // Discard the content of the buffer until a delimiter is found.
- tooLongFrameLength = buffer.readableBytes();
- buffer.skipBytes(buffer.readableBytes());
- discardingTooLongFrame = true;
- if (failFast) {
- fail(tooLongFrameLength);
- }
- }
- } else {
- // Still discarding the buffer since a delimiter is not found.
- tooLongFrameLength += buffer.readableBytes();
- buffer.skipBytes(buffer.readableBytes());
- }
- return null;
- }
- }
- private void fail(long frameLength) {
- if (frameLength > 0) {
- throw new TooLongFrameException(
- "frame length exceeds " + maxFrameLength +
- ": " + frameLength + " - discarded");
- } else {
- throw new TooLongFrameException(
- "frame length exceeds " + maxFrameLength +
- " - discarding");
- }
- }
- /**
- * Returns the number of bytes between the readerIndex of the haystack and
- * the first needle found in the haystack. -1 is returned if no needle is
- * found in the haystack.
- */
- private static int indexOf(ByteBuf haystack, ByteBuf needle) {
- for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
- int haystackIndex = i;
- int needleIndex;
- for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
- if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
- break;
- } else {
- haystackIndex++;
- if (haystackIndex == haystack.writerIndex() &&
- needleIndex != needle.capacity() - 1) {
- return -1;
- }
- }
- }
- if (needleIndex == needle.capacity()) {
- // Found the needle from the haystack!
- return i - haystack.readerIndex();
- }
- }
- return -1;
- }
- private static void validateDelimiter(ByteBuf delimiter) {
- if (delimiter == null) {
- throw new NullPointerException("delimiter");
- }
- if (!delimiter.isReadable()) {
- throw new IllegalArgumentException("empty delimiter");
- }
- }
- private static void validateMaxFrameLength(int maxFrameLength) {
- if (maxFrameLength <= 0) {
- throw new IllegalArgumentException(
- "maxFrameLength must be a positive integer: " +
- maxFrameLength);
- }
- }
- }
|