Ver Fonte

feature: 添加原始数据转发功能

vinson há 4 anos
pai
commit
20ac107ac1

+ 5 - 0
pom.xml

@@ -137,6 +137,11 @@
             <artifactId>commons-io</artifactId>
             <version>2.6</version>
         </dependency>
+        <dependency>
+            <groupId>com.tidecloud.iot</groupId>
+            <artifactId>data-dispatcher-client</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 23 - 0
src/main/java/com/tidecloud/dataacceptance/config/KafkaProducerConfig.java

@@ -6,6 +6,7 @@ import java.util.Map;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -29,6 +30,11 @@ public class KafkaProducerConfig {
         return new DefaultKafkaProducerFactory<>(producerConfigs());
     }
 
+    @Bean
+    public ProducerFactory<Integer, String> stringProducerFactory() {
+        return new DefaultKafkaProducerFactory<>(stringProducerConfigs());
+    }
+
     @Bean
     public Map<String, Object> producerConfigs() {
         Map<String, Object> props = new HashMap<>();
@@ -42,9 +48,26 @@ public class KafkaProducerConfig {
         return props;
     }
 
+    @Bean
+    public Map<String, Object> stringProducerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
+        props.put(ProducerConfig.RETRIES_CONFIG, 2);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return props;
+    }
+
     @Bean
     public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
         return new KafkaTemplate<Integer, byte[]>(producerFactory());
+    }
 
+    @Bean
+    public KafkaTemplate<Integer, String> IntegerStringkafkaTemplate() {
+        return new KafkaTemplate<Integer, String>(stringProducerFactory());
     }
 }

+ 35 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/BSJGpsServerHandler.java

@@ -2,10 +2,15 @@ package com.tidecloud.dataacceptance.service.impl;
 
 import java.util.concurrent.TimeUnit;
 
+import com.smartsanitation.common.util.StringUtil;
+import com.tidecloud.iot.vo.MsgWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Scope;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Component;
 
 import com.tidecloud.dataacceptance.codec.MsgDecoder;
@@ -34,6 +39,9 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.xml.bind.DatatypeConverter;
 
 /**
  * @author cdk
@@ -49,6 +57,11 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 
 	private TerminalMsgProcessService msgProcessService = new TerminalMsgProcessService();
 
+	private static final String originalTopic = "msg_for_dispatche";
+
+	@Autowired
+	private KafkaTemplate<Integer, String> kafkaTemplate;
+
 	@Override
 	public void handle(ByteBuf dataByteBuf, Channel channel) throws Exception {
 	
@@ -68,6 +81,7 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 			packageData.setDeviceId(deviceId);
 			//发送数据到kafka
 			final MsgHeader header = packageData.getMsgHeader();
+			sendOrginalMsg(dataByteArray,  packageData.getDeviceId(), header);
 			if (Constants.MSG_TERMINAL_LOCATION_INFO_UPLOAD == header.getMsgId()
 					|| Constants.MSG_TERMINAL_LOCATION_INFO_BATCH_UPLOAD == header.getMsgId()) {
 				sendMsg2Kafka(dataByteArray, packageData.getDeviceId(), channel);
@@ -79,6 +93,27 @@ public class BSJGpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
 		
 	}
 
+	private void sendOrginalMsg(byte[] dataByteArray, String deviceId, MsgHeader header) {
+		final int msgId = header.getMsgId();
+		boolean isInitial = Constants.MSG_TERMINAL_REGISTER.equals(msgId);
+		MsgWrapper msgWrapper = new MsgWrapper(deviceId, DatatypeConverter.printHexBinary(dataByteArray), isInitial);
+		msgWrapper.setProtocolCode(0);
+		final Integer key = deviceId.hashCode();
+		kafkaTemplate.send(originalTopic, key, StringUtil.convert2String(msgWrapper))
+				.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
+					@Override
+					public void onFailure(Throwable ex) {
+						logger.error(ex.getMessage(), ex);
+					}
+
+					@Override
+					public void onSuccess(SendResult<Integer, String> result) {
+						logger.info("send original msg success");
+					}
+				});
+
+	}
+
 	private void processPackageData(PackageData packageData) {
 		final MsgHeader header = packageData.getMsgHeader();