123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package com.tidecloud.dataacceptance.service.handle;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.xml.bind.DatatypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.smartsanitation.common.util.StringUtil;
import com.tidecloud.dataacceptance.codec.MsgDecoder;
import com.tidecloud.dataacceptance.common.Constants;
import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
import com.tidecloud.dataacceptance.entity.ConnectMsg;
import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg;
import com.tidecloud.dataacceptance.entity.LocationSelfInfoUploadMsg;
import com.tidecloud.dataacceptance.entity.PackageData;
import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
import com.tidecloud.dataacceptance.entity.Session;
import com.tidecloud.dataacceptance.entity.SessionManager;
import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
- /**
- * @author cdk
- */
- @Component
- @ChannelHandler.Sharable
- public class YiTongGpsServerHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(YiTongGpsServerHandler.class);
- public static String PREFIX_LINK = "s.";
- public static String PREFIX_LINK_BACK = "s.b.";
- public static String PREFIX_DEVICE = "d.";
-
- public static final Integer REDIS_INDEX_LINK = 15;
- public static Map<String, Channel> socketyChannelMap = new HashMap<>();
- public static Map<Channel, String> channelDeviceMap = new HashMap<>();
- public static Map<String, String> commandCopy = new HashMap<>();
-
- private final SessionManager sessionManager;
- private final MsgDecoder decoder;
- private TerminalMsgProcessService msgProcessService;
-
- /**
- *
- * @Title: YiTongGpsServerHandler
- * @Description: initialzation sessionManager and msgDecoder
- */
- public YiTongGpsServerHandler() {
- this.sessionManager = SessionManager.getInstance();
- this.decoder = new MsgDecoder();
- this.msgProcessService = new TerminalMsgProcessService();
- }
-
- @Autowired
- private JedisPool jedisPool;
-
- @Value("${server.localaddress}")
- private String address;
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf dataByteBuf = (ByteBuf) msg;
- // print acceptance data
- printAcceptanceData(dataByteBuf);
- try {
- if (!dataByteBuf.isReadable()) {
- return;
- }
- byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
- dataByteBuf.readBytes(dataByteArray);
- byte[] dataByteArrayDoEscape = JT808ProtocolUtils.
- doEscape4Receive(dataByteArray, 0, dataByteArray.length);
- //let dataByteArray transfer 808DataEntity
- PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
- //link manage
- packageData.setChannel(ctx.channel());
- this.processPackageData(packageData);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- ctx.flush();
- }
-
- private void manageChannel(Channel channel, String deviceId) {
- String socketkey = channel.id().asLongText();
- Channel channelInMap = socketyChannelMap.get(socketkey);
- String deviceIdInMap = channelDeviceMap.get(channel);
- if (channelInMap != null && deviceIdInMap != null) {
- logger.debug("device [{}] has link [{}]", deviceId, socketkey);
- return;
- }
- socketyChannelMap.put(socketkey, channel);
- channelDeviceMap.put(channel, deviceId);
- String addressStr = ConnectMsg.ipToLong(address);
- ConnectMsg cMsg = new ConnectMsg(address, socketkey);
- Jedis jedis = jedisPool.getResource();
- try {
- jedis.select(REDIS_INDEX_LINK);
- String insertKey = PREFIX_LINK + addressStr;
- String selectKey = PREFIX_DEVICE + deviceId;
- String insertBackupKey = PREFIX_LINK_BACK + addressStr;
- jedis.sadd(insertKey, socketkey);
- jedis.sadd(insertBackupKey, deviceId);
- jedis.set(selectKey, StringUtil.convert2String(cMsg));
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- jedis.close();
- }
- }
-
- private void deleteLinkFromRedis(String deviceId) {
- String deleteKey = PREFIX_DEVICE + deviceId;
- try(Jedis jedis = jedisPool.getResource()) {
- jedis.select(REDIS_INDEX_LINK);
- String connectMsg = jedis.get(deleteKey);
- if (connectMsg != null) {
- ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
- String socketId = cmsg.getSocketId();
- socketyChannelMap.remove(socketId);
- String socketQueryKey = PREFIX_LINK + address;
- jedis.srem(socketQueryKey, socketId);
- jedis.del(deleteKey);
- logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
- }
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- private void processPackageData(PackageData packageData) {
- final MsgHeader header = packageData.getMsgHeader();
- // 1. 终端心跳-消息体为空 ==> 平台通用应答
- if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
- logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminalHeartBeatMsg(packageData);
- manageChannel(packageData.getChannel(), header.getTerminalPhone());
- logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 5. 终端鉴权 ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
- logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
- this.msgProcessService.processAuthMsg(authenticationMsg);
- logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 6. 终端注册 ==> 终端注册应答
- else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
- logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
- this.msgProcessService.processRegisterMsg(msg);
- logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
- logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- this.msgProcessService.processTerminalLogoutMsg(packageData);
- logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 3. 自定义位置信息汇报 ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_CUSTOMIZE_LOCATION_INFO_UPLOAD == header.getMsgId()) {
- logger.info(">>>>>[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- LocationSelfInfoUploadMsg locationInfoUploadMsg = this.decoder.toSelfLocationInfoUploadMsg(packageData);
- this.msgProcessService.processSelfLocationInfoUploadMsg(locationInfoUploadMsg);
- logger.info("<<<<<[自定义位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[自定义位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage(), e);
- }
- }
- // 4. 位置信息汇报 ==> 平台通用应答
- else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
- logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- try {
- LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
- this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
- logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- } catch (Exception e) {
- logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
- e.getMessage());
- }
- }
- // 5. 车辆控制回复
- else if (Constants.MSG_TERMINAL_CAR_CONTROL_REPLY == header.getMsgId()) {
- logger.info(">>>>>[车辆控制回复],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
- }
- // 其他情况
- else {
- logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
- packageData);
- }
- }
- private void printAcceptanceData(ByteBuf dataByteBuf) {
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
- dataByteBufCopy.readBytes(dataByteArray);
- String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
- logger.info("acceptance original data [{}]", printHexBinary);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- if (!channel.isActive()) {
- String deviceId = channelDeviceMap.get(channel);
- if (deviceId != null) {
- channelDeviceMap.remove(channel);
- deleteLinkFromRedis(deviceId);
- }
- }
- super.channelInactive(ctx);
- ctx.close();
- final String sessionId = ctx.channel().id().asLongText();
- Session session = sessionManager.findBySessionId(sessionId);
- this.sessionManager.removeBySessionId(sessionId);
- logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- Session session = Session.buildSession(ctx.channel());
- sessionManager.put(session.getId(), session);
- logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
- logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
- ctx.close();
- }
- }
- }
- }
|