Browse Source

调整 hengsheng 去挑不必要回复

jianghouwei 6 years ago
parent
commit
ad2d8d0c36

+ 50 - 12
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchHenShengServerHandler.java

@@ -3,12 +3,17 @@ package com.tidecloud.dataacceptance.service.impl;
 import com.tidecloud.dataacceptance.common.DateUtil;
 import com.tidecloud.dataacceptance.entity.Advice;
 import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
+import io.netty.channel.*;
 import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
@@ -38,6 +43,9 @@ public class WatchHenShengServerHandler extends HexBinaryAcceptanceHandlerAdapte
 		String msg = new String(req, "UTF-8");
 		logger.info("传入数据:》》》》》》》》》" + msg);
 		Advice advice = setAdvice(msg);
+		if(advice == null){
+			return ;
+		}
 		switch (advice.getAdviceType()) {
 			case "INIT": // 初始化
 				String init = "INIT,1";
@@ -52,21 +60,21 @@ public class WatchHenShengServerHandler extends HexBinaryAcceptanceHandlerAdapte
 				normalReply(advice, channel, sb.toString());
 				break;
 			case "UD": // 位置信息
-				byte[] dataUDArray = new byte[in.readableBytes()];
-				in.readBytes(dataUDArray);
-				sendMsg2Kafka(dataUDArray, advice.getDeviceId(), channel);
+				sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
 				logger.info("位置数据上报[UD]:" + advice.toString());
 				normalReply(advice, channel, "UD");
 				break;
 			case "UD2": // 位置信息
-				byte[] dataUD2Array = new byte[in.readableBytes()];
-				in.readBytes(dataUD2Array);
 				logger.info("盲点补传数据:" + advice.toString());
-				sendMsg2Kafka(dataUD2Array, advice.getDeviceId(), channel);
+				sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
 				//normalReply(advice, channel, "UD2");
 				break;
+			case "UPLOAD": // 位置信息
+				logger.info("修改上报频率成功!");
+				break;
 			default: // 其他
-				normalReply(advice, channel, advice.getAdviceType());
+				logger.info("client send data without handle type ...");
+				//normalReply(advice, channel, advice.getAdviceType());
 				break;
 		}
 	}
@@ -169,10 +177,40 @@ public class WatchHenShengServerHandler extends HexBinaryAcceptanceHandlerAdapte
 				advice.setAdviceType(contents[0]);// 标记
 				return advice;
 			} catch (Exception e) {
-				e.printStackTrace();
+				logger.error(e.getMessage(),e);
 			}
 		}
-
 		return null;
 	}
+
+	@Override
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup)
+					.channel(NioServerSocketChannel.class)
+					.option(ChannelOption.SO_BACKLOG, 1024)
+					.handler(new LoggingHandler(LogLevel.INFO))
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) {
+							ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
+							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
+							ch.pipeline().addLast(WatchHenShengServerHandler.this);
+						}
+					});
+			ChannelFuture f = b.bind(port).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception ex) {
+			logger.warn(ex.getMessage(), ex);
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
 }