rainbow954 há 7 anos atrás
pai
commit
fb0e92fb88

+ 77 - 84
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceInboundHandlerAdapter.java

@@ -33,6 +33,8 @@ import org.springframework.util.concurrent.SuccessCallback;
 import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.common.NumUtil;
 import com.tidecloud.dataacceptance.entity.ConnectMsg;
+import com.tidecloud.dataacceptance.entity.Session;
+import com.tidecloud.dataacceptance.entity.SessionManager;
 import com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -47,11 +49,13 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
-public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
+public  class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
 
 	protected String ip;
 
@@ -78,6 +82,8 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 	public static Map<String, Channel> socketyChannelMap = new HashMap<>();
 	public static Map<Channel, String> channelDeviceMap = new HashMap<>();
 
+	private final SessionManager sessionManager = SessionManager.getInstance();
+
 	private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
 
 	private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
@@ -88,41 +94,34 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 
 	private String topic;
 
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		ByteBuf byteBuf = (ByteBuf) msg;
-		String str = byteBufferToString(byteBuf.nioBuffer());
-		logger.info("接入数据:{}", str);
-
-		try {
-			reply(ctx, str);
-		} catch (Exception e) {
-			logger.error(e.getMessage());
-		} finally {
-			byteBuf.release();
-		}
-	}
-
-	abstract public void reply(ChannelHandlerContext ctx, String msg) throws Exception;
 	
 
+	
+	
+	
 
 	protected void receiveMsg(String msg, String deviceId, Channel channel) {
-		Channel channelInMap = channelMapOfChannelKey.get(deviceId);
-		if (channelInMap == null) {
-			manageChannel(channel, deviceId);
-		}
+
+		manageChannel(channel, deviceId);
+
 		kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
 		singleThreadPool.execute(() -> dataStorage(msg));
 	}
 
-	private void manageChannel(Channel channel, String deviceId) {
-		String socketkey = UUID.randomUUID().toString();
+	protected void manageChannel(Channel channel, String deviceId) {
+		String socketkey = channel.id().asLongText();
+		Channel channelInMap = socketyChannelMap.get(socketkey);
+		String deviceIdInMap = channelDeviceMap.get(channel);
+		if (channelInMap != null && deviceIdInMap != null) {
+			logger.debug("device [{}] has link [{}]", deviceId, socketkey);
+			return;
+		}
 		socketyChannelMap.put(socketkey, channel);
 		channelDeviceMap.put(channel, deviceId);
-		String addressStr = ConnectMsg.ipToLong(this.ip);
-		ConnectMsg cMsg = new ConnectMsg(this.ip, socketkey);
-		try (Jedis jedis = jedisPool.getResource()) {
+		String addressStr = ConnectMsg.ipToLong(ip);
+		ConnectMsg cMsg = new ConnectMsg(ip, socketkey);
+		Jedis jedis = jedisPool.getResource();
+		try {
 			jedis.select(REDIS_INDEX_LINK);
 			String insertKey = PREFIX_LINK + addressStr;
 			String selectKey = PREFIX_DEVICE + deviceId;
@@ -131,7 +130,9 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 			jedis.sadd(insertBackupKey, deviceId);
 			jedis.set(selectKey, StringUtil.convert2String(cMsg));
 		} catch (Exception e) {
-			logger.error(e.getMessage(), e);
+			e.printStackTrace();
+		} finally {
+			jedis.close();
 		}
 	}
 
@@ -185,27 +186,6 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 		}
 	}
 
-	// public static void writeDevice2File(File file, String deviceStr) {
-	// BufferedWriter bufferedWriter = null;
-	// try {
-	// bufferedWriter = new BufferedWriter(new FileWriter(file, true));
-	// bufferedWriter.write(deviceStr);
-	// bufferedWriter.newLine();
-	// bufferedWriter.flush();
-	// } catch (IOException e) {
-	// logger.error(e.getMessage());
-	// } finally {
-	// if (bufferedWriter != null) {
-	// try {
-	// bufferedWriter.close();
-	// } catch (IOException e) {
-	// logger.error("close file[{}] failed: {}", file.getName(), e.getMessage());
-	// }
-	// }
-	//
-	// }
-	// }
-
 	public static void writeDevice2File(File file, String deviceStr) {
 		Integer length = deviceStr.getBytes().length;
 		try {
@@ -226,8 +206,10 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 	}
 
 	@Override
-	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-		saveChannel(ctx);
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		Session session = Session.buildSession(ctx.channel());
+		sessionManager.put(session.getId(), session);
+		logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
 	}
 
 	@Override
@@ -237,14 +219,18 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 			String deviceId = channelDeviceMap.get(channel);
 			if (deviceId != null) {
 				channelDeviceMap.remove(channel);
-				// deleteLinkFromRedis(deviceId);
+				deleteLinkFromRedis(deviceId);
 			}
 		}
 		super.channelInactive(ctx);
 		ctx.close();
+		final String sessionId = ctx.channel().id().asLongText();
+		Session session = sessionManager.findBySessionId(sessionId);
+		this.sessionManager.removeBySessionId(sessionId);
+		logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
 	}
 
-	private void deleteLinkFromRedis(String deviceId) {
+	protected void deleteLinkFromRedis(String deviceId) {
 		String deleteKey = PREFIX_DEVICE + deviceId;
 		try (Jedis jedis = jedisPool.getResource()) {
 			jedis.select(REDIS_INDEX_LINK);
@@ -263,9 +249,17 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 		}
 	}
 
-	private void saveChannel(ChannelHandlerContext ctx) {
-	}
-
+	@Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.READER_IDLE) {
+                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
+                logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
+                ctx.close();
+            }
+        }
+    }
 	public static String byteBufferToString(ByteBuffer buffer) {
 		CharBuffer charBuffer = null;
 		try {
@@ -329,35 +323,34 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 			bossGroup.shutdownGracefully();
 		}
 	}
-	
-	
-	   protected void cleanRedisLinkData() {
-	        
-	        try(Jedis jedis = jedisPool.getResource()){
-	            jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
-	            String addressStr = ConnectMsg.ipToLong(this.ip);
-	            String selectKey = YiTongGpsServerHandler.PREFIX_LINK_BACK + addressStr;
-	            Set<String> values = jedis.smembers(selectKey);
-	            
-	            for (String deviceId : values) {
-	                String deleteKeyOfDevice = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
-	                String deleteKeyOfLink = YiTongGpsServerHandler.PREFIX_LINK + addressStr;
-	                String connectMsgStr = jedis.get(deleteKeyOfDevice);
-	                if (connectMsgStr != null) {
-	                    ConnectMsg connectMsg = StringUtil.convert2Object(connectMsgStr, ConnectMsg.class);
-	                    String socketId = connectMsg.getSocketId();
-	                    jedis.del(deleteKeyOfDevice);
-	                    jedis.srem(deleteKeyOfLink, socketId);
-	                }else {
-	                	logger.error("error deviceId [{}] in select [{}] key [{}]", deviceId, 15, deleteKeyOfDevice);
-	                }
-	            }
-	            jedis.del(selectKey);
-	        } catch (Exception e) {
-	        	logger.error(e.getLocalizedMessage());
-	        }
-	    }
-	
+
+	protected void cleanRedisLinkData() {
+
+		try (Jedis jedis = jedisPool.getResource()) {
+			jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
+			String addressStr = ConnectMsg.ipToLong(this.ip);
+			String selectKey = YiTongGpsServerHandler.PREFIX_LINK_BACK + addressStr;
+			Set<String> values = jedis.smembers(selectKey);
+
+			for (String deviceId : values) {
+				String deleteKeyOfDevice = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
+				String deleteKeyOfLink = YiTongGpsServerHandler.PREFIX_LINK + addressStr;
+				String connectMsgStr = jedis.get(deleteKeyOfDevice);
+				if (connectMsgStr != null) {
+					ConnectMsg connectMsg = StringUtil.convert2Object(connectMsgStr, ConnectMsg.class);
+					String socketId = connectMsg.getSocketId();
+					jedis.del(deleteKeyOfDevice);
+					jedis.srem(deleteKeyOfLink, socketId);
+				} else {
+					logger.error("error deviceId [{}] in select [{}] key [{}]", deviceId, 15, deleteKeyOfDevice);
+				}
+			}
+			jedis.del(selectKey);
+		} catch (Exception e) {
+			logger.error(e.getLocalizedMessage());
+		}
+	}
+
 	public String getDataPath() {
 		return dataPath;
 	}

