Ver Fonte

易通支持kafka

rainbow954 há 7 anos atrás
pai
commit
203451ad5c

+ 7 - 6
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceInboundHandlerAdapter.java

@@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.DatatypeConverter;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -86,15 +87,15 @@ public  class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
 
 	private String topic;
 
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
-		
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{	
 		MDC.put("strategyName", this.getPrefixName());
 	}
 
-	protected void receiveMsg(byte[]  msg, String deviceId, Channel channel) {
-		
-		manageChannel(channel, deviceId);
-		kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
+	protected void sendMsg2Kafka(byte[]  msg, String deviceId, Channel channel) {
+		if (StringUtils.isNotBlank(deviceId)&&msg.length>0) {
+			manageChannel(channel, deviceId);
+			kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
+		}
 		//singleThreadPool.execute(() -> dataStorage(msg));
 	}
 

+ 6 - 4
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -33,12 +33,14 @@ public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboun
 	}
 
 	protected void printAcceptanceData(ByteBuf dataByteBuf,ChannelHandlerContext ctx) {
-		ByteBuf dataByteBufCopy = dataByteBuf.copy();
-		byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
-		dataByteBufCopy.readBytes(dataByteArray);
+		//ByteBuf dataByteBufCopy = dataByteBuf.copy();
+		dataByteBuf.markReaderIndex();
+		byte[] dataByteArray = new byte[dataByteBuf.readableBytes()];
+		dataByteBuf.readBytes(dataByteArray);
 		String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
 		logger.info("设备: [{}] 传入数据为 : {}", channelDeviceMap.get(ctx.channel()), printHexBinary);
-		dataByteBufCopy.release();
+		dataByteBuf.resetReaderIndex();
+		//dataByteBufCopy.release();
 	}
 
 	abstract protected void handle(ByteBuf in, Channel channel) throws Exception;

+ 1 - 1
src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -67,7 +67,7 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			//发送数据到kafka
 			final MsgHeader header = packageData.getMsgHeader();
 			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
