Ver Fonte

向kafka发送byte[]

rainbow954 há 7 anos atrás
pai
commit
dfebe1e3e1

+ 5 - 0
pom.xml

@@ -69,6 +69,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
+         <dependency>
+			<groupId>com.tidecloud</groupId>
+			<artifactId>data-common</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+		</dependency>
 	    <!-- <dependency>
 	        <groupId>io.springfox</groupId>
 	    	<artifactId>springfox-swagger2</artifactId>

+ 12 - 13
src/main/java/com/tidecloud/dataacceptance/config/KafkaProducerConfig.java

@@ -1,22 +1,21 @@
 package com.tidecloud.dataacceptance.config;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
 import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.*;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 
 /**
- * Created by vinson on 17/4/19.
+ * Created by ryan on 18/4/27.
  */
 @Configuration
 @EnableKafka
@@ -26,7 +25,7 @@ public class KafkaProducerConfig {
     private String kafkaBrokers;
 
     @Bean
-    public ProducerFactory<Integer, String> producerFactory() {
+    public ProducerFactory<Integer, byte[]> producerFactory() {
         return new DefaultKafkaProducerFactory<>(producerConfigs());
     }
 
@@ -39,13 +38,13 @@ public class KafkaProducerConfig {
         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         return props;
     }
 
     @Bean
-    public KafkaTemplate<Integer, String> kafkaTemplate() {
-        return new KafkaTemplate<Integer, String>(producerFactory());
+    public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
+        return new KafkaTemplate<Integer, byte[]>(producerFactory());
 
     }
 }

+ 14 - 6
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceInboundHandlerAdapter.java

@@ -20,6 +20,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.xml.bind.DatatypeConverter;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,8 +92,9 @@ public  class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
 	private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
 	@Autowired
 	private JedisPool jedisPool;
+	
 	@Autowired
-	private KafkaTemplate<Integer, String> kafkaTemplate;
+	private KafkaTemplate<Integer, byte[]> kafkaTemplate;
 
 	private String topic;
 
@@ -100,7 +103,7 @@ public  class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
 		MDC.put("strategyName", this.getPrefixName());
 	}
 
-	protected void receiveMsg(String msg, String deviceId, Channel channel) {
+	protected void receiveMsg(byte[]  msg, String deviceId, Channel channel) {
 		
 		manageChannel(channel, deviceId);
 		kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
@@ -133,15 +136,20 @@ public  class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
 		}
 	}
 