+ 42 - 0
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -0,0 +1,42 @@
+package com.tidecloud.dataacceptance.service;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboundHandlerAdapter {
+	private static final Logger logger = LoggerFactory.getLogger(HexBinaryAcceptanceHandlerAdapter.class);
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		ByteBuf in = (ByteBuf) msg;
+		printAcceptanceData(in,ctx);
+		
+		try {
+			handle(in,ctx.channel());
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			in.release();
+			
+		}
+		
+		ctx.flush();
+	}
+
+	private void printAcceptanceData(ByteBuf dataByteBuf,ChannelHandlerContext ctx) {
+		ByteBuf dataByteBufCopy = dataByteBuf.copy();
+		byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
+		dataByteBufCopy.readBytes(dataByteArray);
+		String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
+		logger.info("设备: [{}] 传入数据为 : [{}]", channelDeviceMap.get(ctx.channel()), printHexBinary);
+		dataByteBufCopy.release();
+	}
+
+	abstract protected void handle(ByteBuf in, Channel channel) throws Exception;
+	
+}

+ 30 - 0
src/main/java/com/tidecloud/dataacceptance/service/StringAcceptanceHandlerAdapter.java

@@ -0,0 +1,30 @@
+package com.tidecloud.dataacceptance.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+public abstract class StringAcceptanceHandlerAdapter extends AcceptanceInboundHandlerAdapter {
+	private static final Logger logger = LoggerFactory.getLogger(AcceptanceInboundHandlerAdapter.class);
+	
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		ByteBuf byteBuf = (ByteBuf) msg;
+		String str = byteBufferToString(byteBuf.nioBuffer());
+		logger.info("接入数据:{}", str);
+		try {
+			handle(str,ctx.channel());
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			byteBuf.release();
+		}
+		ctx.flush();
+	}
+
+	abstract protected void handle(String in, Channel channel) throws Exception;
+
+}

+ 120 - 266
src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -1,30 +1,21 @@
 package com.tidecloud.dataacceptance.service.impl;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
-import javax.xml.bind.DatatypeConverter;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.codec.MsgDecoder;
 import com.tidecloud.dataacceptance.common.Constants;
 import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
 import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg;
 import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg;
 import com.tidecloud.dataacceptance.entity.PackageData;
 import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
-import com.tidecloud.dataacceptance.entity.Session;
-import com.tidecloud.dataacceptance.entity.SessionManager;
 import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
 import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -33,7 +24,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -41,279 +31,148 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
 
 /**
  * @author cdk
  */
 @Component
 @ChannelHandler.Sharable
-public class BSJGpsServerHandler extends AcceptanceInboundHandlerAdapter {
+public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(BSJGpsServerHandler.class);
-    public static String PREFIX_LINK = "s.";
-    public static String PREFIX_LINK_BACK = "s.b.";
-    public static String PREFIX_DEVICE = "d.";
-    
-    public static final Integer REDIS_INDEX_LINK = 15;
+	private static final Logger logger = LoggerFactory.getLogger(BSJGpsServerHandler.class);
 
+	private final MsgDecoder decoder = new MsgDecoder();
 
-    public static Map<String, Channel> socketyChannelMap = new HashMap<>();
-    public static Map<Channel, String> channelDeviceMap = new HashMap<>();
-    public static Map<String, String> commandCopy = new HashMap<>();
-    
-    private final SessionManager sessionManager;
-    private final MsgDecoder decoder;
+	private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
 
-    private TerminalMsgProcessService msgProcessService;
-    
-    /**
-     * 
-     * @Title:  YiTongGpsServerHandler   
-     * @Description: initialzation sessionManager and msgDecoder   
-     */
-    public BSJGpsServerHandler() {
-        this.sessionManager = SessionManager.getInstance();
-        this.decoder = new MsgDecoder();
-        this.msgProcessService = new TerminalMsgProcessService();
-    }
-    
-    @Autowired
-    private JedisPool jedisPool;
-    
-    @Value("${server.localaddress}")
-    private String address;
-    
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf dataByteBuf = (ByteBuf) msg;
-        // print acceptance data
-        printAcceptanceData(dataByteBuf);
-        try {
-            if (!dataByteBuf.isReadable()) {
-                return;
-            }
-            byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
-            dataByteBuf.readBytes(dataByteArray);
-            byte[] dataByteArrayDoEscape = JT808ProtocolUtils.
-                    doEscape4Receive(dataByteArray, 0, dataByteArray.length);
-            //let dataByteArray transfer 808DataEntity
-            PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
-            //link manage
-            packageData.setChannel(ctx.channel());
-            this.processPackageData(packageData);
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        }
-       ctx.flush();
-    }
-    
-    private void manageChannel(Channel channel, String deviceId) {
-        String socketkey = channel.id().asLongText();
-        Channel channelInMap = socketyChannelMap.get(socketkey);
-        String deviceIdInMap = channelDeviceMap.get(channel);
-        if (channelInMap != null && deviceIdInMap != null) {
-            logger.debug("device [{}] has link [{}]", deviceId, socketkey);
-            return;
-        }
-        socketyChannelMap.put(socketkey, channel);
-        channelDeviceMap.put(channel, deviceId);
-        String addressStr = ConnectMsg.ipToLong(address);
-        ConnectMsg cMsg = new ConnectMsg(address, socketkey);
-        Jedis jedis = jedisPool.getResource();
-        try {
-            jedis.select(REDIS_INDEX_LINK);
-            String insertKey = PREFIX_LINK + addressStr;
-            String selectKey = PREFIX_DEVICE + deviceId;
-            String insertBackupKey = PREFIX_LINK_BACK + addressStr;
-            jedis.sadd(insertKey, socketkey);
-            jedis.sadd(insertBackupKey, deviceId);
-            jedis.set(selectKey, StringUtil.convert2String(cMsg));
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            jedis.close();
-        }
-    }
-    
-    private void deleteLinkFromRedis(String deviceId) {
-        String deleteKey = PREFIX_DEVICE + deviceId;
-        try(Jedis jedis = jedisPool.getResource()) {
-            jedis.select(REDIS_INDEX_LINK);
-            String connectMsg = jedis.get(deleteKey);
-            if (connectMsg != null) {
-                ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
-                String socketId = cmsg.getSocketId();
-                socketyChannelMap.remove(socketId);
-                String socketQueryKey = PREFIX_LINK + address;
-                jedis.srem(socketQueryKey, socketId);
-                jedis.del(deleteKey);
-                logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
-            }
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        } 
-    }
+	@Override
+	public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
+	
+		try {
+			if (!dataByteBuf.isReadable()) {
+				return;
+			}
+			byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
+			dataByteBuf.readBytes(dataByteArray);
+			byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
+			// let dataByteArray transfer 808DataEntity
+			PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
+			// link manage
+			packageData.setChannel(channel);
+			this.processPackageData(packageData);
+		} catch (Exception e) {
+			logger.error(e.getLocalizedMessage());
+		}
+		
+	}
 
