Ver Fonte

feature: 为西海岸808协议添加转发功能
1. 抽取808协议转发代码
2. bsj和西海岸808调用抽取的转发代码

vinson há 4 anos atrás
pai
commit
1abc63340d

+ 36 - 0
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -2,6 +2,10 @@ package com.tidecloud.dataacceptance.service;
 
 import javax.xml.bind.DatatypeConverter;
 
+import com.smartsanitation.common.util.StringUtil;
+import com.tidecloud.dataacceptance.common.Constants;
+import com.tidecloud.dataacceptance.entity.PackageData;
+import com.tidecloud.iot.vo.MsgWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -9,6 +13,10 @@ import org.slf4j.MDC;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFutureCallback;
 
 /**
  * 16 进制 2进制  抽象
@@ -16,6 +24,11 @@ import io.netty.channel.ChannelHandlerContext;
 public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboundHandlerAdapter {
     private static final Logger logger = LoggerFactory.getLogger(HexBinaryAcceptanceHandlerAdapter.class);
 
+    private static final String originalTopic = "msg_for_dispatche";
+
+    @Autowired
+    private KafkaTemplate<Integer, String> kafkaTemplate;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         super.channelRead(ctx, msg);
@@ -63,4 +76,27 @@ public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboun
 
     abstract protected void handle(ByteBuf in, Channel channel) throws Exception;
 
+    protected void sendOrginalMsgFor808(byte[] dataByteArray, String deviceId, PackageData.MsgHeader header) {
+        final int msgId = header.getMsgId();
+        boolean isInitial = Constants.MSG_TERMINAL_REGISTER.equals(msgId);
+        sendOriginalMsg(dataByteArray, deviceId, isInitial);
+    }
+    protected void sendOriginalMsg(byte[] dataByteArray, String deviceId, boolean isInitial) {
+        MsgWrapper msgWrapper = new MsgWrapper(deviceId, DatatypeConverter.printHexBinary(dataByteArray), isInitial);
+        msgWrapper.setProtocolCode(0);
+        final Integer key = deviceId.hashCode();
+        kafkaTemplate.send(originalTopic, key, StringUtil.convert2String(msgWrapper))
+                .addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
+                    @Override
+                    public void onFailure(Throwable ex) {
+                        logger.error(ex.getMessage(), ex);
+                    }
+
+                    @Override
+                    public void onSuccess(SendResult<Integer, String> result) {
+                        logger.info("send original msg success");
+                    }
+                });
+    }
+
 }

+ 2 - 32
src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -2,15 +2,10 @@ package com.tidecloud.dataacceptance.service.impl;
 
 import java.util.concurrent.TimeUnit;
 
-import com.smartsanitation.common.util.StringUtil;
-import com.tidecloud.iot.vo.MsgWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Scope;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import com.tidecloud.dataacceptance.codec.MsgDecoder;
@@ -39,9 +34,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-import javax.xml.bind.DatatypeConverter;
 
 /**
  * @author cdk
@@ -57,10 +49,7 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
 
-	private static final String originalTopic = "msg_for_dispatche";
 
-	@Autowired
-	private KafkaTemplate<Integer, String> kafkaTemplate;
 
 	@Override
 	public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
@@ -81,7 +70,7 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			packageData.setDeviceId(deviceId);
 			//发送数据到kafka
 			final MsgHeader header = packageData.getMsgHeader();
-			sendOrginalMsg(dataByteArray,  packageData.getDeviceId(), header);
+			sendOrginalMsgFor808(dataByteArray,  packageData.getDeviceId(), header);
 			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()
 					|| Constants.MSG_TERMINAL_LOCATION_INFO_BATCH_UPLOAD == header.getMsgId()) {
 				sendMsg2Kafka(dataByteArray, packageData.getDeviceId(), channel);
@@ -93,26 +82,6 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		
 	}
 
-	private void sendOrginalMsg(byte[] dataByteArray, String deviceId, MsgHeader header) {
-		final int msgId = header.getMsgId();
-		boolean isInitial = Constants.MSG_TERMINAL_REGISTER.equals(msgId);
-		MsgWrapper msgWrapper = new MsgWrapper(deviceId, DatatypeConverter.printHexBinary(dataByteArray), isInitial);
-		msgWrapper.setProtocolCode(0);
-		final Integer key = deviceId.hashCode();
-		kafkaTemplate.send(originalTopic, key, StringUtil.convert2String(msgWrapper))
-				.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
-					@Override
-					public void onFailure(Throwable ex) {
-						logger.error(ex.getMessage(), ex);
-					}
-
-					@Override
-					public void onSuccess(SendResult<Integer, String> result) {
-						logger.info("send original msg success");
-					}
-				});
-
-	}
 
 	private void processPackageData(PackageData packageData) {
 		final MsgHeader header = packageData.getMsgHeader();
@@ -236,4 +205,5 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		}
 	}
 
+
 }

+ 1 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/TianXiGPServerHandler.java

@@ -66,6 +66,7 @@ public class TianXiGPServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			// 此段代码是公用,故直接采用的直接发送
 			// 此处添加的时间 解析端不使用。参见解析端
 			final MsgHeader header = packageData.getMsgHeader();
+			sendOrginalMsgFor808(dataByteArray,  packageData.getDeviceId(), header);
 			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
 				sendMsg2Kafka(byteMerger(dataByteArray, DateUtil.formatDate2String(new Date()).getBytes()), packageData.getDeviceId(), channel);
 			}

+ 10 - 0
src/test/java/com/tidecloud/dataacceptance/Test.java

@@ -0,0 +1,10 @@
+package com.tidecloud.dataacceptance;
+
+/**
+ * TODO: add description
+ *
+ * @author :vinson.liu
+ * @date :2020/7/31 11:30 上午
+ */
+public class Test {
+}