-	protected void sendKakfaMsg(String msg, String deviceId) {
+
+
+	protected void sendKakfaMsg(byte[] dataByteArray,  String deviceId) {
 		Integer key = new Integer(deviceId.hashCode());
 		MDC.put("strategyName", this.getPrefixName());
-		ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
+		
+		
+		String msg = DatatypeConverter.printHexBinary(dataByteArray);
+		ListenableFuture<SendResult<Integer, byte[]>> listenableFuture = kafkaTemplate.send(topic, key, dataByteArray);
 
 		// 发送成功回调
-		SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
+		SuccessCallback<SendResult<Integer,  byte[]>> successCallback = new SuccessCallback<SendResult<Integer,  byte[]>>() {
 			@Override
-			public void onSuccess(SendResult<Integer, String> result) {
+			public void onSuccess(SendResult<Integer,  byte[]> result) {
 				MDC.put("strategyName", AcceptanceInboundHandlerAdapter.this.getPrefixName());
 				// 成功业务逻辑
 				logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);

+ 4 - 1
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -15,6 +15,9 @@ public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboun
 	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 		super.channelRead(ctx, msg);
 		ByteBuf in = (ByteBuf) msg;
+		if (!in.isReadable()) {
+			return;
+		}
 		printAcceptanceData(in,ctx);
 		
 		try {
@@ -34,7 +37,7 @@ public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboun
 		byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
 		dataByteBufCopy.readBytes(dataByteArray);
 		String printHexBinary = DatatypeConverter.printHexBinary(dataByteArray);
-		logger.info("设备: [{}] 传入数据为 : [{}]", channelDeviceMap.get(ctx.channel()), printHexBinary);
+		logger.info("设备: [{}] 传入数据为 : {}", channelDeviceMap.get(ctx.channel()), printHexBinary);
 		dataByteBufCopy.release();
 	}
 

+ 7 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -62,6 +62,13 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			PackageData packageData = this.decoder.bytes2PackageData(dataByteArrayDoEscape);
 			// link manage
 			packageData.setChannel(channel);
+			
+			packageData.setDeviceId(packageData.getMsgHeader().getTerminalPhone());
+			//发送数据到kafka
+			final MsgHeader header = packageData.getMsgHeader();
+			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()) {
+				receiveMsg(dataByteArray, packageData.getDeviceId(), channel);
+			}
 			this.processPackageData(packageData);
 		} catch (Exception e) {
 			logger.error(e.getLocalizedMessage());

+ 13 - 5
src/main/java/com/tidecloud/dataacceptance/service/impl/BingShuiGpsServerHandler.java

@@ -10,14 +10,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
-import com.tidecloud.dataacceptance.service.StringAcceptanceHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -27,11 +26,19 @@ import io.netty.util.concurrent.GenericFutureListener;
 @Component
 @Scope("prototype")
 @ChannelHandler.Sharable
-public class BingShuiGpsServerHandler extends StringAcceptanceHandlerAdapter {
+public class BingShuiGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	private static final Logger logger = LoggerFactory.getLogger(BingShuiGpsServerHandler.class);
 
 	public void handle(String oringalData, Channel channel) {
+		
+	}
+
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		ByteBuf byteBuf = (ByteBuf) in;
+		String oringalData = byteBufferToString(byteBuf.nioBuffer());
+		logger.info("接入数据:{}", oringalData);
 		Map<String, String> dataMap = new HashMap<>();
 
 		String[] dataArray = oringalData.split("\\s+");
@@ -55,8 +62,9 @@ public class BingShuiGpsServerHandler extends StringAcceptanceHandlerAdapter {
 			}
 		}
 		
-		
-		receiveMsg(oringalData,phone,channel);
+		byte[] dataByteArray = new byte[in.readableBytes()];
+		in.readBytes(dataByteArray);
+		receiveMsg(dataByteArray,phone,channel);
 
 
 		String serialStr = dataMap.get("S");

+ 12 - 4
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchServerHandler.java

@@ -13,7 +13,7 @@ import org.springframework.stereotype.Component;
 import com.tidecloud.dataacceptance.entity.Advice;
 import com.tidecloud.dataacceptance.entity.Device;
 import com.tidecloud.dataacceptance.entity.LbsInfo;
-import com.tidecloud.dataacceptance.service.StringAcceptanceHandlerAdapter;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -29,12 +29,16 @@ import io.netty.util.concurrent.GenericFutureListener;
 @Sharable
 @Scope("prototype")
 @Component(WatchServerHandler.name)
-public class WatchServerHandler extends StringAcceptanceHandlerAdapter {
+public class WatchServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	public static final String name = "WatchServerHandler";
 	private static final Logger logger = LoggerFactory.getLogger(WatchServerHandler.class);
 
-	public void handle(String msg, Channel channel) throws Exception {
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		ByteBuf byteBuf = (ByteBuf) in;
+		String msg = byteBufferToString(byteBuf.nioBuffer());
+		logger.info("接入数据:{}", msg);
 		Advice advice = getAdevice(msg);
 		String deviceId = advice.getDeviceId();
 		String adviceType = advice.getAdviceType();
@@ -44,7 +48,9 @@ public class WatchServerHandler extends StringAcceptanceHandlerAdapter {
 		case "UD":
 		case "heart":
 		case "UD2":
-			receiveMsg(msg, deviceId, channel);
+			byte[] dataByteArray = new byte[in.readableBytes()];
+			in.readBytes(dataByteArray);
+			receiveMsg(dataByteArray, deviceId, channel);
 			//logger.info("正常存储设备信息:" + getDevice(msg).toString());
 			break;
 		case "LK":
@@ -59,6 +65,7 @@ public class WatchServerHandler extends StringAcceptanceHandlerAdapter {
 		}
 	}
 
+
 	private void normalReply(Advice advice, Channel channel) {
 		String facotry = advice.getFacotry();
 		String adviceType = advice.getAdviceType();
@@ -200,4 +207,5 @@ public class WatchServerHandler extends StringAcceptanceHandlerAdapter {
 		return lbsList;
 	}
 
+	
 }

+ 8 - 8
src/test/java/com/tidecloud/dataacceptance/HelloClientIntHandler.java

@@ -52,7 +52,7 @@ public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {
         		
         };
         String message = "[3G*3925079946*0116*UD2,070418,155939,V,38.034930,N,114.4015300,E,0.00,0.0,0.0,0,60,52,4,0,00000010,7,255,460,0,12447,45388,132,12500,40896,127,12500,10589,125,12500,40897,122,13000,40087,121,12500,47974,120,12500,49241,120,2,TP-LINK_5AEC,b0:95:8e:d7:5a:ec,-94,guojiahuan,50:bd:5f:7b:af:7c,-85,13.1]";
-		
+        msg = message.getBytes();
       // String message = "[3G*3925160519*00A1*UD,040418,101823,A,29.719760,N,119.6245500,E,2.81,49.2,0.0,3,71,9,0,0,00000011,4,255,460,0,22463,34421,130,22463,24421,120,22575,14074,119,22463,34961,113,0,78.8]";
 //      byte[] a=  {0x02,0x00,0x00,0x35,0x01,0x23,0x12,0x31,0x23,0x12,0x07,0x5D,0x00,0x00,0x00,0x00,0x00,0x0C,0x00,0x13,0x01,
 //    		  0x8D,0x6D,0xB4,0x07,0x1B,0x08,
@@ -62,13 +62,13 @@ public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {
    // msg=  DatatypeConverter.parseHexBinary(a);
 //      String msg = "n,2,860854021358527,2018-03-21,12:00:20,31.888678,117.480155,0.025002,9942.554893,111.110001,z00575.675,s00107.600,f00000.000,t00000.000,x0062,e0,g0,m0,v0,N0,p0000,k0000000000";
 //       msg=msg+"\n";
-        String msg1 = "7e00020000010000000001007F7D017e";
-      //  String msg1 ="7e0102000A010000000001007D0230303030303030343434737e";
-      //  message = "[3G*3925160532*00CB*UD,100418,065535,V,30.632577,N,120.6878867,E,0.00,0.0,0.0,0,100,98,0,0,00000010,7,255,460,0,6234,57713,148,6234,57714,129,6234,57683,124,6234,57698,123,6234,57697,122,6234,57457,119,6234,57729,118,0,16.6]";
-       msg= DatatypeConverter.parseHexBinary(msg1);
-        
-        String printHexBinary = DatatypeConverter.printHexBinary(msg);
-        System.out.println(printHexBinary);
+          //  String msg1 ="7e0102000A010000000001007D0230303030303030343434737e";
+    //    String msg1 = "[3G*3925160532*00CB*UD,100418,065535,V,30.632577,N,120.6878867,E,0.00,0.0,0.0,0,100,98,0,0,00000010,7,255,460,0,6234,57713,148,6234,57714,129,6234,57683,124,6234,57698,123,6234,57697,122,6234,57457,119,6234,57729,118,0,16.6]";
+     
+        String msg1 = "7e0200004401414218432307D200000000000000030243191806D316180000015E00A2180506235959010400016C3DEB20000800233032302E3633000C00B2898607B609170001687400060089FFFFFFFF947e";
+        msg= DatatypeConverter.parseHexBinary(msg1);
+//        String printHexBinary = DatatypeConverter.printHexBinary(msg);
+//        System.out.println(printHexBinary);
       
         ByteBuf encoded = ctx.alloc().buffer(4 * msg.length);  
        encoded.writeBytes(msg);