package com.tidecloud.dataacceptance.service.impl; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.tidecloud.dataacceptance.codec.MsgDecoder; import com.tidecloud.dataacceptance.common.Constants; import com.tidecloud.dataacceptance.common.JT808ProtocolUtils; import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg; import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg; import com.tidecloud.dataacceptance.entity.PackageData; import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader; import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg; import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import com.tidecloud.dataacceptance.service.TerminalMsgProcessService; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; /** * @author cdk */ @Component @Scope("prototype") @ChannelHandler.Sharable public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(BSJGpsServerHandler.class); private final MsgDecoder decoder = new MsgDecoder(); private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService(); @Override public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception { try { if (!dataByteBuf.isReadable()) { return; } byte[] dataByteArray = new byte[dataByteBuf.readableBytes()]; dataByteBuf.readBytes(dataByteArray); byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length); // let dataByteArray transfer 808DataEntity PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape); // link manage packageData.setChannel(channel); String deviceId = packageData.getMsgHeader().getTerminalPhone(); MDC.put(MDC_DEVICEID, deviceId); packageData.setDeviceId(deviceId); //发送数据到kafka final MsgHeader header = packageData.getMsgHeader(); if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) { sendMsg2Kafka(dataByteArray, packageData.getDeviceId(), channel); } this.processPackageData(packageData); } catch (Exception e) { logger.error(e.getLocalizedMessage()); } } private void processPackageData(PackageData packageData) { final MsgHeader header = packageData.getMsgHeader(); // 1. 终端心跳-消息体为空 ==> 平台通用应答 if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) { logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { this.msgProcessService.processTerminalHeartBeatMsg(packageData); manageChannel(packageData.getChannel(), header.getTerminalPhone()); logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 5. 终端鉴权 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) { logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData); this.msgProcessService.processAuthMsg(authenticationMsg); manageChannel(packageData.getChannel(), header.getTerminalPhone()); logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 6. 终端注册 ==> 终端注册应答 else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) { logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData); this.msgProcessService.processRegisterMsg(msg); logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答 else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) { logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { this.msgProcessService.processTerminalLogoutMsg(packageData); logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 3. 自定义位置信息汇报 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) { logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData); this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg); logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage(), e); } } // 4. 位置信息汇报 ==> 平台通用应答 else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) { logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); try { LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData); this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg); logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } catch (Exception e) { logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(), e.getMessage()); } } // 5. 车辆控制回复 else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) { logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId()); } // 其他情况 else { logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(), packageData); } } public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); byte[] splitBytes1 = new byte[] { 0x7e }; byte[] splitBytes2 = new byte[] { 0x7e, 0x7e }; try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES)); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204, Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2))); ch.pipeline().addLast(BSJGpsServerHandler.this); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(this.getPort()).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (Exception e) { logger.error(e.getMessage()); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }