Browse Source

修改 解析方法

jianghouwei 6 years ago
parent
commit
c9c4384703

+ 163 - 121
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchJWServerHandler.java

@@ -1,25 +1,28 @@
 package com.tidecloud.dataacceptance.service.impl;
 
-import com.tidecloud.dataacceptance.DataAcceptanceApplication;
+import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder;
 import com.tidecloud.dataacceptance.common.DateUtil;
-import com.tidecloud.dataacceptance.entity.Advice;
 import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
+import io.netty.channel.*;
 import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.lang.StringUtils;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.springframework.boot.SpringApplication;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
-import javax.xml.bind.DatatypeConverter;
-import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -33,118 +36,157 @@ import java.util.concurrent.TimeUnit;
 @Component(WatchJWServerHandler.name)
 public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
-    public static final String name = "WatchJWServerHandler";
-
-    private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
-
-    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-    @Override
-    protected void handle(ByteBuf in, Channel channel) throws Exception {
-        String msg = byteBufferToString(in.nioBuffer());
-        String deviceId = channelDeviceMap.get(channel);
-        if (deviceId != null) {
-            MDC.put(MDC_DEVICEID, deviceId);
-        }
-        logger.info("传入数据为:" + msg);
-        String factory = msg.substring(0, 2);// 工厂
-        String type = msg.substring(2, 6);// 标记
-        switch (type) {
-            case "AP00": // 初始化登录
-                resolveLoginMSG(msg, channel);
-                break;
-            case "AP03": // 连接(心跳) BP03#
-                normalReply(factory, channel, "BP03");
-                break;
-            case "AP01": // 位置信息
-                sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
-                normalReply(factory, channel, "BP01");
-                break;
-            default: // 其他
-                logger.info("client send data without handle type ...");
-                break;
-        }
-    }
-
-
-    protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
-        ByteBuf dataByteBufCopy = dataByteBuf.copy();
-        byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
-        dataByteBufCopy.readBytes(dataByteArray);
-        dataByteBufCopy.release();
-    }
-
-    /**
-     * 登录管理
-     *
-     * @param channel
-     */
-    private void resolveLoginMSG(String msg, Channel channel) {
-        /*IWAP00353456789012345# */
-        String message = String.valueOf(msg);
-        String factory = message.substring(0, 2);
-        String deviceId = message.substring(6, 21);
-        String deviceIdInMap = channelDeviceMap.get(channel);
-        MDC.put(MDC_DEVICEID, deviceId);
-        if (!deviceId.equals(deviceIdInMap)) {
-            manageChannel(channel, deviceId);
-        }
-        String date =DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
-        normalReply(factory, channel, "BP00," + date + ",8");
-        Date loginTime = new Date();
-        executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-                Date currentTime = new Date();
-                try {
-                    Long secondsSinceLogin  = (currentTime.getTime() - loginTime.getTime() )/1000;
-                    if(secondsSinceLogin < 5L){
-                        TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
-                    }
-                    normalReplyModel(factory,deviceId,channel);
-                } catch (InterruptedException e) {
-                    logger.error(e.getMessage());
-                }
-            }
-        });
-
-    }
-
-
-    // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
-    private void normalReplyModel(String factory,String deviceId, Channel channel){
-        StringBuilder replyCommand = new StringBuilder();
-        replyCommand.append(factory).append("BP33").append(",")
-                .append(deviceId).append(",")
-                .append("080835").append(",")
-                .append("3").append("#");
-        String replyCommandStr = replyCommand.toString();
-        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
-        buffer.writeBytes(replyCommandStr.getBytes());
-        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
-        channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
-    }
-
-    /**
-     * 回复
-     *
-     * @param channel
-     * @content 回复内容
-     */
-    private void normalReply(String factory, Channel channel, String content) {
-        // gps ==== >IW BP01#
-        // 登录 ==== >IW BP00,20150101125223,8#
-        // 心跳 ==== >IW BP03#
-        StringBuilder replyCommand = new StringBuilder();
-        replyCommand.append(factory);
-        replyCommand.append(content);
-        replyCommand.append("#");
-        String replyCommandStr = replyCommand.toString();
-        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
-        buffer.writeBytes(replyCommandStr.getBytes());
-        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
-        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
-    }
+	public static final String name = "WatchJWServerHandler";
+
+	private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
+
+	private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+
+		// String msg = byteBufferToString(in.nioBuffer());TODO (注意)
+		byte[] req = new byte[in.readableBytes()];
+		in.readBytes(req);
+		String msg = new String(req,"UTF-8");
+		String deviceId = channelDeviceMap.get(channel);
+		if (deviceId != null) {
+			MDC.put(MDC_DEVICEID, deviceId);
+		}
+		try {
+			logger.info("传入数据为:" + msg);
+			String factory = msg.substring(0, 2);// 工厂
+			String type = msg.substring(2, 6);// 标记
+			switch (type) {
+				case "AP00": // 初始化登录
+					resolveLoginMSG(msg, channel);
+					break;
+				case "AP03": // 连接(心跳) BP03#
+					normalReply(factory, channel, "BP03");
+					break;
+				case "AP01": // 位置信息
+					sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
+					normalReply(factory, channel, "BP01");
+					break;
+				default: // 其他
+					logger.info("client send data without handle type ...");
+					break;
+			}
+		} finally {
+			MDC.clear();
+		}
+	}
+
+
+	protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
+		ByteBuf dataByteBufCopy = dataByteBuf.copy();
+		byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
+		dataByteBufCopy.readBytes(dataByteArray);
+		dataByteBufCopy.release();
+	}
+
+	/**
+	 * 登录管理
+	 *
+	 * @param channel
+	 */
+	private void resolveLoginMSG(String msg, Channel channel) {
+		/*IWAP00353456789012345# */
+		String message = String.valueOf(msg);
+		String factory = message.substring(0, 2);
+		String deviceId = message.substring(6, 21);
+		String deviceIdInMap = channelDeviceMap.get(channel);
+		MDC.put(MDC_DEVICEID, deviceId);
+		if (!deviceId.equals(deviceIdInMap)) {
+			manageChannel(channel, deviceId);
+		}
+		String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
+		normalReply(factory, channel, "BP00," + date + ",8");
+		Date loginTime = new Date();
+		executorService.execute(new Runnable() {
+			@Override
+			public void run() {
+				Date currentTime = new Date();
+				try {
+					Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
+					if (secondsSinceLogin < 5L) {
+						TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
+					}
+					normalReplyModel(factory, deviceId, channel);
+				} catch (InterruptedException e) {
+					logger.error(e.getMessage());
+				}
+			}
+		});
+
+	}
+
+
+	// IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
+	private void normalReplyModel(String factory, String deviceId, Channel channel) {
+		StringBuilder replyCommand = new StringBuilder();
+		replyCommand.append(factory).append("BP33").append(",")
+				.append(deviceId).append(",")
+				.append("080835").append(",")
+				.append("3").append("#");
+		String replyCommandStr = replyCommand.toString();
+		ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+		buffer.writeBytes(replyCommandStr.getBytes());
+		ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+		channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
+	}
+
+	/**
+	 * 回复
+	 *
+	 * @param channel
+	 * @content 回复内容
+	 */
+	private void normalReply(String factory, Channel channel, String content) {
+		// gps ==== >IW BP01#
+		// 登录 ==== >IW BP00,20150101125223,8#
+		// 心跳 ==== >IW BP03#
+		StringBuilder replyCommand = new StringBuilder();
+		replyCommand.append(factory);
+		replyCommand.append(content);
+		replyCommand.append("#");
+		String replyCommandStr = replyCommand.toString();
+		ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+		buffer.writeBytes(replyCommandStr.getBytes());
+		ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+		channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+	}
+
+	@Override
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup)
+					.channel(NioServerSocketChannel.class)
+					.option(ChannelOption.SO_BACKLOG, 1024)
+					.handler(new LoggingHandler(LogLevel.INFO))
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) {
+							ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
+							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
+							ch.pipeline().addLast(WatchJWServerHandler.this);
+						}
+					});
+			ChannelFuture f = b.bind(port).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception ex) {
+			logger.warn(ex.getMessage(), ex);
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
 
 
 }