rainbow954 7 years ago
parent
commit
50f3a2322a

+ 5 - 0
pom.xml

@@ -77,6 +77,11 @@
             <artifactId>spring-kafka</artifactId>
             <version>1.2.0.RELEASE</version>
         </dependency>
+         <dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>fastjson</artifactId>
+			<version>1.2.8</version>
+		</dependency>
 	</dependencies>
 
 	<build>

+ 28 - 54
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceDeviceService.java

@@ -88,7 +88,7 @@ public class AcceptanceDeviceService implements ApplicationContextAware {
 						handler.setIp(ip);
 						handler.setPort(config.getPort());
 						handler.setPrefixName(config.getName());
-						start(handler);
+						handler.startAcceptor();
 					} catch (ClassNotFoundException e) {
 						logger.error(e.getMessage());
 					} catch (Exception ex) {
@@ -101,60 +101,9 @@ public class AcceptanceDeviceService implements ApplicationContextAware {
 
 	}
 
-	public static String getIp(String eth0) {
-		String hostAddress = "127.0.0.1";
-		try {
-			Enumeration<NetworkInterface> interfaces = null;
-			interfaces = NetworkInterface.getNetworkInterfaces();
-			while (interfaces.hasMoreElements()) {
-				NetworkInterface ni = interfaces.nextElement();
-				if (eth0.equalsIgnoreCase(ni.getName())) {
-					Enumeration<InetAddress> addresss = ni.getInetAddresses();
-					while (addresss.hasMoreElements()) {
-						InetAddress ip = addresss.nextElement();
-						if (ip != null && ip instanceof Inet4Address) {
-							if (ip.getHostAddress().equals("127.0.0.1")) {
-								continue;
-							}
-							return hostAddress = ip.getHostAddress();
-						}
-					}
-				}
-			}
-		} catch (Exception e) {
-			logger.error(e.getMessage());
-		}
-		return hostAddress;
-	}
-
-	public void start(AcceptanceInboundHandlerAdapter handler) {
-		cleanRedisLinkData(handler.getIp());
-		EventLoopGroup bossGroup = new NioEventLoopGroup();
-		EventLoopGroup workerGroup = new NioEventLoopGroup();
-		try {
-			ServerBootstrap b = new ServerBootstrap();
-			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-					.childHandler(new ChannelInitializer<SocketChannel>() {
-						@Override
-						protected void initChannel(SocketChannel ch) throws Exception {
-							ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
-							ch.pipeline().addLast(handler);
-						}
-					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-
-			ChannelFuture f = b.bind(handler.getPort()).sync();
-			logger.info("start accept service for {}, bind address {}:{}", handler.getPrefixName(), handler.getIp(),
-					handler.getPort());
-			f.channel().closeFuture().sync();
-		} catch (Exception e) {
-			logger.error(e.getMessage());
-		} finally {
-			cleanRedisLinkData(handler.getIp());
-			workerGroup.shutdownGracefully();
-			bossGroup.shutdownGracefully();
-		}
+	
 
-	}
+	
 
 	private void cleanRedisLinkData(String address) {
 
@@ -183,6 +132,31 @@ public class AcceptanceDeviceService implements ApplicationContextAware {
 		}
 	}
 
+	public static String getIp(String eth0) {
+		String hostAddress = "127.0.0.1";
+		try {
+			Enumeration<NetworkInterface> interfaces = null;
+			interfaces = NetworkInterface.getNetworkInterfaces();
+			while (interfaces.hasMoreElements()) {
+				NetworkInterface ni = interfaces.nextElement();
+				if (eth0.equalsIgnoreCase(ni.getName())) {
+					Enumeration<InetAddress> addresss = ni.getInetAddresses();
+					while (addresss.hasMoreElements()) {
+						InetAddress ip = addresss.nextElement();
+						if (ip != null && ip instanceof Inet4Address) {
+							if (ip.getHostAddress().equals("127.0.0.1")) {
+								continue;
+							}
+							return hostAddress = ip.getHostAddress();
+						}
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		}
+		return hostAddress;
+	}
 	@Override
 	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
 		this.context = applicationContext;

+ 37 - 0
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceInboundHandlerAdapter.java

@@ -17,6 +17,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -32,10 +33,19 @@ import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.common.NumUtil;
 import com.tidecloud.dataacceptance.entity.ConnectMsg;
 
+import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
@@ -94,6 +104,7 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 	abstract public void reply(ChannelHandlerContext ctx, String msg) throws Exception;
 	
 
+
 	protected void receiveMsg(String msg, String deviceId, Channel channel) {
 		Channel channelInMap = channelMapOfChannelKey.get(deviceId);
 		if (channelInMap == null) {
@@ -290,6 +301,32 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
 		return b;
 	}
 
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
+							ch.pipeline().addLast(this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+			ChannelFuture f = b.bind(this.getPort()).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
+	
 	public String getDataPath() {
 		return dataPath;
 	}

+ 0 - 133
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceService.java

@@ -1,133 +0,0 @@
-package com.tidecloud.dataacceptance.service;
-
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.tomcat.util.threads.ThreadPoolExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.smartsanitation.common.util.StringUtil;
-import com.tidecloud.dataacceptance.common.Constants;
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
-import com.tidecloud.dataacceptance.service.handle.YiTongGpsServerHandler;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.timeout.IdleStateHandler;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-
-/**
- * @author: chudk 
- */
-@Component
-public class AcceptanceService {
-
-    @Value("${server.netty.port}")
-    private Integer port;
-
-    @Value("${server.localaddress}")
-    private String address;
-
-    @Autowired
-    private YiTongGpsServerHandler yiTongGpsServerHandler;
-    
-    @Autowired
-    private JedisPool jedisPool;
-    
-    private static final Logger LOG = LoggerFactory.getLogger(AcceptanceService.class);
-    
-    @PostConstruct
-    public void run() throws Exception {
-        cleanRedisLinkData();
-        ThreadFactory nettyThreadFactory = new ThreadFactoryBuilder().setNameFormat("netty-pool-%d").build();
-        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>(1024), nettyThreadFactory, new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
-        singleThreadPool.execute(new Runnable() {
-        
-            @Override
-            public void run() {
-                EventLoopGroup bossGroup = new NioEventLoopGroup();
-                EventLoopGroup workerGroup = new NioEventLoopGroup();
-                byte[] splitBytes1 = new byte[]{0x7e};
-                byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
-                try {
-                    ServerBootstrap b = new ServerBootstrap();
-                    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                            .childHandler(new ChannelInitializer<SocketChannel>() {
-                                @Override
-                                protected void initChannel(SocketChannel ch) throws Exception {
-                                    ch.pipeline().addLast("idleStateHandler",
-                                            new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
-                                    
-                                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204, Unpooled.copiedBuffer(splitBytes1), 
-                                            Unpooled.copiedBuffer(splitBytes2)));
-                                    ch.pipeline().addLast(yiTongGpsServerHandler);
-                                }
-                            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
-                    ChannelFuture f = b.bind(port).sync();
-                    f.channel().closeFuture().sync();
-                    
-                } catch (InterruptedException e) {
-                	LOG.error("netty server InterruptedException:"+e.getMessage());
-                } finally {
-                //    cleanRedisLinkData();
-                    workerGroup.shutdownGracefully();
-                    bossGroup.shutdownGracefully();
-                }
-            }
-
-        });
-        singleThreadPool.shutdown();
-    }
-    
-    private void cleanRedisLinkData() {
-        Jedis jedis = null;
-        try {
-            jedis = jedisPool.getResource();
-            jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
-            String addressStr = ConnectMsg.ipToLong(address);
-            String selectKey = YiTongGpsServerHandler.PREFIX_LINK_BACK + addressStr;
-            Set<String> values = jedis.smembers(selectKey);
-            
-            for (String deviceId : values) {
-                String deleteKeyOfDevice = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
-                String deleteKeyOfLink = YiTongGpsServerHandler.PREFIX_LINK + addressStr;
-                String connectMsgStr = jedis.get(deleteKeyOfDevice);
-                if (connectMsgStr != null) {
-                    ConnectMsg connectMsg = StringUtil.convert2Object(connectMsgStr, ConnectMsg.class);
-                    String socketId = connectMsg.getSocketId();
-                    jedis.del(deleteKeyOfDevice);
-                    jedis.srem(deleteKeyOfLink, socketId);
-                }else {
-                    LOG.error("error deviceId [{}] in select [{}] key [{}]", deviceId, 15, deleteKeyOfDevice);
-                }
-            }
-            jedis.del(selectKey);
-        } catch (Exception e) {
-            LOG.error(e.getLocalizedMessage());
-        } finally {
-            if (jedis != null) {
-                jedisPool.returnResource(jedis);
-            }
-        }
-    }
-}

+ 55 - 5
src/main/java/com/tidecloud/dataacceptance/service/handle/YiTongGpsServerHandler.java → src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -1,6 +1,7 @@
-package com.tidecloud.dataacceptance.service.handle;
+package com.tidecloud.dataacceptance.service.impl;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -23,15 +24,26 @@ import com.tidecloud.dataacceptance.entity.Session;
 import com.tidecloud.dataacceptance.entity.SessionManager;
 import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
 import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
+import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
 import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
 
+import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
@@ -40,9 +52,9 @@ import redis.clients.jedis.JedisPool;
  */
 @Component
 @ChannelHandler.Sharable
-public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter {
+public class BSJGpsServerHandler extends AcceptanceInboundHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(BSJGpsServerHandler.class);
     public static String PREFIX_LINK = "s.";
     public static String PREFIX_LINK_BACK = "s.b.";
     public static String PREFIX_DEVICE = "d.";
@@ -64,7 +76,7 @@ public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter {
      * @Title:  YiTongGpsServerHandler   
      * @Description: initialzation sessionManager and msgDecoder   
      */
-    public YiTongGpsServerHandler() {
+    public BSJGpsServerHandler() {
         this.sessionManager = SessionManager.getInstance();
         this.decoder = new MsgDecoder();
         this.msgProcessService = new TerminalMsgProcessService();
@@ -283,4 +295,42 @@ public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter {
             }
         }
     }
+    
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		 byte[] splitBytes1 = new byte[]{0x7e};
+         byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast("idleStateHandler",
+                                    new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
+                            
+                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204, Unpooled.copiedBuffer(splitBytes1), 
+                                    Unpooled.copiedBuffer(splitBytes2)));
+                            ch.pipeline().addLast(this);
+                        }
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+			ChannelFuture f = b.bind(this.getPort()).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
+
+	@Override
+	public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
+		// TODO Auto-generated method stub
+		
+	}
 }

+ 5 - 5
src/main/java/com/tidecloud/dataacceptance/web/BSJDeviceController.java

@@ -18,7 +18,7 @@ import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.common.BitOperator;
 import com.tidecloud.dataacceptance.common.Constants;
 import com.tidecloud.dataacceptance.entity.ConnectMsg;
-import com.tidecloud.dataacceptance.service.handle.YiTongGpsServerHandler;
+import com.tidecloud.dataacceptance.service.impl.BSJGpsServerHandler;
 
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -30,7 +30,7 @@ import redis.clients.jedis.JedisPool;
  * @date:   2017年10月30日 上午9:44:50   
  */
 @RestController("setting")
-public class DeviceController {
+public class BSJDeviceController {
     
     private static final Logger logger = LoggerFactory.getLogger(DeviceController.class);
     
@@ -44,7 +44,7 @@ public class DeviceController {
             return ResultWrapper.ok();
         }
         try {
-            Map<String, Channel> channelMap = YiTongGpsServerHandler.socketyChannelMap;
+            Map<String, Channel> channelMap = BSJGpsServerHandler.socketyChannelMap;
             if (channelMap.isEmpty()) {
                 return ResultWrapper.ok();
             }
@@ -92,9 +92,9 @@ public class DeviceController {
         String[] deviceIdsArray = deviceIds.split(",");
         Jedis jedis = jedisPool.getResource();
         try {
-            jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
+            jedis.select(BSJGpsServerHandler.REDIS_INDEX_LINK);
             for (String deviceId : deviceIdsArray) {
-                String selectKey = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
+                String selectKey = BSJGpsServerHandler.PREFIX_DEVICE + deviceId;
                 String result = jedis.get(selectKey);
                 if (result != null) {
                     ConnectMsg connectMsg = StringUtil.convert2Object(result, ConnectMsg.class);

+ 2 - 2
src/main/resources/application.yml

@@ -47,8 +47,8 @@ acceptance:
     topic: device-bsj
     ip: 10.25.19.87
     port: 6707
-    dataFileDir: /home/service/collector_7510/rawdata/
-    handlerClass: com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler
+    dataFileDir: /home/service/collector_6707/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.BSJGpsServerHandler
     enable: true
           
 logging: