Forráskód Böngészése

数据解析 增加 锁

jianghouwei 7 éve
szülő
commit
aea1213f66

+ 32 - 30
src/main/java/com/tidecloud/dataacceptance/service/DelimiterJingWeiFrameDecoder.java

@@ -43,50 +43,52 @@ public class DelimiterJingWeiFrameDecoder extends ByteToMessageDecoder {
 
     @Override
     protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
-        in.markReaderIndex();
+
         int index = indexOf(in, delimiters);
         if (index <= 0) {
             return;
         }
         ByteBuf[] tag_AP07 = new ByteBuf[]{TAG_AP07.slice(TAG_AP07.readerIndex(), TAG_AP07.readableBytes())};// 语音标记符号
-
         int frameLength = indexOf(in, tag_AP07);
         if (frameLength > 0) { // 语音包
-            int beginIndex = in.readerIndex();
-            // 按照逗号切割
-            int cutLength = 0;// 存储最后一次查询条件
-            for (int i = 0; i < 5; i++) {
-                int commaLength = in.forEachByte(in.readerIndex(), 30, FIND_COMMA);
-                if (commaLength <= 0) {
+            synchronized (this){
+                in.markReaderIndex();
+                // 按照逗号切割
+                int cutLength = 0;// 存储最后一次查询条件
+                for (int i = 0; i < 5; i++) {
+                    int commaLength = in.forEachByte(in.readerIndex(), 30, FIND_COMMA);
+                    if (commaLength <= 0) {
 //                    in.readerIndex(beginIndex);
+                        in.resetReaderIndex();
+                        return;
+                    } else {
+                        cutLength = commaLength;
+                        in.readerIndex(commaLength + 1);
+                    }
+                }
+                in.resetReaderIndex();
+                // in.readerIndex(beginIndex);
+                // 标记数据
+                byte[] req = new byte[cutLength + 1];
+                in.readBytes(req);
+                String msg = new String(req, "UTF-8");
+                String[] msgArr = msg.split(",");
+                Integer length = Integer.valueOf(msgArr[4]);
+                if (in.readableBytes() < length) {
+//                in.readerIndex(beginIndex);
                     in.resetReaderIndex();
                     return;
-                } else {
-                    cutLength = commaLength;
-                    in.readerIndex(commaLength + 1);
                 }
-            }
-            in.resetReaderIndex();
-            // in.readerIndex(beginIndex);
-            // 标记数据
-            byte[] req = new byte[cutLength + 1];
-            in.readBytes(req);
-            String msg = new String(req, "UTF-8");
-            String[] msgArr = msg.split(",");
-            Integer length = Integer.valueOf(msgArr[4]);
-            if (in.readableBytes() < length) {
-//                in.readerIndex(beginIndex);
-                in.resetReaderIndex();
-                return;
-            }
-            // 语音数据
+                // 语音数据
 //            in.readerIndex(beginIndex);
 //            ByteBuf otherByteBufRef = in.readBytes(readLength);
 //            out.add(otherByteBufRef);
-            in.resetReaderIndex();
-            int readLength = cutLength + 1 + length + 1;
-            Object decoded = decode(readLength, ctx, in);
-            out.add(decoded);
+                in.resetReaderIndex();
+                int readLength = cutLength + 1 + length + 1;
+                Object decoded = decode(readLength, ctx, in);
+                out.add(decoded);
+            }
+
         } else {// 是其他包
             Object decoded = decode(ctx, in);
             if (decoded != null) {