-    private void processPackageData(PackageData packageData) {
-        final MsgHeader header = packageData.getMsgHeader();
+	private void processPackageData(PackageData packageData) {
+		final MsgHeader header = packageData.getMsgHeader();
 
-        // 1. 终端心跳-消息体为空 ==> 平台通用应答
-        if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
-            logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                this.msgProcessService.processTerminalHeartBeatMsg(packageData);
-                manageChannel(packageData.getChannel(), header.getTerminalPhone());
-                logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage());
-            }
-        }
+		// 1. 终端心跳-消息体为空 ==> 平台通用应答
+		if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
+			logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				this.msgProcessService.processTerminalHeartBeatMsg(packageData);
+				manageChannel(packageData.getChannel(), header.getTerminalPhone());
+				logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+
+		// 5. 终端鉴权 ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
+			logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
+				this.msgProcessService.processAuthMsg(authenticationMsg);
+				logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 6. 终端注册 ==> 终端注册应答
+		else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
+			logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
+				this.msgProcessService.processRegisterMsg(msg);
+				logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
+			logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				this.msgProcessService.processTerminalLogoutMsg(packageData);
+				logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 3. 自定义位置信息汇报 ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) {
+			logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData);
+				this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg);
+				logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
+						header.getFlowId(), e.getMessage(), e);
+			}
+		}
+		// 4. 位置信息汇报 ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
+			logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
+				this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
+				logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 5. 车辆控制回复
+		else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) {
+			logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+		}
+		// 其他情况
+		else {
+			logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
+					packageData);
+		}
+	}
 
-        // 5. 终端鉴权 ==> 平台通用应答
-        else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
-            logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
-                this.msgProcessService.processAuthMsg(authenticationMsg);
-                logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage());
-            }
-        }
-        // 6. 终端注册 ==> 终端注册应答
-        else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
-            logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
-                this.msgProcessService.processRegisterMsg(msg);
-                logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage());
-            }
-        }
-        // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
-        else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
-            logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                this.msgProcessService.processTerminalLogoutMsg(packageData);
-                logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage());
-            }
-        }
-        // 3. 自定义位置信息汇报 ==> 平台通用应答
-        else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) {
-            logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData);
-                this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg);
-                logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage(), e);
-            }
-        }
-        // 4. 位置信息汇报 ==> 平台通用应答
-        else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
-            logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            try {
-                LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
-                this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
-                logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-            } catch (Exception e) {
-                logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
-                        e.getMessage());
-            }
-        }
-        // 5. 车辆控制回复 
-        else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) {
-            logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
-        }
-        // 其他情况
-        else {
-            logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
-                    packageData);
-        }
-    }
+	
 
