Ver Fonte

经纬语音接入,

jianghouwei há 6 anos atrás
pai
commit
60560ecaab

+ 394 - 0
src/main/java/com/tidecloud/dataacceptance/service/DelimiterJingWeiFrameDecoder.java

@@ -0,0 +1,394 @@
+package com.tidecloud.dataacceptance.service;
+
+import com.tidecloud.dataacceptance.service.impl.WatchJWServerHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufProcessor;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.ByteProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * 基于经纬特许处理
+ */
+public class DelimiterJingWeiFrameDecoder extends ByteToMessageDecoder {
+
+
+    private final ByteBuf TAG_AP07 = Unpooled.copiedBuffer("AP07".getBytes());
+    private final ByteBuf TAG_COMMA = Unpooled.copiedBuffer(",".getBytes());
+
+
+    private static final Logger logger = LoggerFactory.getLogger(DelimiterJingWeiFrameDecoder.class);
+    private final ByteBuf[] delimiters;
+    private final int maxFrameLength;
+    private final boolean stripDelimiter;
+    private final boolean failFast;
+    private boolean discardingTooLongFrame;
+    private int tooLongFrameLength;
+
+    private static int minFrameLength = Integer.MAX_VALUE;
+
+    /**
+     * Set only when decoding with "\n" and "\r\n" as the delimiter.
+     */
+    private final LineBasedFrameDecoder lineBasedDecoder;
+
+    private static ByteBuf buf = Unpooled.buffer();
+
+    private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
+
+    @Override
+    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        int index = indexOf(in, delimiters);
+        if (index <= 0) {
+            return;
+        }
+        ByteBuf[] tag_AP07 = new ByteBuf[]{TAG_AP07.slice(TAG_AP07.readerIndex(), TAG_AP07.readableBytes())};// 语音标记符号
+
+        int frameLength = indexOf(in, tag_AP07);
+        if (frameLength > 0) { // 语音包
+            int beginIndex = in.readerIndex();
+            // 按照逗号切割
+            int cutLength = 0;// 存储最后一次查询条件
+            for (int i = 0; i < 5; i++) {
+                int commaLength = in.forEachByte(in.readerIndex(), 30, FIND_COMMA);
+                if (commaLength <= 0) {
+                    in.readerIndex(beginIndex);
+                    return;
+                } else {
+                    cutLength = commaLength;
+                    in.readerIndex(commaLength + 1);
+                }
+            }
+            in.readerIndex(beginIndex);
+            // 标记数据
+            byte[] req = new byte[cutLength + 1];
+            in.readBytes(req);
+            String msg = new String(req, "UTF-8");
+            String[] msgArr = msg.split(",");
+            Integer length = Integer.valueOf(msgArr[4]);
+            if (in.readableBytes() < length) {
+                in.readerIndex(beginIndex);
+                return;
+            }
+            // 语音数据
+            in.readerIndex(beginIndex);
+            ByteBuf otherByteBufRef = in.readBytes(cutLength + 2 + length);
+            out.add(otherByteBufRef);
+        } else {// 是其他包
+            Object decoded = decode(ctx, in);
+            if (decoded != null) {
+                out.add(decoded);
+            }
+        }
+
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param delimiter      the delimiter
+     */
+    public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
+        this(maxFrameLength, true, delimiter);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param stripDelimiter whether the decoded frame should strip out the
+     *                       delimiter or not
+     * @param delimiter      the delimiter
+     */
+    public DelimiterJingWeiFrameDecoder(
+            int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
+        this(maxFrameLength, stripDelimiter, true, delimiter);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param stripDelimiter whether the decoded frame should strip out the
+     *                       delimiter or not
+     * @param failFast       If <tt>true</tt>, a {@link TooLongFrameException} is
+     *                       thrown as soon as the decoder notices the length of the
+     *                       frame will exceed <tt>maxFrameLength</tt> regardless of
+     *                       whether the entire frame has been read.
+     *                       If <tt>false</tt>, a {@link TooLongFrameException} is
+     *                       thrown after the entire frame that exceeds
+     *                       <tt>maxFrameLength</tt> has been read.
+     * @param delimiter      the delimiter
+     */
+    public DelimiterJingWeiFrameDecoder(
+            int maxFrameLength, boolean stripDelimiter, boolean failFast,
+            ByteBuf delimiter) {
+        this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[]{
+                delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param delimiters     the delimiters
+     */
+    public DelimiterJingWeiFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
+        this(maxFrameLength, true, delimiters);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param stripDelimiter whether the decoded frame should strip out the
+     *                       delimiter or not
+     * @param delimiters     the delimiters
+     */
+    public DelimiterJingWeiFrameDecoder(
+            int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
+        this(maxFrameLength, stripDelimiter, true, delimiters);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param maxFrameLength the maximum length of the decoded frame.
+     *                       A {@link TooLongFrameException} is thrown if
+     *                       the length of the frame exceeds this value.
+     * @param stripDelimiter whether the decoded frame should strip out the
+     *                       delimiter or not
+     * @param failFast       If <tt>true</tt>, a {@link TooLongFrameException} is
+     *                       thrown as soon as the decoder notices the length of the
+     *                       frame will exceed <tt>maxFrameLength</tt> regardless of
+     *                       whether the entire frame has been read.
+     *                       If <tt>false</tt>, a {@link TooLongFrameException} is
+     *                       thrown after the entire frame that exceeds
+     *                       <tt>maxFrameLength</tt> has been read.
+     * @param delimiters     the delimiters
+     */
+    public DelimiterJingWeiFrameDecoder(
+            int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
+        validateMaxFrameLength(maxFrameLength);
+        if (delimiters == null) {
+            throw new NullPointerException("delimiters");
+        }
+        if (delimiters.length == 0) {
+            throw new IllegalArgumentException("empty delimiters");
+        }
+
+
+        if (isLineBased(delimiters) && !isSubclass()) {
+            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
+            this.delimiters = null;
+        } else {
+            this.delimiters = new ByteBuf[delimiters.length];
+            for (int i = 0; i < delimiters.length; i++) {
+                ByteBuf d = delimiters[i];
+                validateDelimiter(d);
+                this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
+            }
+            lineBasedDecoder = null;
+        }
+        this.maxFrameLength = maxFrameLength;
+        this.stripDelimiter = stripDelimiter;
+        this.failFast = failFast;
+    }
+
+    /**
+     * Returns true if the delimiters are "\n" and "\r\n".
+     */
+    private static boolean isLineBased(final ByteBuf[] delimiters) {
+        if (delimiters.length != 2) {
+            return false;
+        }
+        ByteBuf a = delimiters[0];
+        ByteBuf b = delimiters[1];
+        if (a.capacity() < b.capacity()) {
+            a = delimiters[1];
+            b = delimiters[0];
+        }
+        return a.capacity() == 2 && b.capacity() == 1
+                && a.getByte(0) == '\r' && a.getByte(1) == '\n'
+                && b.getByte(0) == '\n';
+    }
+
+    /**
+     * Return {@code true} if the current instance is a subclass of DelimiterJingWeiFrameDecoder
+     */
+    private boolean isSubclass() {
+        return getClass() != DelimiterJingWeiFrameDecoder.class;
+    }
+
+
+    /**
+     * 匹配
+     *
+     * @param in
+     * @param delimiters
+     * @return
+     */
+    protected Integer indexOf(ByteBuf in, ByteBuf[] delimiters) {
+        Integer indexLength = 0;
+        for (ByteBuf delim : delimiters) {
+            int frameLength = indexOf(in, delim);
+            if (frameLength >= 0 && frameLength < minFrameLength) {
+                indexLength = frameLength;
+            }
+        }
+        return indexLength;
+    }
+
+    /**
+     * Create a frame out of the {@link ByteBuf} and return it.
+     *
+     * @param ctx    the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
+     * @param buffer the {@link ByteBuf} from which to read data
+     * @return frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
+     * be created.
+     */
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+        // Try all delimiters and choose the delimiter which yields the shortest frame.
+        int minFrameLength = Integer.MAX_VALUE;
+
+
+        ByteBuf minDelim = null;
+        for (ByteBuf delim : delimiters) {
+            int frameLength = indexOf(buffer, delim);
+            if (frameLength >= 0 && frameLength < minFrameLength) {
+                minFrameLength = frameLength;
+                minDelim = delim;
+            }
+        }
+
+        if (minDelim != null) {
+            int minDelimLength = minDelim.capacity();
+            ByteBuf frame;
+
+            if (discardingTooLongFrame) {
+                // We've just finished discarding a very large frame.
+                // Go back to the initial state.
+                discardingTooLongFrame = false;
+                buffer.skipBytes(minFrameLength + minDelimLength);
+
+                int tooLongFrameLength = this.tooLongFrameLength;
+                this.tooLongFrameLength = 0;
+                if (!failFast) {
+                    fail(tooLongFrameLength);
+                }
+                return null;
+            }
+
+            if (minFrameLength > maxFrameLength) {
+                // Discard read frame.
+                buffer.skipBytes(minFrameLength + minDelimLength);
+                fail(minFrameLength);
+                return null;
+            }
+
+            if (stripDelimiter) {
+                frame = buffer.readRetainedSlice(minFrameLength);
+                buffer.skipBytes(minDelimLength);
+            } else {
+                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
+            }
+
+            return frame;
+        } else {
+            if (!discardingTooLongFrame) {
+                if (buffer.readableBytes() > maxFrameLength) {
+                    // Discard the content of the buffer until a delimiter is found.
+                    tooLongFrameLength = buffer.readableBytes();
+                    buffer.skipBytes(buffer.readableBytes());
+                    discardingTooLongFrame = true;
+                    if (failFast) {
+                        fail(tooLongFrameLength);
+                    }
+                }
+            } else {
+                // Still discarding the buffer since a delimiter is not found.
+                tooLongFrameLength += buffer.readableBytes();
+                buffer.skipBytes(buffer.readableBytes());
+            }
+            return null;
+        }
+    }
+
+    private void fail(long frameLength) {
+        if (frameLength > 0) {
+            throw new TooLongFrameException(
+                    "frame length exceeds " + maxFrameLength +
+                            ": " + frameLength + " - discarded");
+        } else {
+            throw new TooLongFrameException(
+                    "frame length exceeds " + maxFrameLength +
+                            " - discarding");
+        }
+    }
+
+    /**
+     * Returns the number of bytes between the readerIndex of the haystack and
+     * the first needle found in the haystack.  -1 is returned if no needle is
+     * found in the haystack.
+     */
+    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
+        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
+            int haystackIndex = i;
+            int needleIndex;
+            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
+                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
+                    break;
+                } else {
+                    haystackIndex++;
+                    if (haystackIndex == haystack.writerIndex() &&
+                            needleIndex != needle.capacity() - 1) {
+                        return -1;
+                    }
+                }
+            }
+
+            if (needleIndex == needle.capacity()) {
+                // Found the needle from the haystack!
+                return i - haystack.readerIndex();
+            }
+        }
+        return -1;
+    }
+
+    private static void validateDelimiter(ByteBuf delimiter) {
+        if (delimiter == null) {
+            throw new NullPointerException("delimiter");
+        }
+        if (!delimiter.isReadable()) {
+            throw new IllegalArgumentException("empty delimiter");
+        }
+    }
+
+    private static void validateMaxFrameLength(int maxFrameLength) {
+        if (maxFrameLength <= 0) {
+            throw new IllegalArgumentException(
+                    "maxFrameLength must be a positive integer: " +
+                            maxFrameLength);
+        }
+    }
+
+}