-				receiveMsg(dataByteArray, packageData.getDeviceId(), channel);
+				sendMsg2Kafka(dataByteArray, packageData.getDeviceId(), channel);
 			}
 			this.processPackageData(packageData);
 		} catch (Exception e) {

+ 1 - 1
src/main/java/com/tidecloud/dataacceptance/service/impl/BingShuiGpsServerHandler.java

@@ -64,7 +64,7 @@ public class BingShuiGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter
 		
 		byte[] dataByteArray = new byte[in.readableBytes()];
 		in.readBytes(dataByteArray);
-		receiveMsg(dataByteArray,phone,channel);
+		sendMsg2Kafka(dataByteArray,phone,channel);
 
 
 		String serialStr = dataMap.get("S");

+ 1 - 1
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchServerHandler.java

@@ -53,7 +53,7 @@ public class WatchServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		case "UD2":
 			byte[] dataByteArray = new byte[in.readableBytes()];
 			in.readBytes(dataByteArray);
-			receiveMsg(dataByteArray, deviceId, channel);
+			sendMsg2Kafka(dataByteArray, deviceId, channel);
 			// logger.info("正常存储设备信息:" + getDevice(msg).toString());
 			break;
 		case "LK":

+ 54 - 35
src/main/java/com/tidecloud/dataacceptance/service/impl/YiTongGpsServerHandler.java

@@ -4,11 +4,13 @@ import java.util.Date;
 
 import javax.xml.bind.DatatypeConverter;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
+import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder;
 import com.tidecloud.dataacceptance.common.CRCUtil;
 import com.tidecloud.dataacceptance.common.DateUtil;
 import com.tidecloud.dataacceptance.common.NumUtil;
@@ -29,7 +31,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -60,53 +61,68 @@ public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	@Override
 	protected void handle(ByteBuf in, Channel channel) throws Exception {
-		try {
-			int index = 0;
-			byte b = 0;
-			while (index < START_BITS) {
-				b = in.readByte();
-				if (START_BIT != b && START_BIT2 != b) {
-					channel.close();
+		if (in.isReadable()) {
+			in.markReaderIndex();
+			try {
+				int index = 0;
+				byte b = 0;
+				while (index < START_BITS) {
+					b = in.readByte();
+					if (START_BIT != b && START_BIT2 != b) {
+						channel.close();
+					}
+					index++;
 				}
-				index++;
-			}
-			int length = 0;
-			if (START_BIT == b) {
-				length = in.readByte() & 0xff;
-			} else {
-				length = in.readShort() & 0xffff;
+				int length = 0;
+				if (START_BIT == b) {
+					length = in.readByte() & 0xff;
+				} else {
+					length = in.readShort() & 0xffff;
+				}
+				handle(in, length, channel);
+			} catch (Exception e) {
+				logger.error(e.getMessage(), e);
 			}
-			handle(in, length, channel);
-		} catch (Exception e) {
-			logger.error(e.getMessage(), e);
 		}
 	}
 
+	private byte[] getOriginalData(ByteBuf in, String deviceId) {
+		if (StringUtils.isNotBlank(deviceId)) {
+			in.resetReaderIndex();
+			byte[] deviceArr = deviceId.getBytes();
+			int length = in.readableBytes();
+			byte[] dataByteArray = new byte[length + deviceArr.length];
+			in.readBytes(dataByteArray,0,in.readableBytes());
+			System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
+			return dataByteArray;
+		} else
+			return null;
+	}
+
 	private void handle(ByteBuf in, int length, Channel channel) throws Exception {
 		if (in.isReadable()) {
-			in = in.readBytes(length);
+			String deviceId = channelDeviceMap.get(channel);
+
 			byte msgType = in.readByte();
 			if (LOGIN_MSG == msgType) {
+
 				resolveLoginMSG(in, channel);
 			} else if (LOCATION_MSG == msgType) {
-				String deviceId = channelDeviceMap.get(channel);
-				if (deviceId == null) {
-					logger.info("链接管理失效。。。。。。。");
-				} else {
-					resolveLocationMSG(in, deviceId);
-				}
+				sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
+				// resolveLocationMSG(in, deviceId);
 			} else if (STATUS_MSG == msgType) {
 				reply(channel, STATUS_MSG);
 			} else if (WARNING_MSG == msgType) {
-				String deviceId = channelDeviceMap.get(channel);
-				resolveWarningMsg(in, deviceId);
+				// resolveWarningMsg(in, deviceId);
 				reply(channel, WARNING_MSG);
+				sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
 			} else if (CORRECT_TIME_MSG == msgType) {
 				reply(channel, CORRECT_TIME_MSG);
 			} else if (VOLTAGE_MSG == msgType) {
-				resolveVoltageMSG(in, channel);
+				sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
+				// resolveVoltageMSG(in, channel);
 			} else if (COMMAND_COPY_MSG == msgType) {
-				resolveCommandCopyMSG(in, channel, length);
+				// resolveCommandCopyMSG(in, channel, length);
 			} else {
 				logger.info("client send data without handle type ...");
 			}
@@ -166,7 +182,7 @@ public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		// 写文件操作
 		String deviceStr = yiTongGPSDevice.buildDeviceStr();
 
-		FileUtils.dataStorage(deviceStr.getBytes(),dataPath,prefixName);
+		FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
 	}
 
 	private void resolveVoltageMSG(ByteBuf in, Channel channel) {
@@ -178,7 +194,7 @@ public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			String date = DateUtil.formatDate2String(DateUtil.calculateByHour(new Date(), -8));
 			YiTongGPSDevice yiTongGPSDevice = buildYiTongGpsDevcie(voltageDouble, deviceId, date);
 			String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-			FileUtils.dataStorage(deviceStr.getBytes(),dataPath,prefixName);
+			FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
 		}
 	}
 
@@ -240,7 +256,7 @@ public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		yiTongGPSDevice.setDataType(1);
 		// 写文件操作
 		String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
-		FileUtils.dataStorage(deviceStr.getBytes(),dataPath,prefixName);
+		FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
 	}
 
 	private void reply(Channel channel, byte msgType) {
@@ -262,15 +278,18 @@ public class YiTongGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 	public void startAcceptor() {
 		EventLoopGroup bossGroup = new NioEventLoopGroup();
 		EventLoopGroup workerGroup = new NioEventLoopGroup();
-		byte[] splitBytes = new byte[] { 0x0D, 0x0A };
+		byte[] headerSplitBytes = new byte[] { 0x78, 0x78 };
+		byte[] headerSplitBytes1 = new byte[] { 0x79, 0x79 };
+		byte[] tailSplitBytes = new byte[] { 0x0D, 0x0A };
 		try {
 			ServerBootstrap b = new ServerBootstrap();
 			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
 					.childHandler(new ChannelInitializer<SocketChannel>() {
 						@Override
 						protected void initChannel(SocketChannel ch) throws Exception {
-							ch.pipeline()
-									.addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
+							ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
+									Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
+									Unpooled.copiedBuffer(headerSplitBytes1)));
 							ch.pipeline().addLast(YiTongGpsServerHandler.this);
 						}
 					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

+ 1 - 1
src/main/resources/prod/application.yml

@@ -76,7 +76,7 @@ acceptance:
     port: 7011
     dataFileDir: /home/service/collector_yitong/rawdata-car/
     handlerClass: com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler
-    enable: false    
+    enable: true    
    -
     name: yuguang
     topic: device-yuguang

+ 2 - 2
src/test/java/com/tidecloud/dataacceptance/HelloClient.java

@@ -49,13 +49,13 @@ public class HelloClient {
         ExecutorService singleThreadPool = Executors.newFixedThreadPool(20);
         
         int i =0 ;
-        while (i++<2000) 
+        while (i++<1) 
         {
         	
         	singleThreadPool.execute(()->{
         		 
                 try {
-        				client.connect("localhost", 7009);
+        				client.connect("localhost", 7011);
         				Thread.sleep(2000);
         			} catch (Exception e) {
         				// TODO Auto-generated catch block

+ 5 - 1
src/test/java/com/tidecloud/dataacceptance/HelloClientIntHandler.java

@@ -2,6 +2,8 @@ package com.tidecloud.dataacceptance;
 
 import java.util.Random;
 
+import javax.xml.bind.DatatypeConverter;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -76,7 +78,9 @@ public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {
    //     msg= DatatypeConverter.parseHexBinary(msg1);
 //        String printHexBinary = DatatypeConverter.printHexBinary(msg);
 //        System.out.println(printHexBinary);
-      
+       msg1 = "78781101075253367890024270003201000512790D0A7878222212050f03240acf03305ba00cd5bcc000048201cc00582f0036fa010300005633e20d0a";
+        //msg1= "";
+        msg= DatatypeConverter.parseHexBinary(msg1);
         ByteBuf encoded = ctx.alloc().buffer(4 * msg.length);  
        encoded.writeBytes(msg);  
         ctx.write(encoded);