-    private void printAcceptanceData(ByteBuf dataByteBuf) {
-        ByteBuf dataByteBufCopy = dataByteBuf.copy();
-        byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
-        dataByteBufCopy.readBytes(dataByteArray);
-        String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
-        logger.info("acceptance original data [{}]", printHexBinary);
-    }
-    
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        cause.printStackTrace();
-        ctx.close();
-    }
-    
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        Channel channel = ctx.channel();
-        if (!channel.isActive()) {
-            String deviceId = channelDeviceMap.get(channel);
-            if (deviceId != null) {
-                channelDeviceMap.remove(channel);
-                deleteLinkFromRedis(deviceId);
-            }
-        }
-        super.channelInactive(ctx);
-        ctx.close();
-        final String sessionId = ctx.channel().id().asLongText();
-        Session session = sessionManager.findBySessionId(sessionId);
-        this.sessionManager.removeBySessionId(sessionId);
-        logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
-    }
-    
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        Session session = Session.buildSession(ctx.channel());
-        sessionManager.put(session.getId(), session);
-        logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
-    }
-    
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
-            IdleStateEvent event = (IdleStateEvent) evt;
-            if (event.state() == IdleState.READER_IDLE) {
-                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
-                logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
-                ctx.close();
-            }
-        }
-    }
-    
 	public void startAcceptor() {
 		EventLoopGroup bossGroup = new NioEventLoopGroup();
 		EventLoopGroup workerGroup = new NioEventLoopGroup();
-		 byte[] splitBytes1 = new byte[]{0x7e};
-         byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
+		byte[] splitBytes1 = new byte[] { 0x7e };
+		byte[] splitBytes2 = new byte[] { 0x7e, 0x7e };
 		try {
 			ServerBootstrap b = new ServerBootstrap();
 			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
 					.childHandler(new ChannelInitializer<SocketChannel>() {
 						@Override
 						protected void initChannel(SocketChannel ch) throws Exception {
-                            ch.pipeline().addLast("idleStateHandler",
-                                    new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
-                            
-                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204, Unpooled.copiedBuffer(splitBytes1), 
-                                    Unpooled.copiedBuffer(splitBytes2)));
-                            ch.pipeline().addLast(this);
-                        }
+							ch.pipeline().addLast("idleStateHandler",
+									new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
+
+							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
+									Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
+							ch.pipeline().addLast(this);
+						}
 					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
 
 			ChannelFuture f = b.bind(this.getPort()).sync();
@@ -329,9 +188,4 @@ public class BSJGpsServerHandler extends AcceptanceInboundHandlerAdapter {
 		}
 	}
 
-	@Override
-	public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
 }

+ 4 - 4
src/main/java/com/tidecloud/dataacceptance/service/impl/BingShuiGpsServerHandler.java

@@ -9,7 +9,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.StringAcceptanceHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -25,11 +25,11 @@ import io.netty.util.concurrent.GenericFutureListener;
  */
 @Component
 @ChannelHandler.Sharable
-public class BingShuiGpsServerHandler extends AcceptanceInboundHandlerAdapter {
+public class BingShuiGpsServerHandler extends StringAcceptanceHandlerAdapter {
 
 	private static final Logger logger = LoggerFactory.getLogger(BingShuiGpsServerHandler.class);
 
-	public void reply(ChannelHandlerContext ctx, String oringalData) {
+	public void handle(String oringalData, Channel channel) {
 		Map<String, String> dataMap = new HashMap<>();
 
 		String[] dataArray = oringalData.split("\\s+");
@@ -52,7 +52,7 @@ public class BingShuiGpsServerHandler extends AcceptanceInboundHandlerAdapter {
 				logger.error(e.getMessage(), e);
 			}
 		}
-		Channel channel = ctx.channel();
+		
 		
 		receiveMsg(oringalData,phone,channel);
 

+ 8 - 12
src/main/java/com/tidecloud/dataacceptance/service/impl/VorgeaUR0401ServerHandler.java

@@ -13,18 +13,18 @@ import com.tidecloud.dataacceptance.rfid.ReaderDataPackageProcess;
 import com.tidecloud.dataacceptance.rfid.module.interaction.DataPackageParser;
 import com.tidecloud.dataacceptance.rfid.module.interaction.DataPackageProcess;
 import com.tidecloud.dataacceptance.rfid.rxobserver.RXObserver;
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
 
 /**
  * @author ryan
  */
 @Component
 @ChannelHandler.Sharable
-public class VorgeaUR0401ServerHandler extends AcceptanceInboundHandlerAdapter {
+public class VorgeaUR0401ServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	private static final Logger logger = LoggerFactory.getLogger(VorgeaUR0401ServerHandler.class);
 	
@@ -35,10 +35,11 @@ public class VorgeaUR0401ServerHandler extends AcceptanceInboundHandlerAdapter {
 	private RXObserver mObserver = new RXObserver() ;
 	DateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmss");
 
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+	
 
-		ByteBuf byteBuf = (ByteBuf) msg;
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		ByteBuf byteBuf = (ByteBuf) in;
 		int len = byteBuf.readableBytes();
 
 		byte[] btAryReceiveData = new byte[len];
@@ -55,13 +56,8 @@ public class VorgeaUR0401ServerHandler extends AcceptanceInboundHandlerAdapter {
 		String deviceId = mObserver.getDeviceId();
 		String storageStr = deviceId+";"+epc+";"+fmt.format(new Date());
 		dataStorage(storageStr);
-
+		
 	}
 
 
-	@Override
-	public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
 }

+ 4 - 26
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchServerHandler.java

@@ -9,14 +9,13 @@ import org.springframework.stereotype.Component;
 
 import com.tidecloud.dataacceptance.entity.Advice;
 import com.tidecloud.dataacceptance.entity.Device;
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.StringAcceptanceHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -25,18 +24,18 @@ import io.netty.util.concurrent.GenericFutureListener;
  */
 @Sharable
 @Component(WatchServerHandler.name)
-public class WatchServerHandler extends AcceptanceInboundHandlerAdapter {
+public class WatchServerHandler extends StringAcceptanceHandlerAdapter {
      
 	public static final String name = "WatchServerHandler";
     private static final Logger logger = LoggerFactory.getLogger(WatchServerHandler.class);
    
   
-    public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
+    public void handle(String msg,Channel channel) throws Exception {
         logger.info("设备上传数据:" + msg);
         Advice advice = getAdevice(msg);
         String deviceId = advice.getDeviceId();
         String adviceType = advice.getAdviceType();
-        Channel channel = ctx.channel();
+        
       
         switch (adviceType) {
         
@@ -158,26 +157,5 @@ public class WatchServerHandler extends AcceptanceInboundHandlerAdapter {
     }
 
 
-
-    /**
-     * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
-     */
-    public static void main(String[] args) {
-//        for (int i = 0; i < 100; i++) {
-//            Device device = new Device();
-//            device.setDeviceId("3918197044");
-//            device.setElectric(11.2d);
-//            device.setItemState("125");
-//            device.setLat(24.4441);
-//            device.setLng(114.223);
-//            device.setSpeed(21.2);
-//            device.setStep(12);
-//            device.setTerminalState(12);
-//            device.setTimestamp(new Date());
-//            new WatchServerHandler().dataStorage(device);
-//        }
-    }
-	
-
    
 }

+ 230 - 340
src/main/java/com/tidecloud/dataacceptance/service/impl/YiTongGpsServerHandler.java

@@ -1,33 +1,19 @@
 package com.tidecloud.dataacceptance.service.impl;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.text.SimpleDateFormat;
+
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.DatatypeConverter;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.common.CRCUtil;
-import com.tidecloud.dataacceptance.common.Constants;
 import com.tidecloud.dataacceptance.common.DateUtil;
 import com.tidecloud.dataacceptance.common.NumUtil;
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
 import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
 import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice;
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -43,355 +29,259 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
 
 /**
  * @author cdk
  */
 @Component
 @ChannelHandler.Sharable
-public class YiTongGpsServerHandler extends AcceptanceInboundHandlerAdapter {
-
-    private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
-    public static String PREFIX_LINK = "s.";
-    public static String PREFIX_LINK_BACK = "s.b.";
-    public static String PREFIX_DEVICE = "d.";
-    private static Boolean ISINIT = true;
-    
-    private static final Integer START_BITS = 2;
-    private static final Byte START_BIT = 0x78;
-    private static final Byte START_BIT2 = 0X79;
+public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-    private static final Integer TEN_M = 10485760;
+	private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
 
-    private static final byte LOGIN_MSG = 0x01;
-    private static final byte LOCATION_MSG = 0x22;
-    private static final byte STATUS_MSG = 0x13;
-    private static final byte WARNING_MSG = 0x26;
-    private static final byte CORRECT_TIME_MSG = (byte)0x8A;
-    private static final byte VOLTAGE_MSG = (byte)0x94;
-    private static final byte VOLTAGE_SUB_MSG = (byte)0x00;
-    private static final byte COMMAND_COPY_MSG = 0x15;
-    private static final byte ASCII_CODE = 0x01;
-    private static final byte UTF16_BE_CODE = 0x02;
-    
-    private static final String DATA_PATH = "/home/service/collector_yitong/rawdata-car/";
-    private static final String PREFIX_NAME = "yitong";
-    private static final Integer DATA_SIZE = 6;
-    public static final Integer REDIS_INDEX_LINK = 15;
-    private static File WRITE_FILE = null;
+	private static final Integer START_BITS = 2;
+	private static final Byte START_BIT = 0x78;
+	private static final Byte START_BIT2 = 0X79;
 
+	private static final byte LOGIN_MSG = 0x01;
+	private static final byte LOCATION_MSG = 0x22;
+	private static final byte STATUS_MSG = 0x13;
+	private static final byte WARNING_MSG = 0x26;
+	private static final byte CORRECT_TIME_MSG = (byte) 0x8A;
+	private static final byte VOLTAGE_MSG = (byte) 0x94;
+	private static final byte VOLTAGE_SUB_MSG = (byte) 0x00;
+	private static final byte COMMAND_COPY_MSG = 0x15;
 
-    public static Map<String, Channel> socketyChannelMap = new HashMap<>();
-    public static Map<Channel, String> channelDeviceMap = new HashMap<>();
-    public static Map<String, String> commandCopy = new HashMap<>();
-    
-    @Autowired
-    private JedisPool jedisPool;
-    
-    @Value("${server.localaddress}")
-    private String address;
-    
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf in = (ByteBuf) msg;
-        ByteBuf copy = in.copy();
-        byte[] bytes = new byte[copy.readableBytes()];
-        copy.readBytes(bytes);
-        String printHexBinary = DatatypeConverter.printHexBinary(bytes);
-        logger.info("设备: [{}] 传入数据为 : [{}]",channelDeviceMap.get(ctx.channel()), printHexBinary);
-        try {
-            int index = 0;
-            byte b = 0;
-            while (index < START_BITS) {
-                b = in.readByte();
-                if (START_BIT != b && START_BIT2 != b) {
-                    ctx.close();
-                }
-                index++;
-            }
-            int length = 0;
-            if (START_BIT == b) {
-                length = in.readByte() & 0xff;
-            }else {
-                length = in.readShort() & 0xffff;
-            }
-            handle(in, length, ctx.channel());
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-       ctx.flush();
-    }
-    
-    private void handle(ByteBuf in, int length, Channel channel) throws Exception{
-        if (in.isReadable()) {
-            in = in.readBytes(length);
-            byte msgType = in.readByte();
-            if (LOGIN_MSG == msgType) {
-                resolveLoginMSG(in, channel);
-            } else if (LOCATION_MSG == msgType) {
-                String deviceId = channelDeviceMap.get(channel);
-                if (deviceId == null) {
-                    logger.info("链接管理失效。。。。。。。");
-                }else {
-                    resolveLocationMSG(in, deviceId);
-                }
-            } else if (STATUS_MSG == msgType) {
-                reply(channel, STATUS_MSG);
-            } else if (WARNING_MSG == msgType) {
-                String deviceId = channelDeviceMap.get(channel);
-                resolveWarningMsg(in, deviceId);
-                reply(channel, WARNING_MSG);
-            } else if (CORRECT_TIME_MSG == msgType) {
-                reply(channel, CORRECT_TIME_MSG);
-            } else if (VOLTAGE_MSG == msgType) {
-                resolveVoltageMSG(in, channel);
-            } else if (COMMAND_COPY_MSG == msgType) {
-                resolveCommandCopyMSG(in, channel, length);
-            } else {
-                logger.info("client send data without handle type ...");
-            }
-        }
+	private static final Integer DATA_SIZE = 6;
 
-    }
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		try {
+			int index = 0;
+			byte b = 0;
+			while (index < START_BITS) {
+				b = in.readByte();
+				if (START_BIT != b && START_BIT2 != b) {
+					channel.close();
+				}
+				index++;
+			}
+			int length = 0;
+			if (START_BIT == b) {
+				length = in.readByte() & 0xff;
+			} else {
+				length = in.readShort() & 0xffff;
+			}
+			handle(in, length, channel);
+		} catch (Exception e) {
+			logger.error(e.getMessage(), e);
+		}
+	}
 
-    private void resolveCommandCopyMSG(ByteBuf in, Channel channel, Integer length) {
-        byte readByte = in.readByte();    
-        int checkCode = in.readInt();
-        if (checkCode != 0) {
-            logger.error("illegal checkcode [{}]", checkCode);
-        } else {
-//            byte codingType = in.readByte();
-            byte[] coypByes = new byte[length -14];
-            in.readBytes(coypByes);
-            String copyStr = new String(coypByes);
-            logger.info("设备指令回复 [{}]", copyStr);
-        }
-    }
+	private void handle(ByteBuf in, int length, Channel channel) throws Exception {
+		if (in.isReadable()) {
+			in = in.readBytes(length);
+			byte msgType = in.readByte();
+			if (LOGIN_MSG == msgType) {
+				resolveLoginMSG(in, channel);
+			} else if (LOCATION_MSG == msgType) {
+				String deviceId = channelDeviceMap.get(channel);
+				if (deviceId == null) {
+					logger.info("链接管理失效。。。。。。。");
+				} else {
+					resolveLocationMSG(in, deviceId);
+				}
+			} else if (STATUS_MSG == msgType) {
+				reply(channel, STATUS_MSG);
+			} else if (WARNING_MSG == msgType) {
+				String deviceId = channelDeviceMap.get(channel);
+				resolveWarningMsg(in, deviceId);
+				reply(channel, WARNING_MSG);
+			} else if (CORRECT_TIME_MSG == msgType) {
+				reply(channel, CORRECT_TIME_MSG);
+			} else if (VOLTAGE_MSG == msgType) {
+				resolveVoltageMSG(in, channel);
+			} else if (COMMAND_COPY_MSG == msgType) {
+				resolveCommandCopyMSG(in, channel, length);
+			} else {
+				logger.info("client send data without handle type ...");
+			}
+		}
 
-    private void resolveWarningMsg(ByteBuf in, String deviceId) {
-        YiTongGpsForWarnDevice yiTongGPSDevice = new YiTongGpsForWarnDevice();
-        StringBuffer dateTimeStrBuf = new StringBuffer();
-        int indexOfDateTime = 0;
-        while(indexOfDateTime < DATA_SIZE){
-            byte b = in.readByte();
-            dateTimeStrBuf.append(NumUtil.byte2String(b));
-            indexOfDateTime ++;
-        }
-        //日期
-        yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
-        //gps信息卫星数
-        yiTongGPSDevice.setGpsCount(in.readByte());
-        //维度
-        yiTongGPSDevice.setLat(in.readInt());
-        //经度
-        yiTongGPSDevice.setLng(in.readInt());
-        //速度
-        yiTongGPSDevice.setSpeedbyte(in.readByte());
-        //航向
-        yiTongGPSDevice.setCourseStatus(in.readShort());
-        in.readByte();
-        //国家代号
-        yiTongGPSDevice.setMcc(in.readShort());
-        //移动网号码
-        yiTongGPSDevice.setMnc(in.readByte());
-        //位置区码
-        yiTongGPSDevice.setLac(in.readShort());
-        in.readMedium();
-        yiTongGPSDevice.setTerminalMsg((int)in.readByte());
-        yiTongGPSDevice.setElectric((int)in.readByte());
-        yiTongGPSDevice.setGmsSign((int)in.readByte());
-        yiTongGPSDevice.setWarningReason((int)in.readByte());
-        in.readByte();
-        yiTongGPSDevice.setDeviceId(deviceId);
-        //写文件操作
-        String deviceStr = yiTongGPSDevice.buildDeviceStr();
+	}
 
-        dataStorage(deviceStr);
-    }
+	private void resolveCommandCopyMSG(ByteBuf in, Channel channel, Integer length) {
+		byte readByte = in.readByte();
+		int checkCode = in.readInt();
+		if (checkCode != 0) {
+			logger.error("illegal checkcode [{}]", checkCode);
+		} else {
+			// byte codingType = in.readByte();
+			byte[] coypByes = new byte[length - 14];
+			in.readBytes(coypByes);
+			String copyStr = new String(coypByes);
+			logger.info("设备指令回复 [{}]", copyStr);
+		}
+	}
 
-    private void resolveVoltageMSG(ByteBuf in, Channel channel) {
-        String deviceId = channelDeviceMap.get(channel);
-        byte subMsgType = in.readByte();
-        if (VOLTAGE_SUB_MSG == subMsgType) {
-            short voltage = in.readShort();
-            Double voltageDouble = NumUtil.toFixed2Place((double)voltage);
-            String date = DateUtil.formatDate2String(DateUtil.calculateByHour(new Date(), -8));
-            YiTongGPSDevice yiTongGPSDevice = buildYiTongGpsDevcie(voltageDouble, deviceId, date);
-            String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-            dataStorage(deviceStr);
-        }
-    }
+	private void resolveWarningMsg(ByteBuf in, String deviceId) {
+		YiTongGpsForWarnDevice yiTongGPSDevice = new YiTongGpsForWarnDevice();
+		StringBuffer dateTimeStrBuf = new StringBuffer();
+		int indexOfDateTime = 0;
+		while (indexOfDateTime < DATA_SIZE) {
+			byte b = in.readByte();
+			dateTimeStrBuf.append(NumUtil.byte2String(b));
+			indexOfDateTime++;
+		}
+		// 日期
+		yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
+		// gps信息卫星数
+		yiTongGPSDevice.setGpsCount(in.readByte());
+		// 维度
+		yiTongGPSDevice.setLat(in.readInt());
+		// 经度
+		yiTongGPSDevice.setLng(in.readInt());
+		// 速度
+		yiTongGPSDevice.setSpeedbyte(in.readByte());
+		// 航向
+		yiTongGPSDevice.setCourseStatus(in.readShort());
+		in.readByte();
+		// 国家代号
+		yiTongGPSDevice.setMcc(in.readShort());
+		// 移动网号码
+		yiTongGPSDevice.setMnc(in.readByte());
+		// 位置区码
+		yiTongGPSDevice.setLac(in.readShort());
+		in.readMedium();
+		yiTongGPSDevice.setTerminalMsg((int) in.readByte());
+		yiTongGPSDevice.setElectric((int) in.readByte());
+		yiTongGPSDevice.setGmsSign((int) in.readByte());
+		yiTongGPSDevice.setWarningReason((int) in.readByte());
+		in.readByte();
+		yiTongGPSDevice.setDeviceId(deviceId);
+		// 写文件操作
+		String deviceStr = yiTongGPSDevice.buildDeviceStr();
 
-    private YiTongGPSDevice buildYiTongGpsDevcie(Double voltageDouble, String deviceId, String date) {
-        return new YiTongGPSDevice(deviceId, date, null, null, null, null, null, null, null, null, null, null, null, null, null, null, voltageDouble, 0);
-    }
+		dataStorage(deviceStr);
+	}
 
-    private void resolveLoginMSG(ByteBuf in, Channel channel) {
-        byte[] deviceIdBytes = new byte[8];
-        in.readBytes(deviceIdBytes);
-        String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
-        // 回复和链接管理
-        String deviceIdInMap = channelDeviceMap.get(channel);
-        if (!deviceId.equals(deviceIdInMap)) {
-            manageChannel(channel, deviceId);
-        }
-        reply(channel, LOGIN_MSG);
-    }
+	private void resolveVoltageMSG(ByteBuf in, Channel channel) {
+		String deviceId = channelDeviceMap.get(channel);
+		byte subMsgType = in.readByte();
+		if (VOLTAGE_SUB_MSG == subMsgType) {
+			short voltage = in.readShort();
+			Double voltageDouble = NumUtil.toFixed2Place((double) voltage);
+			String date = DateUtil.formatDate2String(DateUtil.calculateByHour(new Date(), -8));
+			YiTongGPSDevice yiTongGPSDevice = buildYiTongGpsDevcie(voltageDouble, deviceId, date);
+			String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
+			dataStorage(deviceStr);
+		}
+	}
 
-    private void manageChannel(Channel channel, String deviceId) {
-        String socketkey = UUID.randomUUID().toString();
-        logger.info("device [{}] socketId is [{}]", deviceId, socketkey);
-        socketyChannelMap.put(socketkey, channel);
-        channelDeviceMap.put(channel, deviceId);
-        String addressStr = ConnectMsg.ipToLong(address);
-        ConnectMsg cMsg = new ConnectMsg(address, socketkey);
-        try (Jedis jedis = jedisPool.getResource()) {
-            jedis.select(REDIS_INDEX_LINK);
-            String insertKey = PREFIX_LINK + addressStr;
-            String selectKey = PREFIX_DEVICE + deviceId;
-            String insertBackupKey = PREFIX_LINK_BACK + addressStr;
-            jedis.sadd(insertKey, socketkey);
-            jedis.sadd(insertBackupKey, deviceId);
-            jedis.set(selectKey, StringUtil.convert2String(cMsg));
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
+	private YiTongGPSDevice buildYiTongGpsDevcie(Double voltageDouble, String deviceId, String date) {
+		return new YiTongGPSDevice(deviceId, date, null, null, null, null, null, null, null, null, null, null, null,
+				null, null, null, voltageDouble, 0);
+	}
 
-    private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception{
-        YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
-        StringBuffer dateTimeStrBuf = new StringBuffer();
-        int indexOfDateTime = 0;
-        while(indexOfDateTime < DATA_SIZE){
-            byte b = in.readByte();
-            dateTimeStrBuf.append(NumUtil.byte2String(b));
-            indexOfDateTime ++;
-        }
-        //日期
-        yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
-        //gps信息卫星数
-        yiTongGPSDevice.setGpsCount(in.readByte());
-        //维度
-        yiTongGPSDevice.setLat(in.readInt());
-        //经度
-        yiTongGPSDevice.setLng(in.readInt());
-        //速度
-        yiTongGPSDevice.setSpeedbyte(in.readByte());
-        //航向
-        yiTongGPSDevice.setCourseStatus(in.readShort());
-        //国家代号
-        yiTongGPSDevice.setMcc(in.readShort());
-        //移动网号码
-        yiTongGPSDevice.setMnc(in.readByte());
-        //位置区码
-        yiTongGPSDevice.setLac(in.readShort());
-        //移动基站Cell Tower ID
-        yiTongGPSDevice.setCellId(in.readMedium());
-        yiTongGPSDevice.setAcc(in.readByte());
-        //数据上报模式  0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
-        yiTongGPSDevice.setReportModel(in.readByte());
-        //0x01:实时   0x00:补传
-        yiTongGPSDevice.setIsmendMsg(in.readByte());
-        double mileage = NumUtil.toFixed2Place((double)in.readInt() / 1000);
-        //里程设备默认是关闭的,需要指令,设备端才发送
-        yiTongGPSDevice.setDeviceId(deviceId);
-        yiTongGPSDevice.setMileage(mileage);
-        yiTongGPSDevice.setDataType(1);
-        //写文件操作
-        String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-        dataStorage(deviceStr);
-    }
+	private void resolveLoginMSG(ByteBuf in, Channel channel) {
+		byte[] deviceIdBytes = new byte[8];
+		in.readBytes(deviceIdBytes);
+		String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
+		// 回复和链接管理
+		String deviceIdInMap = channelDeviceMap.get(channel);
+		if (!deviceId.equals(deviceIdInMap)) {
+			manageChannel(channel, deviceId);
+		}
+		reply(channel, LOGIN_MSG);
+	}
 
-    private void reply(Channel channel, byte msgType) {
-        ByteBuf buffer = Unpooled.buffer();
-        byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05};
-        int doCrc = CRCUtil.do_crc(65535, crcBytes);
-        byte[] intToByte = NumUtil.intToByte(doCrc, 2);
-        byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
-        buffer.writeBytes(bytes);
-        ChannelFuture channelFuture = channel.write(buffer);
-        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
-            @Override
-            public void operationComplete(Future<? super Void> future) throws Exception {
-                logger.info("server reply [{}] to client success", DatatypeConverter.printHexBinary(bytes));
-            }
-        });
-    }
+	private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
+		YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
+		StringBuffer dateTimeStrBuf = new StringBuffer();
+		int indexOfDateTime = 0;
+		while (indexOfDateTime < DATA_SIZE) {
+			byte b = in.readByte();
+			dateTimeStrBuf.append(NumUtil.byte2String(b));
+			indexOfDateTime++;
+		}
+		// 日期
+		yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
+		// gps信息卫星数
+		yiTongGPSDevice.setGpsCount(in.readByte());
+		// 维度
+		yiTongGPSDevice.setLat(in.readInt());
+		// 经度
+		yiTongGPSDevice.setLng(in.readInt());
+		// 速度
+		yiTongGPSDevice.setSpeedbyte(in.readByte());
+		// 航向
+		yiTongGPSDevice.setCourseStatus(in.readShort());
+		// 国家代号
+		yiTongGPSDevice.setMcc(in.readShort());
+		// 移动网号码
+		yiTongGPSDevice.setMnc(in.readByte());
+		// 位置区码
+		yiTongGPSDevice.setLac(in.readShort());
+		// 移动基站Cell Tower ID
+		yiTongGPSDevice.setCellId(in.readMedium());
+		yiTongGPSDevice.setAcc(in.readByte());
+		// 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
+		yiTongGPSDevice.setReportModel(in.readByte());
+		// 0x01:实时 0x00:补传
+		yiTongGPSDevice.setIsmendMsg(in.readByte());
+		double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
+		// 里程设备默认是关闭的,需要指令,设备端才发送
+		yiTongGPSDevice.setDeviceId(deviceId);
+		yiTongGPSDevice.setMileage(mileage);
+		yiTongGPSDevice.setDataType(1);
+		// 写文件操作
+		String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
+		dataStorage(deviceStr);
+	}
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        cause.printStackTrace();
-        ctx.close();
-    }
-    
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        Channel channel = ctx.channel();
-        if (!channel.isActive()) {
-            String deviceId = channelDeviceMap.get(channel);
-            if (deviceId != null) {
-                channelDeviceMap.remove(channel);
-                deleteLinkFromRedis(deviceId);
-            }
-        }
-        super.channelInactive(ctx);
-        ctx.close();
-    }
-    
-    private void deleteLinkFromRedis(String deviceId) {
-        String deleteKey = PREFIX_DEVICE + deviceId;
-        try(Jedis jedis = jedisPool.getResource()) {
-            jedis.select(REDIS_INDEX_LINK);
-            String connectMsg = jedis.get(deleteKey);
-            if (connectMsg != null) {
-                ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
-                String socketId = cmsg.getSocketId();
-                socketyChannelMap.remove(socketId);
-                String socketQueryKey = PREFIX_LINK + address;
-                jedis.srem(socketQueryKey, socketId);
-                jedis.del(deleteKey);
-                logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
-            }
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        } 
-    }
+	private void reply(Channel channel, byte msgType) {
+		ByteBuf buffer = Unpooled.buffer();
+		byte[] crcBytes = new byte[] { 0x05, msgType, 0x00, 0x05 };
+		int doCrc = CRCUtil.do_crc(65535, crcBytes);
+		byte[] intToByte = NumUtil.intToByte(doCrc, 2);
+		byte[] bytes = new byte[] { 0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A };
+		buffer.writeBytes(bytes);
+		ChannelFuture channelFuture = channel.write(buffer);
+		channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
+			@Override
+			public void operationComplete(Future<? super Void> future) throws Exception {
+				logger.info("server reply [{}] to client success", DatatypeConverter.printHexBinary(bytes));
+			}
+		});
+	}
 
-    
 	public void startAcceptor() {
-		 EventLoopGroup bossGroup = new NioEventLoopGroup();
-         EventLoopGroup workerGroup = new NioEventLoopGroup();
-         byte[] splitBytes = new byte[]{0x0D, 0x0A};
-         try {
-             ServerBootstrap b = new ServerBootstrap();
-             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                     .childHandler(new ChannelInitializer<SocketChannel>() {
-                         @Override
-                         protected void initChannel(SocketChannel ch) throws Exception {
-                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
-                             ch.pipeline().addLast(this);
-                         }
-                     }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-             ChannelFuture f = b.bind(port).sync();
-             f.channel().closeFuture().sync();
-             
-         } catch (InterruptedException e) {
-             e.printStackTrace();
-         } finally {
-             cleanRedisLinkData();
-             workerGroup.shutdownGracefully();
-             bossGroup.shutdownGracefully();
-         }
-	}
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		byte[] splitBytes = new byte[] { 0x0D, 0x0A };
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline()
+									.addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
+							ch.pipeline().addLast(this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+			ChannelFuture f = b.bind(port).sync();
+			f.channel().closeFuture().sync();
 
-	@Override
-	public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
-		// TODO Auto-generated method stub
-		
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
 	}
+
 }

+ 146 - 255
src/main/java/com/tidecloud/dataacceptance/service/impl/YuGuangGpsServerHandler.java

@@ -1,23 +1,14 @@
 package com.tidecloud.dataacceptance.service.impl;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 
 import javax.xml.bind.DatatypeConverter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.common.NumUtil;
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
-import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
 import com.tidecloud.dataacceptance.entity.YuGuangGPSDevice;
-import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -33,266 +24,166 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
 
 /**
  * @author cdk
  */
 @Component
 @ChannelHandler.Sharable
-public class YuGuangGpsServerHandler extends AcceptanceInboundHandlerAdapter {
+public class YuGuangGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(YuGuangGpsServerHandler.class);
-    public static String PREFIX_LINK = "s.";
-    public static String PREFIX_LINK_BACK = "s.b.";
-    public static String PREFIX_DEVICE = "d.";
-    private static Boolean ISINIT = true;
-    
-    private static final Integer START_BITS = 2;
-    private static final Byte START_BIT = (byte)0xA3;
+	private static final Logger logger = LoggerFactory.getLogger(YuGuangGpsServerHandler.class);
 
-    private static final Integer TEN_M = 10485760;
+	private static final byte LOCATION_MSG = (byte) 0xA3;
+	private static final byte OIL_MSG = (byte) 0x18;
+	private static final byte STATUS_MSG = (byte) 0xb1;
 
-    private static final byte LOGIN_MSG = 0x01;
-    private static final byte LOCATION_MSG = (byte)0xA3;
-    private static final byte OIL_MSG = (byte)0x18;
-    private static final byte STATUS_MSG = (byte)0xb1;
-
-    
-    private static final String DATA_PATH = "/home/service/collector_7510/rawdata/";
-    private static final String PREFIX_NAME = "yuguang";
-    public static final Integer REDIS_INDEX_LINK = 15;
-    private static File WRITE_FILE = null;
-
-
-    public static Map<String, Channel> socketyChannelMap = new HashMap<>();
-    public static Map<Channel, String> channelDeviceMap = new HashMap<>();
-    public static Map<String, String> commandCopy = new HashMap<>();
-    
-    @Autowired
-    private JedisPool jedisPool;
-    
-    @Value("${server.localaddress}")
-    private String address;
-    
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf in = (ByteBuf) msg;
-        ByteBuf copy = in.copy();
-        byte[] bytes = new byte[copy.readableBytes()];
-        copy.readBytes(bytes);
-        String printHexBinary = DatatypeConverter.printHexBinary(bytes);
-        logger.info("传入数据为:" + printHexBinary);
-        try {
-            byte type = in.readByte();
-            short length = in.readShort();
-            handle(in, length, ctx.channel(), type);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-       ctx.flush();
-    }
-    
-    private void handle(ByteBuf in, int length, Channel channel, Byte type) throws Exception{
-        if (in.isReadable()) {
-            in = in.readBytes(length);
-            if (LOCATION_MSG == type) {
-                resolveLoginMSG(in, channel);
-            } else if (STATUS_MSG == type) {
-                reply(channel);
-            } else {
-                logger.info("client send data without handle type ...");
-            }
-        }
-    }
-
-    private void resolveLoginMSG(ByteBuf in, Channel channel) {
-        byte[] deviceIdBytes = new byte[4];
-        in.readBytes(deviceIdBytes);
-
-        byte fistDeviceIdByte = deviceIdBytes[0];
-        byte secondDeviceIdByte = (byte) (deviceIdBytes[1] - 0x80);
-        byte thirdDeviceIdByte = (byte) (deviceIdBytes[2] - 0x80);
-        byte lastByte = deviceIdBytes[3];
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(fistDeviceIdByte).append(secondDeviceIdByte)
-        .append(thirdDeviceIdByte).append(lastByte);
-        
-        String deviceId = stringBuilder.toString();
-        logger.info("deviceId: [{}]", deviceId);
-        
-        byte[] dateBytes = new byte[6];
-        in.readBytes(dateBytes);
-        String date = DatatypeConverter.printHexBinary(dateBytes);
-        logger.info("date: [{}]", date);
-        
-        byte[] latBytes = new byte[4];
-        in.readBytes(latBytes);
-        String lat = DatatypeConverter.printHexBinary(latBytes);
-        logger.info("lat: [{}]", lat);
-        
-        byte[] lngBytes = new byte[4];
-        in.readBytes(lngBytes);
-        String lng = DatatypeConverter.printHexBinary(lngBytes);
-        logger.info("lng: [{}]", lng);
-        
-        byte[] speedBytes = new byte[2];
-        in.readBytes(speedBytes);
-        String speed = DatatypeConverter.printHexBinary(speedBytes);
-        logger.info("speed: [{}]", speed);
-     
-        byte[] directionBytes = new byte[2];
-        in.readBytes(directionBytes);
-        String direction = DatatypeConverter.printHexBinary(directionBytes);
-        logger.info("direction: [{}]", direction);
-        
-        byte state = in.readByte();
-        logger.info("state: [{}]", state);
-        
-        byte[] mileageBytes = new byte[3];
-        in.readBytes(mileageBytes);
-        int mileage = NumUtil.threeBytesToInteger(mileageBytes);
-        logger.info("mileage: [{}]", mileage);
-        
-        byte[] Bytes = new byte[12];
-        in.readBytes(Bytes);
-
-        
-        YuGuangGPSDevice yiTongGPSDevice = new YuGuangGPSDevice();
-        if (in.isReadable(2)) {
-            int oilLength = in.readShort();
-            ByteBuf oilByteBuf = in.readBytes(oilLength);
-            byte sonType = oilByteBuf.readByte();
-            
-            if (OIL_MSG == sonType) {
-                byte length = oilByteBuf.readByte();
-                int oilAD1 = oilByteBuf.readShort();
-                int oilAD2 = oilByteBuf.readShort();
-                int oilAD3 = oilByteBuf.readShort();
-                int oilAD4 = oilByteBuf.readShort();
-                yiTongGPSDevice.setOil(oilAD4);
-            }
-        }
-        yiTongGPSDevice.setDate(date);
-        yiTongGPSDevice.setDeviceId(deviceId);
-        yiTongGPSDevice.setLat(lat);
-        yiTongGPSDevice.setLng(lng);
-        yiTongGPSDevice.setMileage(String.valueOf(mileage));
-        yiTongGPSDevice.setSpeed(speed);
-        dataStorage(YuGuangGPSDevice.buildDeviceStr(yiTongGPSDevice));
-        
-        // 回复和链接管理
-        String deviceIdInMap = channelDeviceMap.get(channel);
-        if (!deviceId.equals(deviceIdInMap)) {
-            manageChannel(channel, deviceId);
-        }
-        reply(channel);
-    }
-    
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		ByteBuf in = (ByteBuf) msg;
+		ByteBuf copy = in.copy();
+		byte[] bytes = new byte[copy.readableBytes()];
+		copy.readBytes(bytes);
+		String printHexBinary = DatatypeConverter.printHexBinary(bytes);
+		logger.info("传入数据为:" + printHexBinary);
+		handle(in, ctx.channel());
+		ctx.flush();
+	}
 
-    private void manageChannel(Channel channel, String deviceId) {
-        String socketkey = UUID.randomUUID().toString();
-        socketyChannelMap.put(socketkey, channel);
-        channelDeviceMap.put(channel, deviceId);
-        String addressStr = ConnectMsg.ipToLong(address);
-        ConnectMsg cMsg = new ConnectMsg(address, socketkey);
-        Jedis jedis = jedisPool.getResource();
-        try {
-            jedis.select(REDIS_INDEX_LINK);
-            String insertKey = PREFIX_LINK + addressStr;
-            String selectKey = PREFIX_DEVICE + deviceId;
-            String insertBackupKey = PREFIX_LINK_BACK + addressStr;
-            jedis.sadd(insertKey, socketkey);
-            jedis.sadd(insertBackupKey, deviceId);
-            jedis.set(selectKey, StringUtil.convert2String(cMsg));
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            jedis.close();
-        }
-    }
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		try {
+			byte type = in.readByte();
+			short length = in.readShort();
+			if (in.isReadable()) {
+				in = in.readBytes(length);
+				if (LOCATION_MSG == type) {
+					resolveLoginMSG(in, channel);
+				} else if (STATUS_MSG == type) {
+					reply(channel);
+				} else {
+					logger.info("client send data without handle type ...");
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e.getMessage(), e);
+		}
 
+	}
 
-    private void reply(Channel channel) {
-        logger.info("server reply to client ...");
-        ByteBuf buffer = Unpooled.buffer();
-        byte[] bytes = new byte[]{0x29, 0x29, 0x21, 0x00, 0x05, (byte)0x82, (byte)0xb1, 0x0C, 0x1b, 0x0d};
-        buffer.writeBytes(bytes);
-        channel.write(buffer);
-    }
+	private void resolveLoginMSG(ByteBuf in, Channel channel) {
+		byte[] deviceIdBytes = new byte[4];
+		in.readBytes(deviceIdBytes);
+
+		byte fistDeviceIdByte = deviceIdBytes[0];
+		byte secondDeviceIdByte = (byte) (deviceIdBytes[1] - 0x80);
+		byte thirdDeviceIdByte = (byte) (deviceIdBytes[2] - 0x80);
+		byte lastByte = deviceIdBytes[3];
+		StringBuilder stringBuilder = new StringBuilder();
+		stringBuilder.append(fistDeviceIdByte).append(secondDeviceIdByte).append(thirdDeviceIdByte).append(lastByte);
+
+		String deviceId = stringBuilder.toString();
+		logger.info("deviceId: [{}]", deviceId);
+
+		byte[] dateBytes = new byte[6];
+		in.readBytes(dateBytes);
+		String date = DatatypeConverter.printHexBinary(dateBytes);
+		logger.info("date: [{}]", date);
+
+		byte[] latBytes = new byte[4];
+		in.readBytes(latBytes);
+		String lat = DatatypeConverter.printHexBinary(latBytes);
+		logger.info("lat: [{}]", lat);
+
+		byte[] lngBytes = new byte[4];
+		in.readBytes(lngBytes);
+		String lng = DatatypeConverter.printHexBinary(lngBytes);
+		logger.info("lng: [{}]", lng);
+
+		byte[] speedBytes = new byte[2];
+		in.readBytes(speedBytes);
+		String speed = DatatypeConverter.printHexBinary(speedBytes);
+		logger.info("speed: [{}]", speed);
+
+		byte[] directionBytes = new byte[2];
+		in.readBytes(directionBytes);
+		String direction = DatatypeConverter.printHexBinary(directionBytes);
+		logger.info("direction: [{}]", direction);
+
+		byte state = in.readByte();
+		logger.info("state: [{}]", state);
+
+		byte[] mileageBytes = new byte[3];
+		in.readBytes(mileageBytes);
+		int mileage = NumUtil.threeBytesToInteger(mileageBytes);
+		logger.info("mileage: [{}]", mileage);
+
+		byte[] Bytes = new byte[12];
+		in.readBytes(Bytes);
+
+		YuGuangGPSDevice yiTongGPSDevice = new YuGuangGPSDevice();
+		if (in.isReadable(2)) {
+			int oilLength = in.readShort();
+			ByteBuf oilByteBuf = in.readBytes(oilLength);
+			byte sonType = oilByteBuf.readByte();
+
+			if (OIL_MSG == sonType) {
+				byte length = oilByteBuf.readByte();
+				int oilAD1 = oilByteBuf.readShort();
+				int oilAD2 = oilByteBuf.readShort();
+				int oilAD3 = oilByteBuf.readShort();
+				int oilAD4 = oilByteBuf.readShort();
+				yiTongGPSDevice.setOil(oilAD4);
+			}
+		}
+		yiTongGPSDevice.setDate(date);
+		yiTongGPSDevice.setDeviceId(deviceId);
+		yiTongGPSDevice.setLat(lat);
+		yiTongGPSDevice.setLng(lng);
+		yiTongGPSDevice.setMileage(String.valueOf(mileage));
+		yiTongGPSDevice.setSpeed(speed);
+		dataStorage(YuGuangGPSDevice.buildDeviceStr(yiTongGPSDevice));
+
+		// 回复和链接管理
+		String deviceIdInMap = channelDeviceMap.get(channel);
+		if (!deviceId.equals(deviceIdInMap)) {
+			manageChannel(channel, deviceId);
+		}
+		reply(channel);
+	}
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        cause.printStackTrace();
-        ctx.close();
-    }
-    
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        Channel channel = ctx.channel();
-        if (!channel.isActive()) {
-            String deviceId = channelDeviceMap.get(channel);
-            if (deviceId != null) {
-                channelDeviceMap.remove(channel);
-                deleteLinkFromRedis(deviceId);
-            }
-        }
-        super.channelInactive(ctx);
-        ctx.close();
-    }
-    
-    private void deleteLinkFromRedis(String deviceId) {
-        String deleteKey = PREFIX_DEVICE + deviceId;
-        try(Jedis jedis = jedisPool.getResource()) {
-            jedis.select(REDIS_INDEX_LINK);
-            String connectMsg = jedis.get(deleteKey);
-            if (connectMsg != null) {
-                ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
-                String socketId = cmsg.getSocketId();
-                socketyChannelMap.remove(socketId);
-                String socketQueryKey = PREFIX_LINK + address;
-                jedis.srem(socketQueryKey, socketId);
-                jedis.del(deleteKey);
-                logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
-            }
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        } 
-    }
+	private void reply(Channel channel) {
+		logger.info("server reply to client ...");
+		ByteBuf buffer = Unpooled.buffer();
+		byte[] bytes = new byte[] { 0x29, 0x29, 0x21, 0x00, 0x05, (byte) 0x82, (byte) 0xb1, 0x0C, 0x1b, 0x0d };
+		buffer.writeBytes(bytes);
+		channel.write(buffer);
+	}
 
-  
-  
-    
 	public void startAcceptor() {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        byte[] splitBytes = new byte[]{0x29, 0x29};
-        try {
-            ServerBootstrap b = new ServerBootstrap();
-            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        protected void initChannel(SocketChannel ch) throws Exception {
-                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
-                            ch.pipeline().addLast(this);
-                        }
-                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-            ChannelFuture f = b.bind(port).sync();
-            f.channel().closeFuture().sync();
-            
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } finally {
-            cleanRedisLinkData();
-            workerGroup.shutdownGracefully();
-            bossGroup.shutdownGracefully();
-        }
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		byte[] splitBytes = new byte[] { 0x29, 0x29 };
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline()
+									.addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
+							ch.pipeline().addLast(this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+			ChannelFuture f = b.bind(port).sync();
+			f.channel().closeFuture().sync();
+
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
 	}
 
-	@Override
-	public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
 }