+ 26 - 26
src/main/java/com/tidecloud/dataacceptance/service/DirectMemoryReporterImpl.java

@@ -29,31 +29,31 @@ public class DirectMemoryReporterImpl {
 
 	private static Logger logger = LoggerFactory.getLogger(DirectMemoryReporterImpl.class);
 
-	@PostConstruct
-	public void init() {
-
-		try {
-			Field field = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_COUNTER");
-			field.setAccessible(true);
-			directMemory = (AtomicLong) field.get(PlatformDependent.class);
-			Field field1 = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_LIMIT");
-			field1.setAccessible(true);
-			maxDirectMemory = (Long) field1.get(PlatformDependent.class);
-		} catch (Exception e) {
-			logger.error(e.getMessage());
-		}
-
-		TimerTask timerTask = new TimerTask() {
-			@Override
-			public void run() {
-				System.out.println("task  run:" + new Date());
-				long m = directMemory.get() / _ik;
-				logger.error(BUSINESS_KEY + "maxDirectMemory==={}:{}K", maxDirectMemory/_ik, m);
-			}
-		};
-		Timer timer = new Timer();
-		//安排指定的任务在指定的时间开始进行重复的固定延迟执行。这里是每3秒执行一次
-		timer.schedule(timerTask, 10, 2000);
-	}
+//	@PostConstruct
+//	public void init() {
+//
+//		try {
+//			Field field = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_COUNTER");
+//			field.setAccessible(true);
+//			directMemory = (AtomicLong) field.get(PlatformDependent.class);
+//			Field field1 = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_LIMIT");
+//			field1.setAccessible(true);
+//			maxDirectMemory = (Long) field1.get(PlatformDependent.class);
+//		} catch (Exception e) {
+//			logger.error(e.getMessage());
+//		}
+//
+//		TimerTask timerTask = new TimerTask() {
+//			@Override
+//			public void run() {
+//				System.out.println("task  run:" + new Date());
+//				long m = directMemory.get() / _ik;
+//				logger.error(BUSINESS_KEY + "maxDirectMemory==={}:{}K", maxDirectMemory/_ik, m);
+//			}
+//		};
+//		Timer timer = new Timer();
+//		//安排指定的任务在指定的时间开始进行重复的固定延迟执行。这里是每3秒执行一次
+//		timer.schedule(timerTask, 10, 2000);
+//	}
 
 }

+ 3 - 0
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -2,6 +2,8 @@ package com.tidecloud.dataacceptance.service;
 
 import javax.xml.bind.DatatypeConverter;
 
+import com.sun.scenario.effect.Reflection;
+import io.netty.util.ReferenceCountUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -30,6 +32,7 @@ public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboun
 			logger.error(e.getMessage(), e);
 		} finally {
 			in.release();
+			//ReferenceCountUtil.release(msg);// 显示丢弃已经接受的消息
 			
 		}
 		

+ 615 - 464
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchJWServerHandler.java

@@ -1,7 +1,10 @@
 package com.tidecloud.dataacceptance.service.impl;
 
+import com.accept.client.VoiceMsgClient;
+import com.accept.model.VoiceMsgVo;
 import com.alibaba.fastjson.JSON;
 import com.tidecloud.dataacceptance.common.DateUtil;
+import com.tidecloud.dataacceptance.service.DelimiterJingWeiFrameDecoder;
 import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -14,9 +17,11 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.ByteProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
@@ -37,472 +42,618 @@ import java.util.concurrent.TimeUnit;
 @Component(WatchJWServerHandler.name)
 public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-	public static final String name = "WatchJWServerHandler";
-
-	private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
-
-	private static ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-	private static final Long INTERVAL_TIME = 300000L; // 开关时间
-	private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
-
-	/**
-	 * 紧急模式
-	 */
-	private static final Integer URGENCY = 3;
-
-	/**
-	 * 其他模式
-	 */
-	private static final Integer OTHER = 2;
-	/**
-	 * 省电模式开关
-	 */
-	private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
-
-	/**
-	 * 设置模式
-	 */
-	private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
-
-	@Override
-	protected void handle(ByteBuf in, Channel channel) throws Exception {
-
-		//String msg = byteBufferToString(in.nioBuffer());TODO (注意)
-		byte[] req = new byte[in.readableBytes()];
-		in.readBytes(req);
-		String msg = new String(req, "UTF-8");
-		try {
-			String deviceId = channelDeviceMap.get(channel);
-			String factory = msg.substring(0, 2);// 工厂
-			String type = msg.substring(2, 6);// 标记
-			if (deviceId != null) {
-				MDC.put(MDC_DEVICEID, deviceId);
-			} else {
-				logger.info("该手表没有登录:传入数据为" + msg);
-				if (!"AP00".equals(type)) {
-					logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
-					channel.close();
-					return;
-				}
-			}
-			logger.info("传入数据为:" + msg);
-			Long time = System.currentTimeMillis();
-			WorkModel workModel = null;
-			SwitchWorkModel swm = null;
-			if (deviceId != null) {
-				workModel = modelMap.get(deviceId);//已登录
-				swm = switchMap.get(deviceId);// 开关
-				if (workModel != null && workModel.getUrgentType() == OTHER
-						&& (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
-					workModel.setUrgentTime(time);
-					workModel.setUrgentType(OTHER);
-					modelMap.put(deviceId, workModel);
-					normalReplyModel(factory, deviceId, channel, 3);
-					logger.warn("超过指定时间没有收到回复》》》 重新设置");
-					// 更新开关切换时间
-					swm.setSwitchTime(time);
-					switchMap.put(deviceId, swm);
-				}
-			}
-			switch (type) {
-				case "AP00": // 初始化登录
-					resolveLoginMSG(msg, channel);
-					break;
-				case "AP03": // 连接(心跳) BP03#
-					normalReply(factory, channel, "BP03");
-					checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
-					sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
-					logger.warn("心跳报文下发 msg+" + msg);
-					break;
-				case "AP01": // 位置信息
-					String gpsState = msg.substring(12, 13);
-					// 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
-					String gDay = msg.substring(6, 12);
-					String gTime = msg.substring(39, 45);
+    public static final String name = "WatchJWServerHandler";
+
+    private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
+
+    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    private static final Long INTERVAL_TIME = 300000L; // 开关时间
+    private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
+
+    /**
+     * 紧急模式
+     */
+    private static final Integer URGENCY = 3;
+
+    /**
+     * 其他模式
+     */
+    private static final Integer OTHER = 2;
+    /**
+     * 省电模式开关
+     */
+    private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
+
+    /**
+     * 设置模式
+     */
+    private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
+
+    private final ByteBuf TAG_AP07 = Unpooled.copiedBuffer("AP07".getBytes());
+    private final ByteBuf TAG_COMMA = Unpooled.copiedBuffer(",".getBytes());
+
+    private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
+
+    @Autowired
+    private VoiceMsgClient voiceMsgClient;
+
+    @Override
+    protected void handle(ByteBuf in, Channel channel) throws Exception {
+        //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
+        byte[] req = new byte[in.readableBytes()];
+        in.readBytes(req);
+        String msg = new String(req, "UTF-8");
+        try {
+            String deviceId = channelDeviceMap.get(channel);
+            String factory = msg.substring(0, 2);// 工厂
+            String type = msg.substring(2, 6);// 标记
+            if (deviceId != null) {
+                MDC.put(MDC_DEVICEID, deviceId);
+            } else {
+                logger.info("该手表没有登录:传入数据为" + msg);
+                if (!"AP00".equals(type)) {
+                    logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
+                    channel.close();
+                    return;
+                }
+            }
+            logger.info("传入数据为:" + msg);
+            Long time = System.currentTimeMillis();
+            WorkModel workModel = null;
+            SwitchWorkModel swm = null;
+            if (deviceId != null) {
+                workModel = modelMap.get(deviceId);//已登录
+                swm = switchMap.get(deviceId);// 开关
+                if (workModel != null && workModel.getUrgentType() == OTHER
+                        && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
+                    workModel.setUrgentTime(time);
+                    workModel.setUrgentType(OTHER);
+                    modelMap.put(deviceId, workModel);
+                    normalReplyModel(factory, deviceId, channel, 3);
+                    logger.warn("超过指定时间没有收到回复》》》 重新设置");
+                    // 更新开关切换时间
+                    swm.setSwitchTime(time);
+                    switchMap.put(deviceId, swm);
+                }
+            }
+            switch (type) {
+                case "AP00": // 初始化登录
+                    resolveLoginMSG(msg, channel);
+                    break;
+                case "AP03": // 连接(心跳) BP03#
+                    normalReply(factory, channel, "BP03");
+                    checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
+                    sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
+                    logger.warn("心跳报文下发 msg+" + msg);
+//                    //TODO (判断当前设备是否有需要回复的 数据)
+                    VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
+                    if (voiceF.getLag() == 1) {
+                        normalBP28Reply(channel, voiceF);
+                    }
+                    break;
+                case "AP01": // 位置信息
+                    String gpsState = msg.substring(12, 13);
+                    // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
+                    String gDay = msg.substring(6, 12);
+                    String gTime = msg.substring(39, 45);
 //					if (checkGpsTime(gDay + gTime, time)) {
 //						channel.close();
 //					}
-					setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
-					normalReply(factory, channel, "BP01");
-					break;
-				case "AP33": // 设置模式回复
-					//IWAP33,080835,1#   IWAP33,080835,03#
-					if (deviceId != null) {
-						Integer moderType = getInteger(msg.substring(14, msg.indexOf("#")));// 收到回复状态
-						if (swm.getWorkType() == URGENCY) {//  当前设置的模式
-							if (workModel != null && moderType == URGENCY) {
-								workModel.setUrgentType(URGENCY);
-								logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
-							} else {
-								logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
-								Thread.sleep(1000);
-								normalReplyModel(factory, deviceId, channel, URGENCY);
-								workModel.setUrgentType(OTHER);
-								// 更新开关时间
-								swm.setSwitchTime(time);
-								switchMap.put(deviceId, swm);
-							}
-							modelMap.put(deviceId, workModel);
-						}
-					}
-					break;
-				case "APHT": // 心率测量
-					sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
-					normalReply(factory, channel, "BP49");
-					break;
-				default: // 其他
-					logger.info("client send data without handle type ...");
-					break;
-			}
-		} finally {
-			MDC.clear();
-		}
-	}
-
-
-	/**
-	 * 模式设置
-	 *
-	 * @param factory
-	 * @param deviceId
-	 * @param channel
-	 */
-	protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
-		Long nowTime = System.currentTimeMillis();// 当前时间戳
-		if (deviceId == null || !switchMap.containsKey(deviceId)) {
-			return;
-		}
-		SwitchWorkModel swm = switchMap.get(deviceId);
-		logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
-		if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
-			Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
-			if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
-				workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
-			}
-			if (URGENCY == workType) {
-				//如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
-				swm.setFirstRemove(Boolean.FALSE);
-			}
-			// 如果当前状态为A 紧急模式
-			swm.setSwitchTime(nowTime);
-			swm.setWorkType(workType);
-			switchMap.put(deviceId, swm);
-			logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
-			normalReplyModel(factory, deviceId, channel, workType);
-			if (workType == URGENCY) {// 设置紧急模式  则更新紧急模式设置状态
-				WorkModel wm = new WorkModel();
-				wm.setUrgentType(OTHER);//
-				wm.setUrgentTime(nowTime);// 设置紧急模式时间
-				modelMap.put(deviceId, wm);
-			}
-		} else {
-			logger.warn("心跳检测是下发模式:工作不更改状态");
-		}
-	}
-
-
-	/**
-	 * 初始化 数据记录 注册记录
-	 *
-	 * @param deviceId
-	 */
-	protected void initSwitchMap(String deviceId) {
-		if (deviceId == null) {
-			return;
-		}
-		Long time = System.currentTimeMillis();
-		SwitchWorkModel swm = new SwitchWorkModel();
-		swm.setActiveTime(time);
-		swm.setSwitchTime(time);
-		swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
-		swm.setGpsUpTime(0L);// GPS 上报时间接口
-		swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
-		switchMap.put(deviceId, swm);
-		logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
-	}
-
-	/**
-	 * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
-	 * <p>
-	 *
-	 * @return 返回值为ture  则需要断开连接,如果返回值为false 则不需要断开连接
-	 */
-	protected Boolean checkGpsTime(String gTime, Long time) {
-
-		try {
-			DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
-			Date date = fmt.parse(gTime);
-			if ((time - date.getTime()) > 3 * 3600 * 1000) {
-				return Boolean.TRUE;
-			}
-		} catch (Exception e) {
-			logger.error(e.getMessage(), e);
-		}
-		return Boolean.FALSE;
-	}
-
-	/**
-	 * APOI 上报数据 更新活跃时间
-	 *
-	 * @param deviceId
-	 */
-	protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
-		if (deviceId == null) {
-			return null;
-		}
-		Long time = System.currentTimeMillis();
-		SwitchWorkModel swm = switchMap.getOrDefault(deviceId, new SwitchWorkModel());
-		swm.setGpsUpTime(time);
-		if ("A".equals(gpsState)) {
-			swm.setActiveTime(time);
-			logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
-			if (swm.getFirstRemove()) {
-				sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
-			} else {
-				swm.setFirstRemove(Boolean.TRUE);
-				logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
-			}
-		} else {
-			logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
-		}
-		switchMap.put(deviceId, swm);
-		logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
-		return swm;
-	}
-
-	protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
-		ByteBuf dataByteBufCopy = dataByteBuf.copy();
-		byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
-		dataByteBufCopy.readBytes(dataByteArray);
-		dataByteBufCopy.release();
-	}
-
-	/**
-	 * 登录管理
-	 *
-	 * @param channel
-	 */
-	private void resolveLoginMSG(String msg, Channel channel) {
-		/*IWAP00353456789012345# */
-		String message = String.valueOf(msg);
-		String factory = message.substring(0, 2);
-		String deviceId = message.substring(6, 21);
-		String deviceIdInMap = channelDeviceMap.get(channel);
-		MDC.put(MDC_DEVICEID, deviceId);
-		if (!deviceId.equals(deviceIdInMap)) {
-			manageChannel(channel, deviceId);
-		}
-		String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
-		normalReply(factory, channel, "BP00," + date + ",8");
-		Date loginTime = new Date();
-		initSwitchMap(deviceId);
-		executorService.execute(new Runnable() {
-			@Override
-			public void run() {
-				Date currentTime = new Date();
-				try {
-					Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
-					if (secondsSinceLogin < 5L) {
-						TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
-					}
-					normalReplyModel(factory, deviceId, channel, URGENCY);
-					WorkModel wm = new WorkModel();
-					wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
-					wm.setUrgentType(OTHER);// 默认指令模式为正常模式
-					modelMap.put(deviceId, wm);
-				} catch (InterruptedException e) {
-					logger.error(e.getMessage());
-				}
-			}
-		});
-
-	}
-
-
-	// IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
-	private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
-		StringBuilder replyCommand = new StringBuilder();
-		replyCommand.append(factory).append("BP33").append(",")
-				.append(deviceId).append(",")
-				.append("080835").append(",")
-				.append(workType).append("#");
-		String replyCommandStr = replyCommand.toString();
-		ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
-		buffer.writeBytes(replyCommandStr.getBytes());
-		ChannelFuture channelFuture = channel.writeAndFlush(buffer);
-		channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
-	}
-
-	/**
-	 * 回复
-	 *
-	 * @param channel
-	 * @content 回复内容
-	 */
-	private void normalReply(String factory, Channel channel, String content) {
-		// gps ==== >IW BP01#
-		// 登录 ==== >IW BP00,20150101125223,8#
-		// 心跳 ==== >IW BP03#
-		StringBuilder replyCommand = new StringBuilder();
-		replyCommand.append(factory);
-		replyCommand.append(content);
-		replyCommand.append("#");
-		String replyCommandStr = replyCommand.toString();
-		ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
-		buffer.writeBytes(replyCommandStr.getBytes());
-		ChannelFuture channelFuture = channel.writeAndFlush(buffer);
-		channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
-	}
-
-	@Override
-	public void startAcceptor() {
-		EventLoopGroup bossGroup = new NioEventLoopGroup();
-		EventLoopGroup workerGroup = new NioEventLoopGroup();
-		try {
-			ServerBootstrap b = new ServerBootstrap();
-			b.group(bossGroup, workerGroup)
-					.channel(NioServerSocketChannel.class)
-					.option(ChannelOption.SO_BACKLOG, 1024)
-					.handler(new LoggingHandler(LogLevel.INFO))
-					.childHandler(new ChannelInitializer<SocketChannel>() {
-						@Override
-						protected void initChannel(SocketChannel ch) {
-							ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
-							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
-							ch.pipeline().addLast(WatchJWServerHandler.this);
-						}
-					});
-			ChannelFuture f = b.bind(port).sync();
-			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
-					this.getPort());
-			f.channel().closeFuture().sync();
-		} catch (Exception ex) {
-			logger.warn(ex.getMessage(), ex);
-		} finally {
-			cleanRedisLinkData();
-			workerGroup.shutdownGracefully();
-			bossGroup.shutdownGracefully();
-		}
-	}
-
-
-	/**
-	 * 移除 失效的 deviceId
-	 *
-	 * @param ctx
-	 * @throws Exception
-	 */
-	@Override
-	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		Channel channel = ctx.channel();
-		if (!channel.isActive()) {
-			String deviceId = channelDeviceMap.get(channel);
-			if (deviceId != null) {
-				switchMap.remove(deviceId);
-				modelMap.remove(deviceId);
-			}
-		}
-		super.channelInactive(ctx);
-
-	}
-
-	class WorkModel {
-		/**
-		 * 设置紧急模式时间
-		 */
-		private Long urgentTime;
-
-		/**
-		 * 设置紧急模式状态
-		 */
-		private Integer urgentType;
-
-		public Long getUrgentTime() {
-			return urgentTime;
-		}
-
-		public void setUrgentTime(Long urgentTime) {
-			this.urgentTime = urgentTime;
-		}
-
-		public Integer getUrgentType() {
-			return urgentType;
-		}
-
-		public void setUrgentType(Integer urgentType) {
-			this.urgentType = urgentType;
-		}
-	}
-
-	/**
-	 * 开关工作模式
-	 */
-	class SwitchWorkModel {
-		/**
-		 * 有效数据时间 毫秒
-		 */
-		private Long activeTime;
-
-		/**
-		 * 开关 A 有效,V 无效
-		 */
-		private Integer workType;
-
-		/**
-		 * 开关切换时间 毫秒
-		 */
-		private Long switchTime;
-
-		/**
-		 * GPS APO1 上傳时间
-		 */
-		private Long gpsUpTime;
-
-		/**
-		 * 模式切换第一条是否移除  true   不需要移除, false  需要移除
-		 */
-		private Boolean firstRemove = Boolean.TRUE;
-
-		public Long getActiveTime() {
-			return activeTime;
-		}
-
-		public void setActiveTime(Long activeTime) {
-			this.activeTime = activeTime;
-		}
-
-		public Integer getWorkType() {
-			return workType;
-		}
-
-		public void setWorkType(Integer workType) {
-			this.workType = workType;
-		}
-
-		public Long getSwitchTime() {
-			return switchTime;
-		}
-
-		public void setSwitchTime(Long switchTime) {
-			this.switchTime = switchTime;
-		}
-
-		public Long getGpsUpTime() {
-			return gpsUpTime;
-		}
-
-		public void setGpsUpTime(Long gpsUpTime) {
-			this.gpsUpTime = gpsUpTime;
-		}
-
-		public Boolean getFirstRemove() {
-			return firstRemove;
-		}
-
-		public void setFirstRemove(Boolean firstRemove) {
-			this.firstRemove = firstRemove;
-		}
-	}
+                    setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
+                    normalReply(factory, channel, "BP01");
+                    break;
+                case "AP33": // 设置模式回复
+                    //IWAP33,080835,1#   IWAP33,080835,03#
+                    if (deviceId != null) {
+                        Integer moderType = getInteger(msg.substring(14, msg.indexOf("#")));// 收到回复状态
+                        if (swm.getWorkType() == URGENCY) {//  当前设置的模式
+                            if (workModel != null && moderType == URGENCY) {
+                                workModel.setUrgentType(URGENCY);
+                                logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
+                            } else {
+                                logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
+                                Thread.sleep(1000);
+                                normalReplyModel(factory, deviceId, channel, URGENCY);
+                                workModel.setUrgentType(OTHER);
+                                // 更新开关时间
+                                swm.setSwitchTime(time);
+                                switchMap.put(deviceId, swm);
+                            }
+                            modelMap.put(deviceId, workModel);
+                        }
+                    }
+                    break;
+                case "APHT": // 心率测量
+                    sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
+                    normalReply(factory, channel, "BP49");
+                    // TODO 设置心率周期上传还是 开机上传一次
+                    break;
+                case "AP07": // 语音上传
+                    // TODO 设置语音上传  IWAP07,20140818064408,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
+                    VoiceMsgVo ap07Msg = splitAP07VoiceMsg(msg, deviceId, in);
+                    // 调用接口 写库
+                    VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(ap07Msg);
+                    normalVoiceReply(channel, vo);
+                    break;
+                case "AP28": // 语音下行终端回复
+                    // 调用接口  查询回复内容
+                    VoiceMsgVo ap28Msg = splitAP28VoiceMsg(msg, deviceId);
+                    VoiceMsgVo voiceMsg = null;
+                    if (ap28Msg.getLag() == 1) {
+                        if (ap28Msg.getNu() < ap28Msg.getTotal()) {
+                            // 回复下个语音包
+                            voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu() + 1);
+                        } else {
+                            // 更新数据状态为发送完毕
+                            voiceMsgClient.updateVoiceMsgSendFinish(deviceId, ap28Msg.getMsgId());
+                        }
+                    } else {
+                        // 回复上个语音包
+                        voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu());
+                    }
+                    if (voiceMsg != null && voiceMsg.getLag() == 1) {
+                        normalBP28Reply(channel, voiceMsg);
+                    }
+                    break;
+                default: // 其他
+                    logger.info("client send data without handle type ...");
+                    break;
+            }
+        } finally {
+            MDC.clear();
+        }
+    }
+
+
+    /**
+     * 语音上行回复
+     *
+     * @param channel
+     * @content 回复内容    IWBP07,20140818064408,6,1,1#
+     */
+    private void normalVoiceReply(Channel channel, VoiceMsgVo voiceMsg) {
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("IWBP07").append(",");
+        replyCommand.append(voiceMsg.getVoiceTime()).append(",");
+        replyCommand.append(voiceMsg.getTotal()).append(",");
+        replyCommand.append(voiceMsg.getNu()).append(",");
+        replyCommand.append(voiceMsg.getLag());// 接受成功1:成功,0 失败
+        replyCommand.append("#");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+    }
+
+    /**
+     * 语音下行回复
+     *
+     * @param channel
+     * @content 回复内容   获取下一条代发送终端语音  IWBP28, D3590D54,XXXX,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
+     */
+    private void normalBP28Reply(Channel channel, VoiceMsgVo voiceMsg) {
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("IWBP28").append(",");
+        replyCommand.append("D3590D54").append(",");
+        replyCommand.append(voiceMsg.getMsgId()).append(",");
+        replyCommand.append(voiceMsg.getTotal()).append(",");
+        replyCommand.append(voiceMsg.getNu()).append(",");
+        replyCommand.append(voiceMsg.getLength()).append(",");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        buffer.writeBytes(voiceMsg.getMsg());
+//        buffer.readerIndex(0);
+//        byte[] req = new byte[buffer.readableBytes()];
+//        buffer.readBytes(req);
+//        voiceMsg.setMsg(req);
+//        VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(voiceMsg);// 调用接口 写库
+        buffer.readerIndex(0);
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg())));
+    }
+
+    /**
+     * 上行语音包才解析
+     *
+     * @param msg      IWAP07,20140818064408,6,1,1024,XXXXXXXX#
+     * @param deviceId
+     * @return
+     */
+    protected VoiceMsgVo splitAP07VoiceMsg(String msg, String deviceId, ByteBuf byteBufIn) {
+        VoiceMsgVo voiceMsg = new VoiceMsgVo();
+        String[] msgArr = msg.split(",");
+        voiceMsg.setDeviceId(deviceId);
+        voiceMsg.setVoiceTime(msgArr[1]);
+        voiceMsg.setTotal(Integer.valueOf(msgArr[2]));
+        voiceMsg.setNu(Integer.valueOf(msgArr[3]));
+        voiceMsg.setLength(Integer.valueOf(msgArr[4]));
+
+        try {
+            byteBufIn.readerIndex(0);
+            // 按照逗号切割
+            int cutLength = 0;// 存储最后一次查询条件
+            for (int i = 0; i < 5; i++) {
+                int commaLength = byteBufIn.forEachByte(byteBufIn.readerIndex(), 30, FIND_COMMA);
+                cutLength = commaLength;
+                byteBufIn.readerIndex(commaLength + 1);
+            }
+            byteBufIn.readerIndex(cutLength + 1);
+            byte[] bytes = new byte[voiceMsg.getLength() + 1];
+            byteBufIn.readBytes(bytes);
+            String msg1 = new String(bytes, "UTF-8");
+            logger.warn("入库参数" + msg1);
+            voiceMsg.setMsg(bytes);
+        } catch (Exception e) {
+            logger.warn("语音解析错无" + JSON.toJSONString(voiceMsg) + ":" + e.getStackTrace());
+        }
+        return voiceMsg;
+    }
+
+    /**
+     * 下行语音包才解析
+     *
+     * @param msg      IWAP28,D3590D54,XXXX,6,1,1#
+     * @param deviceId
+     * @return
+     */
+    protected VoiceMsgVo splitAP28VoiceMsg(String msg, String deviceId) {
+        VoiceMsgVo voiceMsg = new VoiceMsgVo();
+        String[] msgArr = msg.split(",");
+        voiceMsg.setDeviceId(deviceId);
+        voiceMsg.setTotal(Integer.valueOf(msgArr[3]));
+        voiceMsg.setNu(Integer.valueOf(msgArr[4]));
+        voiceMsg.setLag(Integer.valueOf(msgArr[5].substring(0, 1)));
+        voiceMsg.setMsgId(Integer.valueOf(msgArr[2]));
+        return voiceMsg;
+    }
+
+
+    /**
+     * 模式设置
+     *
+     * @param factory
+     * @param deviceId
+     * @param channel
+     */
+    protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
+        Long nowTime = System.currentTimeMillis();// 当前时间戳
+        if (deviceId == null || !switchMap.containsKey(deviceId)) {
+            return;
+        }
+        SwitchWorkModel swm = switchMap.get(deviceId);
+        logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
+        if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
+            Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
+            if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
+                workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
+            }
+            if (URGENCY == workType) {
+                //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
+                swm.setFirstRemove(Boolean.FALSE);
+            }
+            // 如果当前状态为A 紧急模式
+            swm.setSwitchTime(nowTime);
+            swm.setWorkType(workType);
+            switchMap.put(deviceId, swm);
+            logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
+            normalReplyModel(factory, deviceId, channel, workType);
+            if (workType == URGENCY) {// 设置紧急模式  则更新紧急模式设置状态
+                WorkModel wm = new WorkModel();
+                wm.setUrgentType(OTHER);//
+                wm.setUrgentTime(nowTime);// 设置紧急模式时间
+                modelMap.put(deviceId, wm);
+            }
+        } else {
+            logger.warn("心跳检测是下发模式:工作不更改状态");
+        }
+    }
+
+
+    /**
+     * 初始化 数据记录 注册记录
+     *
+     * @param deviceId
+     */
+    protected void initSwitchMap(String deviceId) {
+        if (deviceId == null) {
+            return;
+        }
+        Long time = System.currentTimeMillis();
+        SwitchWorkModel swm = new SwitchWorkModel();
+        swm.setActiveTime(time);
+        swm.setSwitchTime(time);
+        swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
+        swm.setGpsUpTime(0L);// GPS 上报时间接口
+        swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
+        switchMap.put(deviceId, swm);
+        logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
+    }
+
+    /**
+     * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
+     * <p>
+     *
+     * @return 返回值为ture  则需要断开连接,如果返回值为false 则不需要断开连接
+     */
+    protected Boolean checkGpsTime(String gTime, Long time) {
+
+        try {
+            DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
+            Date date = fmt.parse(gTime);
+            if ((time - date.getTime()) > 3 * 3600 * 1000) {
+                return Boolean.TRUE;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return Boolean.FALSE;
+    }
+
+    /**
+     * APOI 上报数据 更新活跃时间
+     *
+     * @param deviceId
+     */
+    protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
+        if (deviceId == null) {
+            return null;
+        }
+        Long time = System.currentTimeMillis();
+        SwitchWorkModel swm = switchMap.getOrDefault(deviceId, new SwitchWorkModel());
+        swm.setGpsUpTime(time);
+        if ("A".equals(gpsState)) {
+            swm.setActiveTime(time);
+            logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
+            if (swm.getFirstRemove()) {
+                sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
+            } else {
+                swm.setFirstRemove(Boolean.TRUE);
+                logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
+            }
+        } else {
+            logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
+        }
+        switchMap.put(deviceId, swm);
+        logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
+        return swm;
+    }
+
+    protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
+        ByteBuf dataByteBufCopy = dataByteBuf.copy();
+        byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
+        dataByteBufCopy.readBytes(dataByteArray);
+        dataByteBufCopy.release();
+    }
+
+    /**
+     * 登录管理
+     *
+     * @param channel
+     */
+    private void resolveLoginMSG(String msg, Channel channel) {
+        /*IWAP00353456789012345# */
+        String message = String.valueOf(msg);
+        String factory = message.substring(0, 2);
+        String deviceId = message.substring(6, 21);
+        String deviceIdInMap = channelDeviceMap.get(channel);
+        MDC.put(MDC_DEVICEID, deviceId);
+        if (!deviceId.equals(deviceIdInMap)) {
+            manageChannel(channel, deviceId);
+        }
+        String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
+        normalReply(factory, channel, "BP00," + date + ",8");
+        Date loginTime = new Date();
+        initSwitchMap(deviceId);
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                Date currentTime = new Date();
+                try {
+                    Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
+                    if (secondsSinceLogin < 5L) {
+                        TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
+                    }
+                    normalReplyModel(factory, deviceId, channel, URGENCY);
+                    WorkModel wm = new WorkModel();
+                    wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
+                    wm.setUrgentType(OTHER);// 默认指令模式为正常模式
+                    modelMap.put(deviceId, wm);
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        });
+
+    }
+
+
+    // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
+    private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append(factory).append("BP33").append(",")
+                .append(deviceId).append(",")
+                .append("080835").append(",")
+                .append(workType).append("#");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
+    }
+
+    /**
+     * 回复
+     *
+     * @param channel
+     * @content 回复内容
+     */
+    private void normalReply(String factory, Channel channel, String content) {
+        // gps ==== >IW BP01#
+        // 登录 ==== >IW BP00,20150101125223,8#
+        // 心跳 ==== >IW BP03#
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append(factory);
+        replyCommand.append(content);
+        replyCommand.append("#");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+    }
+
+    @Override
+    public void startAcceptor() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .handler(new LoggingHandler(LogLevel.INFO))
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        protected void initChannel(SocketChannel ch) {
+                            ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
+                            // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
+                            ch.pipeline().addLast(new DelimiterJingWeiFrameDecoder(65535, false, delimiter));
+                            ch.pipeline().addLast(WatchJWServerHandler.this);
+                        }
+                    });
+            ChannelFuture f = b.bind(port).sync();
+            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+                    this.getPort());
+            f.channel().closeFuture().sync();
+        } catch (Exception ex) {
+            logger.warn(ex.getMessage(), ex);
+        } finally {
+            cleanRedisLinkData();
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+
+
+    /**
+     * 移除 失效的 deviceId
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        Channel channel = ctx.channel();
+        if (!channel.isActive()) {
+            String deviceId = channelDeviceMap.get(channel);
+            if (deviceId != null) {
+                switchMap.remove(deviceId);
+                modelMap.remove(deviceId);
+            }
+        }
+        super.channelInactive(ctx);
+
+    }
+
+    class WorkModel {
+        /**
+         * 设置紧急模式时间
+         */
+        private Long urgentTime;
+
+        /**
+         * 设置紧急模式状态
+         */
+        private Integer urgentType;
+
+        public Long getUrgentTime() {
+            return urgentTime;
+        }
+
+        public void setUrgentTime(Long urgentTime) {
+            this.urgentTime = urgentTime;
+        }
+
+        public Integer getUrgentType() {
+            return urgentType;
+        }
+
+        public void setUrgentType(Integer urgentType) {
+            this.urgentType = urgentType;
+        }
+    }
+
+    /**
+     * 开关工作模式
+     */
+    class SwitchWorkModel {
+        /**
+         * 有效数据时间 毫秒
+         */
+        private Long activeTime;
+
+        /**
+         * 开关 A 有效,V 无效
+         */
+        private Integer workType;
+
+        /**
+         * 开关切换时间 毫秒
+         */
+        private Long switchTime;
+
+        /**
+         * GPS APO1 上傳时间
+         */
+        private Long gpsUpTime;
+
+        /**
+         * 模式切换第一条是否移除  true   不需要移除, false  需要移除
+         */
+        private Boolean firstRemove = Boolean.TRUE;
+
+        public Long getActiveTime() {
+            return activeTime;
+        }
+
+        public void setActiveTime(Long activeTime) {
+            this.activeTime = activeTime;
+        }
+
+        public Integer getWorkType() {
+            return workType;
+        }
+
+        public void setWorkType(Integer workType) {
+            this.workType = workType;
+        }
+
+        public Long getSwitchTime() {
+            return switchTime;
+        }
+
+        public void setSwitchTime(Long switchTime) {
+            this.switchTime = switchTime;
+        }
+
+        public Long getGpsUpTime() {
+            return gpsUpTime;
+        }
+
+        public void setGpsUpTime(Long gpsUpTime) {
+            this.gpsUpTime = gpsUpTime;
+        }
+
+        public Boolean getFirstRemove() {
+            return firstRemove;
+        }
+
+        public void setFirstRemove(Boolean firstRemove) {
+            this.firstRemove = firstRemove;
+        }
+    }
 
 }