123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package com.tidecloud.dataacceptance.service.impl;
- import com.accept.client.DeviceMsgClient;
- import com.alibaba.fastjson.JSON;
- import com.tidecloud.dataacceptance.codec.MsgDecoder;
- import com.tidecloud.dataacceptance.common.Constants;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
- import com.tidecloud.dataacceptance.entity.*;
- import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.DelimiterBasedFrameDecoder;
- import io.netty.handler.timeout.IdleStateHandler;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.data.redis.util.ByteUtils;
- import org.springframework.stereotype.Component;
- import java.util.Date;
- import java.util.concurrent.TimeUnit;
- /**
- * @author cdk
- */
- @Component
- @Scope("prototype")
- @ChannelHandler.Sharable
- public class JSTGPServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(JSTGPServerHandler.class);
- private final MsgDecoder decoder = new MsgDecoder();
- private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
- @Autowired
- private DeviceMsgClient deviceMsgClient;
- @Override
- public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
- try {
- if (!dataByteBuf.isReadable()) {
- return;
- }
- byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
- dataByteBuf.readBytes(dataByteArray);
- byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
- // let dataByteArray transfer 808DataEntity
- PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
- // link manage
- packageData.setChannel(channel);
- String deviceId = packageData.getMsgHeader().getTerminalPhone();
- MDC.put(MDC_DEVICEID, deviceId);
- packageData.setDeviceId(deviceId);
- //发送数据到kafka
- final MsgHeader header = packageData.getMsgHeader();
- if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId() || Constants.JT808_BATTERY == header.getMsgId()) {
- sendMsg2Kafka(byteMerger(dataByteArray, DateUtil.formatDate2String(new Date()).getBytes()), packageData.getDeviceId(), channel);
- }
- this.processPackageData(packageData);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
- byte[] byte_3 = new byte[byte_1.length + byte_2.length];
- System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
- System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
- return byte_3;
- }
- private void processPackageData(PackageData packageData) {
- final MsgHeader header = packageData.getMsgHeader();
- // 1. 终端心跳-消息体为空 ==> 平台通用应答
- if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
- logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminalHeartBeatMsg(packageData);
- manageChannel(packageData.getChannel(), header.getTerminalPhone());
- logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- deviceMsgClient.acceptDeviceMsgParam("jinshatian",
- packageData.getDeviceId(),
- 1, JSON.toJSONString(packageData, true), System.currentTimeMillis());
- } catch (Exception e) {
- logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 5. 终端鉴权 ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
- logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
- this.msgProcessService.processAuthMsg(authenticationMsg);
- manageChannel(packageData.getChannel(), header.getTerminalPhone());
- logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 6. 终端注册 ==> 终端注册应答
- else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
- logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
- this.msgProcessService.processRegisterMsg(msg);
- logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
- logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminalLogoutMsg(packageData);
- logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 4. 位置信息汇报 ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
- logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
- this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
- logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- deviceMsgClient.acceptDeviceMsgParam("jinshatian",
- packageData.getDeviceId(),
- 2, JSON.toJSONString(packageData, true), System.currentTimeMillis());
- } catch (Exception e) {
- logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 3. 自定义电量信息汇报 ==> 平台通用应答
- else if (Constants.JT808_BATTERY == header.getMsgId()) {
- logger.info(">>>>>[自定义电量信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminaMsg(packageData);
- } catch (Exception e) {
- logger.error("<<<<<[自定义电量信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
- header.getFlowId(), e.getMessage(), e);
- }
- }
- // 3. 自定义卡片登入汇报 ==> 平台通用应答
- else if (Constants.JT808_KP_LOGIN == header.getMsgId()) {
- logger.info(">>>>>[自定义卡片登录信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminaMsg(packageData);
- } catch (Exception e) {
- logger.error("<<<<<[自定义卡片登录信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
- header.getFlowId(), e.getMessage(), e);
- }
- }
- // 其他情况
- else {
- logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
- packageData);
- }
- }
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- byte[] splitBytes1 = new byte[]{0x7e};
- byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("idleStateHandler",
- new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
- Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
- ch.pipeline().addLast(JSTGPServerHandler.this);
- }
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture f = b.bind(this.getPort()).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
|