Browse Source

天熙视瓶设备接入

jianghouwei 6 years ago
parent
commit
682af51b47

+ 187 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/TianXiGPServerHandler.java

@@ -0,0 +1,187 @@
+package com.tidecloud.dataacceptance.service.impl;
+
+import com.tidecloud.dataacceptance.codec.MsgDecoder;
+import com.tidecloud.dataacceptance.common.Constants;
+import com.tidecloud.dataacceptance.common.DateUtil;
+import com.tidecloud.dataacceptance.common.JT808ProtocolUtils;
+import com.tidecloud.dataacceptance.entity.LocationInfoUploadMsg;
+import com.tidecloud.dataacceptance.entity.PackageData;
+import com.tidecloud.dataacceptance.entity.PackageData.MsgHeader;
+import com.tidecloud.dataacceptance.entity.TerminalAuthenticationMsg;
+import com.tidecloud.dataacceptance.entity.TerminalRegisterMsg;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import com.tidecloud.dataacceptance.service.TerminalMsgProcessService;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author cdk
+ */
+@Component
+@Scope("prototype")
+@ChannelHandler.Sharable
+public class TianXiGPServerHandler extends HexBinaryAcceptanceHandlerAdapter {
+
+	private static final Logger logger = LoggerFactory.getLogger(TianXiGPServerHandler.class);
+
+	private final MsgDecoder decoder = new MsgDecoder();
+
+
+	private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
+
+	@Override
+	public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
+
+		try {
+			if (!dataByteBuf.isReadable()) {
+				return;
+			}
+			byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
+			dataByteBuf.readBytes(dataByteArray);
+			byte[] dataByteArrayDoEscape = JT808ProtocolUtils.doEscape4Receive(dataByteArray, 0, dataByteArray.length);
+			// let dataByteArray transfer 808DataEntity
+			PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
+			// link manage
+			packageData.setChannel(channel);
+			String deviceId = packageData.getMsgHeader().getTerminalPhone();
+			MDC.put(MDC_DEVICEID, deviceId);
+			packageData.setDeviceId(deviceId);
+			//发送数据到kafka
+			final MsgHeader header = packageData.getMsgHeader();
+			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
+				sendMsg2Kafka(byteMerger(dataByteArray, DateUtil.formatDate2String(new Date()).getBytes()), packageData.getDeviceId(), channel);
+			}
+			this.processPackageData(packageData);
+		} catch (Exception e) {
+			logger.error(e.getLocalizedMessage());
+		}
+
+	}
+
+	public static byte[] byteMerger(byte[] byte_1, byte[] byte_2) {
+		byte[] byte_3 = new byte[byte_1.length + byte_2.length];
+		System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
+		System.arraycopy(byte_2, 0, byte_3, byte_1.length, byte_2.length);
+		return byte_3;
+	}
+
+	private void processPackageData(PackageData packageData) {
+		final MsgHeader header = packageData.getMsgHeader();
+
+		// 1. 终端心跳-消息体为空 ==> 平台通用应答
+		if (Constants.MSG_TERMINAL_HEART_BEAT_ID == header.getMsgId()) {
+			logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				this.msgProcessService.processTerminalHeartBeatMsg(packageData);
+				manageChannel(packageData.getChannel(), header.getTerminalPhone());
+				logger.info("<<<<<[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+
+		// 5. 终端鉴权 ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_AUTHENTIFICATION_ID == header.getMsgId()) {
+			logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
+				this.msgProcessService.processAuthMsg(authenticationMsg);
+				manageChannel(packageData.getChannel(), header.getTerminalPhone());
+				logger.info("<<<<<[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 6. 终端注册 ==> 终端注册应答
+		else if (Constants.MSG_TERMINAL_REGISTER == header.getMsgId()) {
+			logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
+				this.msgProcessService.processRegisterMsg(msg);
+				logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 7. 终端注销(终端注销数据消息体为空) ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_LOG_OUT == header.getMsgId()) {
+			logger.info(">>>>>[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				this.msgProcessService.processTerminalLogoutMsg(packageData);
+				logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 4. 位置信息汇报 ==> 平台通用应答
+		else if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
+			logger.info(">>>>>[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			try {
+				LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
+				this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
+				logger.info("<<<<<[位置信息],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
+			} catch (Exception e) {
+				logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(), header.getFlowId(),
+						e.getMessage());
+			}
+		}
+		// 其他情况
+		else {
+			logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(), header.getMsgId(),
+					packageData);
+		}
+	}
+
+
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		byte[] splitBytes1 = new byte[]{0x7e};
+		byte[] splitBytes2 = new byte[]{0x7e, 0x7e};
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline().addLast("idleStateHandler",
+									new IdleStateHandler(Constants.TCP_CLIENT_IDLE_MINUTES, 0, 0, TimeUnit.MINUTES));
+							ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1204,
+									Unpooled.copiedBuffer(splitBytes1), Unpooled.copiedBuffer(splitBytes2)));
+							ch.pipeline().addLast(TianXiGPServerHandler.this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+			ChannelFuture f = b.bind(this.getPort()).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
+
+}

+ 8 - 0
src/main/resources/dev/application.yml

@@ -144,6 +144,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.JSTGPServerHandler
     enable: true
+   -
+    name: tianxi_gps
+    topic: device-tianxi-gps
+    ip: 10.25.19.87
+    port: 6003
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.TianXiGPServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml

+ 8 - 0
src/main/resources/prod/application.yml

@@ -144,6 +144,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.JSTGPServerHandler
     enable: true
+   -
+    name: tianxi_gps
+    topic: device-tianxi-gps
+    ip: 10.25.19.87
+    port: 6003
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.TianXiGPServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml