|  | @@ -0,0 +1,206 @@
 | 
	
		
			
				|  |  | +package com.tidecloud.dataacceptance.service.impl;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.codec.MsgDecoder;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.common.Constants;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.common.DateUtil;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.entity.*;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 | 
	
		
			
				|  |  | +import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
 | 
	
		
			
				|  |  | +import io.netty.bootstrap.ServerBootstrap;
 | 
	
		
			
				|  |  | +import io.netty.buffer.ByteBuf;
 | 
	
		
			
				|  |  | +import io.netty.buffer.Unpooled;
 | 
	
		
			
				|  |  | +import io.netty.channel.*;
 | 
	
		
			
				|  |  | +import io.netty.channel.nio.NioEventLoopGroup;
 | 
	
		
			
				|  |  | +import io.netty.channel.socket.SocketChannel;
 | 
	
		
			
				|  |  | +import io.netty.channel.socket.nio.NioServerSocketChannel;
 | 
	
		
			
				|  |  | +import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 | 
	
		
			
				|  |  | +import io.netty.handler.timeout.IdleStateHandler;
 | 
	
		
			
				|  |  | +import org.slf4j.Logger;
 | 
	
		
			
				|  |  | +import org.slf4j.LoggerFactory;
 | 
	
		
			
				|  |  | +import org.slf4j.MDC;
 | 
	
		
			
				|  |  | +import org.springframework.context.annotation.Scope;
 | 
	
		
			
				|  |  | +import org.springframework.data.redis.util.ByteUtils;
 | 
	
		
			
				|  |  | +import org.springframework.stereotype.Component;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import java.util.Date;
 | 
	
		
			
				|  |  | +import java.util.concurrent.TimeUnit;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/**
 | 
	
		
			
				|  |  | + * @author cdk
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  | +@Component
 | 
	
		
			
				|  |  | +@Scope("prototype")
 | 
	
		
			
				|  |  | +@ChannelHandler.Sharable
 | 
	
		
			
				|  |  | +public class JSTGPServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	private static final Logger logger = LoggerFactory.getLogger(JSTGPServerHandler.class);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	private final MsgDecoder decoder = new MsgDecoder();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	@Override
 | 
	
		
			
				|  |  | +	public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		try {
 | 
	
		
			
				|  |  | +			if (!dataByteBuf.isReadable()) {
 | 
	
		
			
				|  |  | +				return;
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +			byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
 | 
	
		
			
				|  |  | +			dataByteBuf.readBytes(dataByteArray);
 | 
	
		
			
				|  |  | +			byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
 | 
	
		
			
				|  |  | +			// let dataByteArray transfer 808DataEntity
 | 
	
		
			
				|  |  | +			PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
 | 
	
		
			
				|  |  | +			// link manage
 | 
	
		
			
				|  |  | +			packageData.setChannel(channel);
 | 
	
		
			
				|  |  | +			String deviceId = packageData.getMsgHeader().getTerminalPhone();
 | 
	
		
			
				|  |  | +			MDC.put(MDC_DEVICEID, deviceId);
 | 
	
		
			
				|  |  | +			packageData.setDeviceId(deviceId);
 | 
	
		
			
				|  |  | +			//发送数据到kafka
 | 
	
		
			
				|  |  | +			final MsgHeader header = packageData.getMsgHeader();
 | 
	
		
			
				|  |  | +			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId() || Constants.JT808_BATTERY == header.getMsgId()) {
 | 
	
		
			
				|  |  | +				sendMsg2Kafka(byteMerger(dataByteArray, DateUtil.formatDate2String(new Date()).getBytes()), packageData.getDeviceId(), channel);
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +			this.processPackageData(packageData);
 | 
	
		
			
				|  |  | +		} catch (Exception e) {
 | 
	
		
			
				|  |  | +			logger.error(e.getLocalizedMessage());
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
 | 
	
		
			
				|  |  | +		byte[] byte_3 = new byte[byte_1.length + byte_2.length];
 | 
	
		
			
				|  |  | +		System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
 | 
	
		
			
				|  |  | +		System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
 | 
	
		
			
				|  |  | +		return byte_3;
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	private void processPackageData(PackageData packageData) {
 | 
	
		
			
				|  |  | +		final MsgHeader header = packageData.getMsgHeader();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		// 1. 终端心跳-消息体为空 ==> 平台通用应答
 | 
	
		
			
				|  |  | +		if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				this.msgProcessService.processTerminalHeartBeatMsg(packageData);
 | 
	
		
			
				|  |  | +				manageChannel(packageData.getChannel(), header.getTerminalPhone());
 | 
	
		
			
				|  |  | +				logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
 | 
	
		
			
				|  |  | +						e.getMessage());
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +		// 5. 终端鉴权 ==> 平台通用应答
 | 
	
		
			
				|  |  | +		else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
 | 
	
		
			
				|  |  | +				this.msgProcessService.processAuthMsg(authenticationMsg);
 | 
	
		
			
				|  |  | +				manageChannel(packageData.getChannel(), header.getTerminalPhone());
 | 
	
		
			
				|  |  | +				logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
 | 
	
		
			
				|  |  | +						e.getMessage());
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 6. 终端注册 ==> 终端注册应答
 | 
	
		
			
				|  |  | +		else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
 | 
	
		
			
				|  |  | +				this.msgProcessService.processRegisterMsg(msg);
 | 
	
		
			
				|  |  | +				logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
 | 
	
		
			
				|  |  | +						e.getMessage());
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
 | 
	
		
			
				|  |  | +		else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				this.msgProcessService.processTerminalLogoutMsg(packageData);
 | 
	
		
			
				|  |  | +				logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
 | 
	
		
			
				|  |  | +						e.getMessage());
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 4. 位置信息汇报 ==> 平台通用应答
 | 
	
		
			
				|  |  | +		else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
 | 
	
		
			
				|  |  | +				this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
 | 
	
		
			
				|  |  | +				logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
 | 
	
		
			
				|  |  | +						e.getMessage());
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 3. 自定义电量信息汇报 ==> 平台通用应答
 | 
	
		
			
				|  |  | +		else if (Constants.JT808_BATTERY == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[自定义电量信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				this.msgProcessService.processTerminaMsg(packageData);
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[自定义电量信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
 | 
	
		
			
				|  |  | +						header.getFlowId(), e.getMessage(), e);
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 3. 自定义卡片登入汇报 ==> 平台通用应答
 | 
	
		
			
				|  |  | +		else if (Constants.JT808_KP_LOGIN == header.getMsgId()) {
 | 
	
		
			
				|  |  | +			logger.info(">>>>>[自定义卡片登录信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
 | 
	
		
			
				|  |  | +			try {
 | 
	
		
			
				|  |  | +				this.msgProcessService.processTerminaMsg(packageData);
 | 
	
		
			
				|  |  | +			} catch (Exception e) {
 | 
	
		
			
				|  |  | +				logger.error("<<<<<[自定义卡片登录信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
 | 
	
		
			
				|  |  | +						header.getFlowId(), e.getMessage(), e);
 | 
	
		
			
				|  |  | +			}
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +		// 其他情况
 | 
	
		
			
				|  |  | +		else {
 | 
	
		
			
				|  |  | +			logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
 | 
	
		
			
				|  |  | +					packageData);
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +	public void startAcceptor() {
 | 
	
		
			
				|  |  | +		EventLoopGroup bossGroup = new NioEventLoopGroup();
 | 
	
		
			
				|  |  | +		EventLoopGroup workerGroup = new NioEventLoopGroup();
 | 
	
		
			
				|  |  | +		byte[] splitBytes1 = new byte[]{0x7e};
 | 
	
		
			
				|  |  | +		byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
 | 
	
		
			
				|  |  | +		try {
 | 
	
		
			
				|  |  | +			ServerBootstrap b = new ServerBootstrap();
 | 
	
		
			
				|  |  | +			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
 | 
	
		
			
				|  |  | +					.childHandler(new ChannelInitializer<SocketChannel>() {
 | 
	
		
			
				|  |  | +						@Override
 | 
	
		
			
				|  |  | +						protected void initChannel(SocketChannel ch) throws Exception {
 | 
	
		
			
				|  |  | +							ch.pipeline().addLast("idleStateHandler",
 | 
	
		
			
				|  |  | +									new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
 | 
	
		
			
				|  |  | +									Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
 | 
	
		
			
				|  |  | +							ch.pipeline().addLast(JSTGPServerHandler.this);
 | 
	
		
			
				|  |  | +						}
 | 
	
		
			
				|  |  | +					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +			ChannelFuture f = b.bind(this.getPort()).sync();
 | 
	
		
			
				|  |  | +			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
 | 
	
		
			
				|  |  | +					this.getPort());
 | 
	
		
			
				|  |  | +			f.channel().closeFuture().sync();
 | 
	
		
			
				|  |  | +		} catch (Exception e) {
 | 
	
		
			
				|  |  | +			logger.error(e.getMessage());
 | 
	
		
			
				|  |  | +		} finally {
 | 
	
		
			
				|  |  | +			cleanRedisLinkData();
 | 
	
		
			
				|  |  | +			workerGroup.shutdownGracefully();
 | 
	
		
			
				|  |  | +			bossGroup.shutdownGracefully();
 | 
	
		
			
				|  |  | +		}
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +}
 |