DelimiterJingWeiFrameDecoder.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package com.tidecloud.dataacceptance.service;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.ByteToMessageDecoder;
  6. import io.netty.handler.codec.LineBasedFrameDecoder;
  7. import io.netty.handler.codec.TooLongFrameException;
  8. import io.netty.util.ByteProcessor;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import java.util.List;
  12. /**
  13. * 基于经纬特许处理
  14. */
  15. public class DelimiterJingWeiFrameDecoder extends ByteToMessageDecoder {
  16. private final ByteBuf TAG_AP07 = Unpooled.copiedBuffer("AP07".getBytes());
  17. private final ByteBuf TAG_COMMA = Unpooled.copiedBuffer(",".getBytes());
  18. private static final Logger logger = LoggerFactory.getLogger(DelimiterJingWeiFrameDecoder.class);
  19. private final ByteBuf[] delimiters;
  20. private final int maxFrameLength;
  21. private final boolean stripDelimiter;
  22. private final boolean failFast;
  23. private boolean discardingTooLongFrame;
  24. private int tooLongFrameLength;
  25. private static int minFrameLength = Integer.MAX_VALUE;
  26. /**
  27. * Set only when decoding with "\n" and "\r\n" as the delimiter.
  28. */
  29. private final LineBasedFrameDecoder lineBasedDecoder;
  30. private static ByteBuf buf = Unpooled.buffer();
  31. private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
  32. @Override
  33. protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  34. int index = indexOf(in, delimiters);
  35. if (index <= 0) {
  36. return;
  37. }
  38. ByteBuf[] tag_AP07 = new ByteBuf[]{TAG_AP07.slice(TAG_AP07.readerIndex(), TAG_AP07.readableBytes())};// 语音标记符号
  39. int frameLength = indexOf(in, tag_AP07);
  40. if (frameLength > 0) { // 语音包
  41. synchronized (this) {
  42. in.markReaderIndex();
  43. // 按照逗号切割
  44. int cutLength = 0;// 存储最后一次查询条件
  45. for (int i = 0; i < 5; i++) {
  46. int commaLength = in.forEachByte(in.readerIndex(), 30, FIND_COMMA);
  47. if (commaLength <= 0) {
  48. in.resetReaderIndex();
  49. return;
  50. } else {
  51. cutLength = commaLength;
  52. in.markReaderIndex();
  53. }
  54. }
  55. in.resetReaderIndex();
  56. // 标记数据
  57. if (in.readableBytes() < cutLength + 1) {
  58. return;
  59. } else {
  60. byte[] req = new byte[cutLength + 1];
  61. in.readBytes(req);
  62. String msg = new String(req, "UTF-8");
  63. String[] msgArr = msg.split(",");
  64. Integer length = Integer.valueOf(msgArr[4]);
  65. if (in.readableBytes() < length) {
  66. in.resetReaderIndex();
  67. return;
  68. }
  69. // 语音数据
  70. in.resetReaderIndex();
  71. int readLength = cutLength + 1 + length + 1;
  72. Object decoded = decode(readLength, ctx, in);
  73. out.add(decoded);
  74. }
  75. }
  76. } else {// 是其他包
  77. Object decoded = decode(ctx, in);
  78. if (decoded != null) {
  79. out.add(decoded);
  80. }
  81. }
  82. }
  83. protected Object decode(int readLength, ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  84. if (in.readableBytes() < readLength) {
  85. return null;
  86. } else {
  87. return in.readRetainedSlice(readLength);
  88. }
  89. }
  90. /**
  91. * Creates a new instance.
  92. *
  93. * @param maxFrameLength the maximum length of the decoded frame.
  94. * A {@link TooLongFrameException} is thrown if
  95. * the length of the frame exceeds this value.
  96. * @param delimiter the delimiter
  97. */
  98. public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
  99. this(maxFrameLength, true, delimiter);
  100. }
  101. /**
  102. * Creates a new instance.
  103. *
  104. * @param maxFrameLength the maximum length of the decoded frame.
  105. * A {@link TooLongFrameException} is thrown if
  106. * the length of the frame exceeds this value.
  107. * @param stripDelimiter whether the decoded frame should strip out the
  108. * delimiter or not
  109. * @param delimiter the delimiter
  110. */
  111. public DelimiterJingWeiFrameDecoder(
  112. int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
  113. this(maxFrameLength, stripDelimiter, true, delimiter);
  114. }
  115. /**
  116. * Creates a new instance.
  117. *
  118. * @param maxFrameLength the maximum length of the decoded frame.
  119. * A {@link TooLongFrameException} is thrown if
  120. * the length of the frame exceeds this value.
  121. * @param stripDelimiter whether the decoded frame should strip out the
  122. * delimiter or not
  123. * @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
  124. * thrown as soon as the decoder notices the length of the
  125. * frame will exceed <tt>maxFrameLength</tt> regardless of
  126. * whether the entire frame has been read.
  127. * If <tt>false</tt>, a {@link TooLongFrameException} is
  128. * thrown after the entire frame that exceeds
  129. * <tt>maxFrameLength</tt> has been read.
  130. * @param delimiter the delimiter
  131. */
  132. public DelimiterJingWeiFrameDecoder(
  133. int maxFrameLength, boolean stripDelimiter, boolean failFast,
  134. ByteBuf delimiter) {
  135. this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[]{
  136. delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
  137. }
  138. /**
  139. * Creates a new instance.
  140. *
  141. * @param maxFrameLength the maximum length of the decoded frame.
  142. * A {@link TooLongFrameException} is thrown if
  143. * the length of the frame exceeds this value.
  144. * @param delimiters the delimiters
  145. */
  146. public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
  147. this(maxFrameLength, true, delimiters);
  148. }
  149. /**
  150. * Creates a new instance.
  151. *
  152. * @param maxFrameLength the maximum length of the decoded frame.
  153. * A {@link TooLongFrameException} is thrown if
  154. * the length of the frame exceeds this value.
  155. * @param stripDelimiter whether the decoded frame should strip out the
  156. * delimiter or not
  157. * @param delimiters the delimiters
  158. */
  159. public DelimiterJingWeiFrameDecoder(
  160. int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
  161. this(maxFrameLength, stripDelimiter, true, delimiters);
  162. }
  163. /**
  164. * Creates a new instance.
  165. *
  166. * @param maxFrameLength the maximum length of the decoded frame.
  167. * A {@link TooLongFrameException} is thrown if
  168. * the length of the frame exceeds this value.
  169. * @param stripDelimiter whether the decoded frame should strip out the
  170. * delimiter or not
  171. * @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
  172. * thrown as soon as the decoder notices the length of the
  173. * frame will exceed <tt>maxFrameLength</tt> regardless of
  174. * whether the entire frame has been read.
  175. * If <tt>false</tt>, a {@link TooLongFrameException} is
  176. * thrown after the entire frame that exceeds
  177. * <tt>maxFrameLength</tt> has been read.
  178. * @param delimiters the delimiters
  179. */
  180. public DelimiterJingWeiFrameDecoder(
  181. int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
  182. validateMaxFrameLength(maxFrameLength);
  183. if (delimiters == null) {
  184. throw new NullPointerException("delimiters");
  185. }
  186. if (delimiters.length == 0) {
  187. throw new IllegalArgumentException("empty delimiters");
  188. }
  189. if (isLineBased(delimiters) && !isSubclass()) {
  190. lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
  191. this.delimiters = null;
  192. } else {
  193. this.delimiters = new ByteBuf[delimiters.length];
  194. for (int i = 0; i < delimiters.length; i++) {
  195. ByteBuf d = delimiters[i];
  196. validateDelimiter(d);
  197. this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
  198. }
  199. lineBasedDecoder = null;
  200. }
  201. this.maxFrameLength = maxFrameLength;
  202. this.stripDelimiter = stripDelimiter;
  203. this.failFast = failFast;
  204. }
  205. /**
  206. * Returns true if the delimiters are "\n" and "\r\n".
  207. */
  208. private static boolean isLineBased(final ByteBuf[] delimiters) {
  209. if (delimiters.length != 2) {
  210. return false;
  211. }
  212. ByteBuf a = delimiters[0];
  213. ByteBuf b = delimiters[1];
  214. if (a.capacity() < b.capacity()) {
  215. a = delimiters[1];
  216. b = delimiters[0];
  217. }
  218. return a.capacity() == 2 && b.capacity() == 1
  219. && a.getByte(0) == '\r' && a.getByte(1) == '\n'
  220. && b.getByte(0) == '\n';
  221. }
  222. /**
  223. * Return {@code true} if the current instance is a subclass of DelimiterJingWeiFrameDecoder
  224. */
  225. private boolean isSubclass() {
  226. return getClass() != DelimiterJingWeiFrameDecoder.class;
  227. }
  228. /**
  229. * 匹配
  230. *
  231. * @param in
  232. * @param delimiters
  233. * @return
  234. */
  235. protected Integer indexOf(ByteBuf in, ByteBuf[] delimiters) {
  236. Integer indexLength = 0;
  237. for (ByteBuf delim : delimiters) {
  238. int frameLength = indexOf(in, delim);
  239. if (frameLength >= 0 && frameLength < minFrameLength) {
  240. indexLength = frameLength;
  241. }
  242. }
  243. return indexLength;
  244. }
  245. /**
  246. * Create a frame out of the {@link ByteBuf} and return it.
  247. *
  248. * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
  249. * @param buffer the {@link ByteBuf} from which to read data
  250. * @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
  251. * be created.
  252. */
  253. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
  254. // Try all delimiters and choose the delimiter which yields the shortest frame.
  255. int minFrameLength = Integer.MAX_VALUE;
  256. ByteBuf minDelim = null;
  257. for (ByteBuf delim : delimiters) {
  258. int frameLength = indexOf(buffer, delim);
  259. if (frameLength >= 0 && frameLength < minFrameLength) {
  260. minFrameLength = frameLength;
  261. minDelim = delim;
  262. }
  263. }
  264. if (minDelim != null) {
  265. int minDelimLength = minDelim.capacity();
  266. ByteBuf frame;
  267. if (discardingTooLongFrame) {
  268. // We've just finished discarding a very large frame.
  269. // Go back to the initial state.
  270. discardingTooLongFrame = false;
  271. buffer.skipBytes(minFrameLength + minDelimLength);
  272. int tooLongFrameLength = this.tooLongFrameLength;
  273. this.tooLongFrameLength = 0;
  274. if (!failFast) {
  275. fail(tooLongFrameLength);
  276. }
  277. return null;
  278. }
  279. if (minFrameLength > maxFrameLength) {
  280. // Discard read frame.
  281. buffer.skipBytes(minFrameLength + minDelimLength);
  282. fail(minFrameLength);
  283. return null;
  284. }
  285. if (stripDelimiter) {
  286. frame = buffer.readRetainedSlice(minFrameLength);
  287. buffer.skipBytes(minDelimLength);
  288. } else {
  289. frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
  290. }
  291. return frame;
  292. } else {
  293. if (!discardingTooLongFrame) {
  294. if (buffer.readableBytes() > maxFrameLength) {
  295. // Discard the content of the buffer until a delimiter is found.
  296. tooLongFrameLength = buffer.readableBytes();
  297. buffer.skipBytes(buffer.readableBytes());
  298. discardingTooLongFrame = true;
  299. if (failFast) {
  300. fail(tooLongFrameLength);
  301. }
  302. }
  303. } else {
  304. // Still discarding the buffer since a delimiter is not found.
  305. tooLongFrameLength += buffer.readableBytes();
  306. buffer.skipBytes(buffer.readableBytes());
  307. }
  308. return null;
  309. }
  310. }
  311. private void fail(long frameLength) {
  312. if (frameLength > 0) {
  313. throw new TooLongFrameException(
  314. "frame length exceeds " + maxFrameLength +
  315. ": " + frameLength + " - discarded");
  316. } else {
  317. throw new TooLongFrameException(
  318. "frame length exceeds " + maxFrameLength +
  319. " - discarding");
  320. }
  321. }
  322. /**
  323. * Returns the number of bytes between the readerIndex of the haystack and
  324. * the first needle found in the haystack. -1 is returned if no needle is
  325. * found in the haystack.
  326. */
  327. private static int indexOf(ByteBuf haystack, ByteBuf needle) {
  328. for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
  329. int haystackIndex = i;
  330. int needleIndex;
  331. for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
  332. if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
  333. break;
  334. } else {
  335. haystackIndex++;
  336. if (haystackIndex == haystack.writerIndex() &&
  337. needleIndex != needle.capacity() - 1) {
  338. return -1;
  339. }
  340. }
  341. }
  342. if (needleIndex == needle.capacity()) {
  343. // Found the needle from the haystack!
  344. return i - haystack.readerIndex();
  345. }
  346. }
  347. return -1;
  348. }
  349. private static void validateDelimiter(ByteBuf delimiter) {
  350. if (delimiter == null) {
  351. throw new NullPointerException("delimiter");
  352. }
  353. if (!delimiter.isReadable()) {
  354. throw new IllegalArgumentException("empty delimiter");
  355. }
  356. }
  357. private static void validateMaxFrameLength(int maxFrameLength) {
  358. if (maxFrameLength <= 0) {
  359. throw new IllegalArgumentException(
  360. "maxFrameLength must be a positive integer: " +
  361. maxFrameLength);
  362. }
  363. }
  364. }