rainbow954 há 7 anos atrás
pai
commit
35d96fa2c9

+ 5 - 0
pom.xml

@@ -72,6 +72,11 @@
   			<artifactId>commons-lang</artifactId>
   			<version>2.6</version>
   		</dependency>
+  		<dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>1.2.0.RELEASE</version>
+        </dependency>
 	</dependencies>
 
 	<build>

+ 28 - 0
src/main/java/com/tidecloud/dataacceptance/config/AcceptanceDeviceConfig.java

@@ -0,0 +1,28 @@
+package com.tidecloud.dataacceptance.config;
+
+import java.util.List;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Created by ryan 
+ */
+@ConfigurationProperties(prefix = "acceptance.device")
+@Component
+public class AcceptanceDeviceConfig {
+	private List<DeviceConfig> deviceList;
+
+	public List<DeviceConfig> getDeviceList() {
+		return deviceList;
+	}
+
+	public void setDeviceList(List<DeviceConfig> deviceList) {
+		this.deviceList = deviceList;
+	}
+   
+	
+}
+
+
+

+ 85 - 0
src/main/java/com/tidecloud/dataacceptance/config/DeviceConfig.java

@@ -0,0 +1,85 @@
+package com.tidecloud.dataacceptance.config;
+
+public class DeviceConfig {
+
+	private String dataFileDir;
+	
+
+	private String name;
+	
+	private String handlerClass;
+	
+	private String topic;
+	
+	private Integer port;
+	
+	private String ip;
+
+	private boolean enable;
+
+
+	
+	public String getDataFileDir() {
+		return dataFileDir;
+	}
+
+	public void setDataFileDir(String dataFileDir) {
+		this.dataFileDir = dataFileDir;
+	}
+
+	
+
+	
+
+	public boolean isEnable() {
+		return enable;
+	}
+
+	public void setEnable(boolean enable) {
+		this.enable = enable;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	
+	public String getTopic() {
+		return topic;
+	}
+
+	public void setTopic(String topic) {
+		this.topic = topic;
+	}
+
+	public Integer getPort() {
+		return port;
+	}
+
+	public String getIp() {
+		return ip;
+	}
+
+	public void setIp(String ip) {
+		this.ip = ip;
+	}
+
+	public void setPort(Integer port) {
+		this.port = port;
+	}
+
+	public String getHandlerClass() {
+		return handlerClass;
+	}
+
+	public void setHandlerClass(String handlerClass) {
+		this.handlerClass = handlerClass;
+	}
+
+	
+
+}

+ 51 - 0
src/main/java/com/tidecloud/dataacceptance/config/KafkaProducerConfig.java

@@ -0,0 +1,51 @@
+package com.tidecloud.dataacceptance.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by vinson on 17/4/19.
+ */
+@Configuration
+@EnableKafka
+public class KafkaProducerConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String kafkaBrokers;
+
+    @Bean
+    public ProducerFactory<Integer, String> producerFactory() {
+        return new DefaultKafkaProducerFactory<>(producerConfigs());
+    }
+
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
+        props.put(ProducerConfig.RETRIES_CONFIG, 2);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return props;
+    }
+
+    @Bean
+    public KafkaTemplate<Integer, String> kafkaTemplate() {
+        return new KafkaTemplate<Integer, String>(producerFactory());
+
+    }
+}

+ 153 - 0
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceDeviceService.java

@@ -0,0 +1,153 @@
+package com.tidecloud.dataacceptance.service;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Service;
+
+import com.tidecloud.dataacceptance.config.AcceptanceDeviceConfig;
+import com.tidecloud.dataacceptance.config.DeviceConfig;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
+/**
+ * Created by ryan
+ */
+@Service
+public class AcceptanceDeviceService implements ApplicationContextAware {
+	private static Logger logger = LoggerFactory.getLogger(AcceptanceDeviceService.class);
+	private ApplicationContext context;
+
+	@Autowired
+	private AcceptanceDeviceConfig acceptanceDeviceListConfig;
+
+	@PostConstruct
+	public void transfer() throws IOException, ParseException {
+
+		List<DeviceConfig> deviceListConfig = acceptanceDeviceListConfig.getDeviceList();
+		List<DeviceConfig> workDeviceList = new ArrayList<>();
+		for (DeviceConfig config : deviceListConfig) {
+			if (config.isEnable()) {
+				workDeviceList.add(config);
+			}
+		}
+
+		if (workDeviceList.size() > 0) {
+			ExecutorService singleThreadPool = new ThreadPoolExecutor(workDeviceList.size(), 20, 1L,
+					TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), Executors.defaultThreadFactory(),
+					new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
+
+			for (int i = 0; i < workDeviceList.size(); i++) {
+				DeviceConfig config = workDeviceList.get(i);
+				singleThreadPool.execute(() -> {
+					MDC.put("strategyName", config.getName());
+					try {
+						Class handlerClass = Class.forName(config.getHandlerClass());
+						AcceptanceInboundHandlerAdapter handler = (AcceptanceInboundHandlerAdapter) context
+								.getBean(handlerClass);
+						handler.setDataPath(config.getDataFileDir());
+						handler.setTopic(config.getTopic());
+						String ip = getIp("eth0");
+						handler.setIp(ip);
+						handler.setPort(config.getPort());
+						handler.setPrefixName(config.getName());
+						start(handler);
+					} catch (ClassNotFoundException e) {
+						logger.error(e.getMessage());
+					} catch (Exception ex) {
+						logger.error(ex.getMessage());
+					}
+
+				});
+			}
+		}
+
+	}
+
+	public static String getIp(String eth0) {
+		String hostAddress = "127.0.0.1";
+		try {
+			Enumeration<NetworkInterface> interfaces = null;
+			interfaces = NetworkInterface.getNetworkInterfaces();
+			while (interfaces.hasMoreElements()) {
+				NetworkInterface ni = interfaces.nextElement();
+				if (eth0.equalsIgnoreCase(ni.getName())) {
+					Enumeration<InetAddress> addresss = ni.getInetAddresses();
+					while (addresss.hasMoreElements()) {
+						InetAddress ip = addresss.nextElement();
+						if (ip != null && ip instanceof Inet4Address) {
+							if (ip.getHostAddress().equals("127.0.0.1")) {
+								continue;
+							}
+							return	hostAddress = ip.getHostAddress();
+						}
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		}
+		return hostAddress;
+	}
+
+	public void start(AcceptanceInboundHandlerAdapter handler) {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
+							ch.pipeline().addLast(handler);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+			ChannelFuture f = b.bind(handler.getPort()).sync();
+			logger.info("start accept service for {}, bind address {}:{}", handler.getPrefixName(), handler.getIp(),
+					handler.getPort());
+			f.channel().closeFuture().sync();
+		} catch (Exception e) {
+			logger.error(e.getMessage());
+		} finally {
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+
+	}
+
+	@Override
+	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+		this.context = applicationContext;
+	}
+
+}

+ 271 - 0
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceInboundHandlerAdapter.java

@@ -0,0 +1,271 @@
+package com.tidecloud.dataacceptance.service;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.FailureCallback;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.SuccessCallback;
+
+import com.smartsanitation.common.util.StringUtil;
+import com.tidecloud.dataacceptance.entity.ConnectMsg;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+
+public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
+	
+	protected String ip;
+	
+	protected Integer port;
+
+	protected String dataPath;
+
+	protected String prefixName ;
+	
+	private static final Logger logger = LoggerFactory.getLogger(AcceptanceInboundHandlerAdapter.class);
+	private static final Long TEN_M = 10485760l;
+	
+	private static String PREFIX_LINK = "s.";
+	private static String PREFIX_DEVICE = "d.";
+
+	public static Map<String, String> channelMap = new HashMap<String, String>();
+	public static Map<String, Channel> manageChannelMap = new HashMap<>();
+	public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
+	public static Map<String, String> commandCopy = new HashMap<>();
+	private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
+
+	private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
+	@Autowired
+	private JedisPool jedisPool;
+	@Autowired
+	private KafkaTemplate<Integer, String> kafkaTemplate;
+
+	private String topic;
+
+
+	@Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ByteBuf byteBuf = (ByteBuf)msg;
+        String str = byteBufferToString(byteBuf.nioBuffer());
+        logger.info("上传数据:{}", str);
+       
+        try {
+            reply(ctx, str);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+        	byteBuf.release();
+        }
+    }
+	abstract public void reply(ChannelHandlerContext ctx, String msg) throws Exception;
+	
+	protected void sendMsg(String msg,String deviceId) {
+    	kafkaSendThreadPool.execute(()->sendKakfaMsg(msg,deviceId));
+        singleThreadPool.execute(()->dataStorage(msg));
+    }
+
+
+
+	public void manageLink(Channel channel, String deviceId) {
+		String channelId = UUID.randomUUID().toString();
+		manageChannelMap.put(channelId, channel);
+		channelMap.put(channelId, deviceId);
+		channelMapOfChannelKey.put(deviceId, channel);
+		logger.info("链接管理,链接id [{}]", channelId);
+
+		ConnectMsg cMsg = new ConnectMsg(this.ip, channelId);
+		try (Jedis jedis = jedisPool.getResource()) {
+			jedis.select(15);
+			String insertKey = PREFIX_LINK + ip;
+			String selectKey = PREFIX_DEVICE + deviceId;
+
+			jedis.set(insertKey, deviceId);
+			jedis.set(selectKey, StringUtil.convert2String(cMsg));
+		} catch (Exception e) {
+			logger.error(e.getLocalizedMessage());
+		}
+	}
+
+
+
+
+	protected void sendKakfaMsg(String msg, String deviceId) {
+		Integer key = new Integer(deviceId.hashCode());
+		ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
+		
+		// 发送成功回调
+		SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
+			@Override
+			public void onSuccess(SendResult<Integer, String> result) {
+				// 成功业务逻辑
+				logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);
+			}
+		};
+		// 发送失败回调
+		FailureCallback failureCallback = new FailureCallback() {
+			@Override
+			public void onFailure(Throwable ex) {
+				// 失败业务逻辑
+				logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
+			}
+		};
+		listenableFuture.addCallback(successCallback, failureCallback);
+
+	}
+
+	public void dataStorage(String deviceStr) {
+		File path = new File(dataPath);
+		File[] listFiles = path.listFiles();
+		boolean isTouch = true;
+		if (listFiles != null) {
+			for (File sonFile : listFiles) {
+				long len = sonFile.length();
+				if (len < TEN_M) {
+					// String fileName = dataPath + prefixName + new
+					// SimpleDateFormat("yyMMddHHmmss").format(new Date());
+					// File file = new File(fileName);
+					writeDevice2File(sonFile, deviceStr);
+					logger.info("正在写入数据: " + deviceStr);
+					isTouch = false;
+				}
+			}
+		}
+		if (isTouch) {
+			String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
+			File file = new File(fileName);
+			writeDevice2File(file, deviceStr);
+			logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:"
+					+ new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
+		}
+	}
+
+	public static void writeDevice2File(File file, String deviceStr) {
+		BufferedWriter bufferedWriter = null;
+		try {
+			bufferedWriter = new BufferedWriter(new FileWriter(file, true));
+			bufferedWriter.write(deviceStr);
+			bufferedWriter.newLine();
+			bufferedWriter.flush();
+		} catch (IOException e) {
+			logger.error(e.getMessage());
+		} finally {
+			if (bufferedWriter != null) {
+				try {
+					bufferedWriter.close();
+				} catch (IOException e) {
+					logger.error("close file[{}] failed: {}", file.getName(), e.getMessage());
+				}
+			}
+
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		cause.printStackTrace();
+		ctx.close();
+	}
+
+	@Override
+	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+		saveChannel(ctx);
+	}
+
+	private void saveChannel(ChannelHandlerContext ctx) {
+	}
+
+	public static String byteBufferToString(ByteBuffer buffer) {
+		CharBuffer charBuffer = null;
+		try {
+			Charset charset = Charset.forName("UTF-8");
+			CharsetDecoder decoder = charset.newDecoder();
+			charBuffer = decoder.decode(buffer);
+			buffer.flip();
+			return charBuffer.toString();
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			return null;
+		}
+	}
+
+	protected Integer getInteger(String str) {
+		if (str == null) {
+			return null;
+		}
+		return Integer.valueOf(str);
+	}
+
+	protected Double getDouble(String str) {
+		if (str == null) {
+			return null;
+		}
+		return Double.valueOf(str);
+	}
+
+	static byte[] int2bytes(int num) {
+		byte[] b = new byte[4];
+		// int mask=0xff;
+		for (int i = 0; i < 4; i++) {
+			b[3 - i] = (byte) (num >>> (24 - i * 8));
+		}
+		return b;
+	}
+
+	public String getDataPath() {
+		return dataPath;
+	}
+
+	public void setDataPath(String dataPath) {
+		this.dataPath = dataPath;
+	}
+
+	public String getTopic() {
+		return topic;
+	}
+
+	public void setTopic(String topic) {
+		this.topic = topic;
+	}
+	public String getIp() {
+		return ip;
+	}
+	public void setIp(String ip) {
+		this.ip = ip;
+	}
+	public Integer getPort() {
+		return port;
+	}
+	public void setPort(Integer port) {
+		this.port = port;
+	}
+	public String getPrefixName() {
+		return prefixName;
+	}
+	public void setPrefixName(String prefixName) {
+		this.prefixName = prefixName;
+	}
+
+}

+ 0 - 67
src/main/java/com/tidecloud/dataacceptance/service/AcceptanceService.java

@@ -1,67 +0,0 @@
-package com.tidecloud.dataacceptance.service;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PostConstruct;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import com.tidecloud.dataacceptance.service.impl.WatchServerHandler;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
-
-/**
- * Hello world!
- */
-@Component
-public class AcceptanceService {
-
-    @Value("${server.netty.port}")
-    private Integer port;
-    
-    @Autowired
-    private WatchServerHandler watchServcie;
-    
-    @PostConstruct
-    public void run() throws Exception {
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                EventLoopGroup bossGroup = new NioEventLoopGroup();
-                EventLoopGroup workerGroup = new NioEventLoopGroup();
-                try {
-                    ServerBootstrap b = new ServerBootstrap();
-                    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
-                            childHandler(new ChannelInitializer<SocketChannel>() {
-                                @Override
-                                protected void initChannel(SocketChannel ch) throws Exception {
-                                    ch.pipeline().addLast("idleStateHandler",
-                                            new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
-                                    ch.pipeline().addLast(watchServcie);
-                                }
-                            })
-                            .option(ChannelOption.SO_BACKLOG, 128)
-                            .childOption(ChannelOption.SO_KEEPALIVE, true);
-
-                    ChannelFuture f = b.bind(port).sync();
-                    f.channel().closeFuture().sync();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } finally {
-                    workerGroup.shutdownGracefully();
-                    bossGroup.shutdownGracefully();
-                }
-            }
-        }).start();
-      }
-}

+ 29 - 227
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchServerHandler.java

@@ -1,29 +1,15 @@
 package com.tidecloud.dataacceptance.service.impl;
 
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.smartsanitation.common.util.StringUtil;
 import com.tidecloud.dataacceptance.entity.Advice;
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
 import com.tidecloud.dataacceptance.entity.Device;
-import com.tidecloud.dataacceptance.entity.HrtStart;
+import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -31,52 +17,23 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
 
 /**
  * Created by vinson on 2017/9/7.
  */
 @Sharable
-@Component
-public class WatchServerHandler extends ChannelInboundHandlerAdapter {
-    
-    private String dataPath = "/home/service/collector_feidong/rawdata/";
-    // private String dataPath = "D:\\work\\rawdata1\\";
+@Component(WatchServerHandler.name)
+public class WatchServerHandler extends AcceptanceInboundHandlerAdapter {
+     
+	public static final String name = "WatchServerHandler";
     private static final Logger logger = LoggerFactory.getLogger(WatchServerHandler.class);
-    private static final Long TEN_M = 10485760l;
-    private static final String prefixName = "feidong";
-    private static String PREFIX_LINK = "s.";
-    private static String PREFIX_DEVICE = "d.";
-    
-    public static Map<String, String> channelMap = new HashMap<String, String>();
-    public static Map<String, Channel> manageChannelMap = new HashMap<>();
-    public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
-    public static Map<String, String> commandCopy = new HashMap<>();
-    private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
-    @Autowired
-    private JedisPool jedisPool;
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf byteBuf = (ByteBuf)msg;
-        String str = byteBufferToString(byteBuf.nioBuffer());
-        logger.info("上传数据:{}", str);
-        
-        singleThreadPool.execute(()->dataStorage(str));
-	
-
-        //try {
-        //    reply(ctx, str);
-        //} catch (Exception e) {
-        //    logger.error(e.getMessage());
-        //}
+   
+    public WatchServerHandler() {
+    	System.out.println(name);
     }
-
-    private void reply(ChannelHandlerContext ctx, String msg) throws Exception {
+    public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
         logger.info("设备上传数据:" + msg);
         Advice advice = getAdevice(msg);
         String deviceId = advice.getDeviceId();
@@ -89,15 +46,9 @@ public class WatchServerHandler extends ChannelInboundHandlerAdapter {
         switch (adviceType) {
         
         case "UD":
-            Device deviceUD = getDevice(msg);
-            deviceUD.setDeviceId(deviceId);
-            dataStorage(deviceUD);
-            logger.info("正常存储设备信息:" + deviceUD.toString());
-            break;
+        case "heart":	
         case "UD2":
-            Device deviceUD2 = getDevice(msg);
-            deviceUD2.setDeviceId(deviceId);
-            dataStorage(deviceUD2);
+        	sendMsg(msg,deviceId);
             logger.info("正常存储设备信息:" + getDevice(msg).toString());
             break;
         case "LK":
@@ -106,66 +57,17 @@ public class WatchServerHandler extends ChannelInboundHandlerAdapter {
         case "UPLOAD":
             logger.info("device [{}] setting copy time success [{}]", deviceId, new Date());
             break;
-        case "heart":
-            dataStorageHartstart(deviceId, msg);
+       
         default:
             break;
         }
     }
     
+   
     
-    private void dataStorageHartstart(String deviceId, String msg) {
-        int startIndex = msg.indexOf("[");
-        int endIndex = msg.indexOf("]");
-        String data = msg.substring(startIndex + 1, endIndex);
-        String[] bodys = data.split(",");
-
-        HrtStart hrtStart = new HrtStart();
-        hrtStart.setDate(new Date());
-        hrtStart.setDeviceId(deviceId);
-        hrtStart.setHrtCount(getInteger(bodys[1]));
-        String deviceStr = HrtStart.buildHrtStart(hrtStart);
-        
-        File path = new File(dataPath);
-        File[] listFiles = path.listFiles();
-        boolean isTouch = true;
-        for (File sonFile : listFiles) {
-            long len = sonFile.length();
-            if (len < TEN_M) {
-               // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
-               // File file = new File(fileName);
-                writeDevice2File(sonFile, deviceStr);
-                logger.info("正在写入数据: " + deviceStr);
-                isTouch = false;
-            }
-        }
-        if (isTouch) {
-            String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
-            File file = new File(fileName);
-            writeDevice2File(file, deviceStr);
-            logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
-        }
-    }
+ 
 
-    private void manageLink(Channel channel, String deviceId) {
-        String channelId = UUID.randomUUID().toString();
-        manageChannelMap.put(channelId, channel);
-        channelMap.put(channelId, deviceId);
-        channelMapOfChannelKey.put(deviceId, channel);
-        logger.info("链接管理,链接id [{}]", channelId);
-        
-        ConnectMsg cMsg = new ConnectMsg("10.27.118.76", channelId);
-        try (Jedis jedis = jedisPool.getResource()) {
-            jedis.select(15);
-            String insertKey = PREFIX_LINK + "10.27.118.76";
-            String selectKey = PREFIX_DEVICE + deviceId;
-            
-            jedis.set(insertKey, deviceId);
-            jedis.set(selectKey, StringUtil.convert2String(cMsg));
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage());
-        }
-    }
+   
 
     private void normalReply(Advice advice, Channel channel) {
         String facotry = advice.getFacotry();
@@ -237,7 +139,7 @@ public class WatchServerHandler extends ChannelInboundHandlerAdapter {
          0                           [18]
          ...                       ...
      */
-    private Device getDevice(String msg) throws Exception {
+    protected Device getDevice(String msg) throws Exception {
         int startIndex = msg.indexOf("[");
         int endIndex = msg.indexOf("]");
         String data = msg.substring(startIndex + 1, endIndex);
@@ -265,126 +167,26 @@ public class WatchServerHandler extends ChannelInboundHandlerAdapter {
         return device;
     }
 
-    private void dataStorage(Device device) {
-        String deviceStr = Device.buildDeviceStr(device);
-	dataStorage(deviceStr);
-    }
-    
-    private void dataStorage(String deviceStr) {
-        File path = new File(dataPath);
-        File[] listFiles = path.listFiles();
-        boolean isTouch = true;
-	if (listFiles != null) {
-        for (File sonFile : listFiles) {
-            long len = sonFile.length();
-            if (len < TEN_M) {
-               // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
-               // File file = new File(fileName);
-                writeDevice2File(sonFile, deviceStr);
-                logger.info("正在写入数据: " + deviceStr);
-                isTouch = false;
-            }
-        }
-	}
-        if (isTouch) {
-            String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
-            File file = new File(fileName);
-            writeDevice2File(file, deviceStr);
-            logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
-        }
-    }
-    
-    public static void writeDevice2File(File file, String deviceStr){
-        BufferedWriter bufferedWriter = null;
-        try {
-            bufferedWriter = new BufferedWriter(new FileWriter(file, true));
-            bufferedWriter.write(deviceStr);
-            bufferedWriter.newLine();
-            bufferedWriter.flush();
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-        } finally {
-            if (bufferedWriter != null) {
-                try {
-                    bufferedWriter.close();
-                } catch (IOException e) {
-                    logger.error("close file[{}] failed: {}", file.getName(), e.getMessage());
-                }
-            }
-
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        cause.printStackTrace();
-        ctx.close();
-    }
 
-    @Override
-    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-        saveChannel(ctx);
-    }
-
-    private void saveChannel(ChannelHandlerContext ctx) {
-    }
 
     /**
      * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
      */
     public static void main(String[] args) {
-        for (int i = 0; i < 100; i++) {
-            Device device = new Device();
-            device.setDeviceId("3918197044");
-            device.setElectric(11.2d);
-            device.setItemState("125");
-            device.setLat(24.4441);
-            device.setLng(114.223);
-            device.setSpeed(21.2);
-            device.setStep(12);
-            device.setTerminalState(12);
-            device.setTimestamp(new Date());
-            new WatchServerHandler().dataStorage(device);
-        }
+//        for (int i = 0; i < 100; i++) {
+//            Device device = new Device();
+//            device.setDeviceId("3918197044");
+//            device.setElectric(11.2d);
+//            device.setItemState("125");
+//            device.setLat(24.4441);
+//            device.setLng(114.223);
+//            device.setSpeed(21.2);
+//            device.setStep(12);
+//            device.setTerminalState(12);
+//            device.setTimestamp(new Date());
+//            new WatchServerHandler().dataStorage(device);
+//        }
     }
 
-    public static String byteBufferToString(ByteBuffer buffer) {
-        CharBuffer charBuffer = null;
-        try {
-            Charset charset = Charset.forName("UTF-8");
-            CharsetDecoder decoder = charset.newDecoder();
-            charBuffer = decoder.decode(buffer);
-            buffer.flip();
-            return charBuffer.toString();
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            return null;
-        }
-    }
-
-    private Integer getInteger(String str) {
-        if (str == null) {
-            return null;
-        }
-        return Integer.valueOf(str);
-    }
-
-    private Double getDouble(String str) {
-        if (str == null) {
-            return null;
-        }
-        return Double.valueOf(str);
-    }
-    
-    static byte[] int2bytes(int num)
-    {
-           byte[] b=new byte[4];
-           //int mask=0xff;
-           for(int i=0;i<4;i++){
-                b[3-i]=(byte)(num>>>(24-i*8));
-           }
-          return b;
-    }
-  
    
 }

+ 32 - 11
src/main/resources/application.yml

@@ -1,18 +1,39 @@
 spring:
+  application:
+    name: device-acceptance
+  kafka:
+    bootstrap-servers: 172.19.0.24:9092
   redis:
-    host: 10.25.48.128
-    password: tidecloudredis
-    timeout: 10000
+    host: 192.168.0.119
     port: 6379
-    max: 100
-    maxIdle: 10
-    minIdle: 3
-    maxWaitMills: 10000
-server:
-  netty:
-    port: 7009
+    timeout: 10000
+    password: tidecloudredis
+eureka:
+  client:
+    service-url:
+      defaultZone: http://192.168.0.118:12000/eureka
+  instance:
+    prefer-ip-address: true
   port: 16666
   localaddress: 10.27.118.76
+
+acceptance:
+ device:
+  deviceList:
+   -
+    name: watch
+    topic: device-watch
+    ip: 10.27.118.76
+    port: 7009
+    dataFileDir: /home/service/collector_watch/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.WatchServerHandler
+    enable: true
+
+      
 logging:
   config:
-    classpath: logback.xml
+    classpath: logback.xml
+
+
+
+

+ 30 - 0
src/main/resources/dev/application.yml

@@ -0,0 +1,30 @@
+spring:
+  application:
+    name: device-acceptance
+  kafka:
+    bootstrap-servers: 192.168.0.119:9092
+  redis:
+    host: 192.168.0.119
+    port: 6379
+    timeout: 10000
+    password: tidecloudredis
+eureka:
+  client:
+    service-url:
+      defaultZone: http://192.168.0.118:12000/eureka
+  instance:
+    prefer-ip-address: true
+server:
+  netty:
+    port: 7009
+  port: 16666
+  localaddress: 10.27.118.76
+logging:
+  config:
+    classpath: logback.xml
+  
+device:
+  topic: device-data
+
+
+

+ 64 - 49
src/main/resources/logback.xml

@@ -1,59 +1,74 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-    <property name="CONSOLE_PATTERN"
-              value="%gray(%d{MM-dd HH:mm:ss.SSS}) %highlight(%-5level) -- [%gray(%thread)] %cyan(%logger{26}:%line): %msg%n"/>
-    <property name="FILE_PATTERN"
-              value="%d{MM-dd HH:mm:ss.SSS} %-5level -- [%thread] %logger{26}:%line: %msg%n"/>
-    <appender name="sql" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder>
-            <pattern>${FILE_PATTERN}</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>/var/log/tidecloud/data-accept/watch/sql%d{yyyy-MM-dd}.log</fileNamePattern>
-            <!-- 保留 10天数据,默认无限-->
-            <MaxHistory>10</MaxHistory>
-        </rollingPolicy>
-    </appender>
-    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder>
-            <pattern>${FILE_PATTERN}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>INFO</level>
-        </filter>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>/var/log/tidecloud/data-accept/watch/info%d{yyyy-MM-dd}.log</fileNamePattern>
-            <MaxHistory>10</MaxHistory>
-        </rollingPolicy>
-    </appender>
-    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <File>/var/log/tidecloud/data-accept/watch/error.log</File>
-        <encoder>
-            <pattern>${FILE_PATTERN}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>ERROR</level>
-        </filter>
-        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-            <maxIndex>30</maxIndex>
-            <FileNamePattern>/var/log/tidecloud/data-accept/watch/error.log.%i</FileNamePattern>
-        </rollingPolicy>
-        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-            <MaxFileSize>20MB</MaxFileSize>
-        </triggeringPolicy>
-    </appender>
+<configuration>   
+   <property name="LOG_FILE_ROOT_PATH"
+		value="/var/log/tidecloud/data-accept" />
+	<property name="CONSOLE_PATTERN"
+		value="%gray(%d{MM-dd HH:mm:ss.SSS}) %highlight(%-5level) -- [%gray(%thread)] %cyan(%logger{26}:%line): %msg%n" />
+	<property name="FILE_PATTERN"
+		value="%d{MM-dd HH:mm:ss.SSS} %-5level -- [%thread] %logger{26}:%line: %msg%n" />
+			
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>${FILE_PATTERN}</pattern>
         </encoder>
     </appender>
-    <logger name="com.tidecloud.motorcar.dao" additivity="false" level="DEBUG">
-        <appender-ref ref="CONSOLE"/>
-        <appender-ref ref="sql" />
-    </logger>
+  
+    
+    <appender name="STRATEGY-THREAD-INFO" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<key>strategyName</key>
+			<defaultValue>system</defaultValue>
+		</discriminator>
+		<sift>
+			<appender name="INFO_FILE_1"
+				class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+				<encoder>
+					<pattern>${FILE_PATTERN}</pattern>
+				</encoder>
+				<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+					<level>INFO</level>
+				</filter>
+				<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<fileNamePattern>${LOG_FILE_ROOT_PATH}/${strategyName}/info%d{yyyy-MM-dd}.log
+					</fileNamePattern>
+					<MaxHistory>10</MaxHistory>
+				</rollingPolicy>
+			</appender>
+			
+		</sift>
+	</appender>
+	<appender name="STRATEGY-THREAD-ERROR" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<key>strategyName</key>
+			<defaultValue>system</defaultValue>
+		</discriminator>
+		<sift>
+			<appender name="ERROR_FILE_1"
+				class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>${LOG_FILE_ROOT_PATH}/${strategyName}/error.log</File>
+				<encoder>
+					<pattern>${FILE_PATTERN}</pattern>
+				</encoder>
+				<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+					<level>ERROR</level>
+				</filter>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+					<maxIndex>30</maxIndex>
+					<FileNamePattern>${LOG_FILE_ROOT_PATH}/${strategyName}/error.log.%i
+					</FileNamePattern>
+				</rollingPolicy>
+				<triggeringPolicy
+					class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+					<MaxFileSize>20MB</MaxFileSize>
+				</triggeringPolicy>
+			</appender>
+		</sift>
+	</appender>
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>
-        <appender-ref ref="INFO_FILE"/>
-        <appender-ref ref="ERROR_FILE"/>
+       <appender-ref ref="STRATEGY-THREAD-ERROR" />
+		<appender-ref ref="STRATEGY-THREAD-INFO" />
     </root>
 </configuration>

+ 18 - 0
src/main/resources/prod/application.yml

@@ -0,0 +1,18 @@
+spring:
+  redis:
+    host: 10.25.48.128
+    password: tidecloudredis
+    timeout: 10000
+    port: 6379
+    max: 100
+    maxIdle: 10
+    minIdle: 3
+    maxWaitMills: 10000
+server:
+  netty:
+    port: 7009
+  port: 16666
+  localaddress: 10.27.118.76
+logging:
+  config:
+    classpath: logback.xml

+ 67 - 0
src/test/java/com/tidecloud/dataacceptance/HelloClient.java

@@ -0,0 +1,67 @@
+package com.tidecloud.dataacceptance;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.xml.bind.DatatypeConverter;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public class HelloClient {
+
+	public void connect(String host, int port) throws Exception {  
+        
+        EventLoopGroup workerGroup = new NioEventLoopGroup();  
+        try {  
+            Bootstrap b = new Bootstrap();  
+            b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)  
+                    .handler(new ChannelInitializer<SocketChannel>() {  
+                        @Override  
+                        public void initChannel(SocketChannel ch) throws Exception {  
+                            ch.pipeline().addLast(new HelloClientIntHandler());  
+                        }  
+                    });  
+  
+            // Start the client.  
+            ChannelFuture f = b.connect(host, port).sync();  
+           
+           f.channel().close();
+            // Wait until the connection is closed.  
+           f.channel().closeFuture().sync();  
+        } finally {  
+            workerGroup.shutdownGracefully();  
+        }  
+    }  
+  
+    public static void main(String[] args) throws Exception {  
+        HelloClient client = new HelloClient();  
+        
+      
+        ExecutorService singleThreadPool = Executors.newFixedThreadPool(20);
+        
+        int i =0 ;
+        while (i++<2) {
+        	
+        	singleThreadPool.execute(()->{
+        		 
+                try {
+        				client.connect("localhost", 7009);
+        				Thread.sleep(5000);
+        			} catch (Exception e) {
+        				// TODO Auto-generated catch block
+        				e.printStackTrace();
+        			}
+       		}); 
+        	
+        }
+        
+        System.out.println("end");
+    }  
+}

+ 69 - 0
src/test/java/com/tidecloud/dataacceptance/HelloClientIntHandler.java

@@ -0,0 +1,69 @@
+package com.tidecloud.dataacceptance;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {  
+    
+    private static Log logger = LogFactory.getLog(HelloClientIntHandler.class);  
+  
+    // 接收server端的消息,并打印出来  
+    @Override  
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
+          
+        logger.info("HelloClientIntHandler.channelRead");  
+        ByteBuf result = (ByteBuf) msg;  
+        byte[] result1 = new byte[result.readableBytes()];  
+        result.readBytes(result1);  
+        System.out.println("Server said:" + new String(result1));  
+        result.release();  
+    }  
+  
+    // 连接成功后,向server发送消息  
+    @Override  
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {  
+          
+        logger.info("HelloClientIntHandler.channelActive");  
+        //00 00 37 30 37 34 32 45 54 31 30 30 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 30 30 30 30 30 30 30 02 23 7E
+//        byte[] msg = { 0x7E, 0x01,0x00 ,0x00 ,0x25, 0x01, 0x23 ,0x12 ,0x31 ,0x23 ,0x12 ,0x00, 0x13, 0x00 ,0x00 ,
+//        		0x00 ,0x00, 0x37 ,0x30 ,0x37 ,0x34 ,0x32 ,0x45, 0x54 ,0x31, 0x30, 0x30, 0x00, 0x00 ,0x00 ,0x00 ,
+//        		0x00, 0x00, 0x00, 0x00 ,0x00, 0x00, 0x00 ,0x00 ,0x00 ,0x00 ,0x00 ,0x30 ,0x30 ,0x30 ,0x30 ,0x30 ,
+//        		0x30 ,0x30 ,0x02 ,0x23, 0x7E}; 
+        
+  
+//     byte[] msg = {0x7E, 0x02 , 0x00 , 0x00 , 0x39 , 0x01 , 0x23 , 0x12 , 0x31 , 0x23 , 0x12 , 0x00 , 0x1C , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x0C , 0x00 , 0x13 , 0x01 ,
+//    		  (byte)0x8D , 0x6E , (byte)0xE7 , 0x07 , 0x1B , 0x08 , (byte)0xE4 , 0x00 , 0x6B , 0x00 , 0x00 , 0x01 , 0x1D , 0x18 , 0x03 , 0x30 , 0x15 , 0x30 , 0x45 , 0x01 , 0x04 , 0x00 
+//    		  , 0x00 , 0x00 , 0x00 , 0x02 , 0x02 , 0x03 , (byte)0xE8 , 0x03 , 0x02 , 0x00 , 0x00 ,(byte) 0xE8 , 0x01 , 0x00 , 0x2B , 0x04 , 0x4B , 0x74 , 0x01 , (byte)0xD0 , 0x30 , 0x01 
+//    		  , 0x18 , 0x31 , 0x01 , 0x0C , 0x13 , 0x7E };
+     
+ 
+        
+        byte[] msg = {
+        		0x7E ,0x02 ,0x00 ,0x00 ,0x72 ,0x01 ,0x23 ,0x12 ,0x31 ,0x23 ,0x12 ,0x00 ,0x07 ,0x00 ,0x00 ,0x00 ,0x00 ,0x00 ,0x0C ,0x00 ,0x13 ,0x01 ,(byte)0x8D ,0x6E ,0x45 ,0x07 ,0x1B ,0x0D ,(byte)0xD8 ,0x00 ,(byte)0xF6 ,0x00 ,0x00 ,0x00 ,0x00 ,0x18 ,0x03
+        		,0x30 ,0x18 ,0x47 ,0x52 ,0x01 ,0x04 ,0x00 ,0x00 ,0x00 ,0x00 ,0x02 ,0x02 ,0x03 ,(byte)0xE8 ,0x03 ,0x02 ,0x00 ,0x00 ,(byte)0xFC ,0x3A ,0x01 ,0x02 ,0x00 ,0x00 ,0x02 ,0x0C ,0x00 ,0x00 ,0x00 ,0x1E ,0x00 ,0x00 ,0x00 ,0x1E ,0x00 ,0x00 ,0x00 
+        		,0x1E ,0x03 ,0x0C ,0x00 ,0x00 ,0x00 ,0x2D ,0x00 ,0x00 ,0x00 ,0x1E ,0x00 ,0x00 ,0x00 ,0x0F ,0x04 ,0x08 ,0x00 ,0x00 ,0x00 ,0x00 ,0x00 ,0x00 ,0x07 ,0x08 ,0x05 ,0x02 ,0x00 ,0x01 ,0x06 ,0x04 ,0x00 ,0x00 ,0x00 ,0x28 ,0x07 ,0x04 
+        		,0x00 ,0x00 ,0x00 ,0x00 ,0x2B ,0x04 ,0x4B ,0x7D ,0x02 ,0x01 ,(byte)0xD9 ,0x30 ,0x01 ,0x1C ,0x31 ,0x01 ,0x0F ,0x13 ,0x7E
+        		
+        };
+       String message = "[3G*3925160519*00A1*UD,040418,101823,A,29.719760,N,119.6245500,E,2.81,49.2,0.0,3,71,9,0,0,00000011,4,255,460,0,22463,34421,130,22463,24421,120,22575,14074,119,22463,34961,113,0,78.8]";
+//      byte[] a=  {0x02,0x00,0x00,0x35,0x01,0x23,0x12,0x31,0x23,0x12,0x07,0x5D,0x00,0x00,0x00,0x00,0x00,0x0C,0x00,0x13,0x01,
+//    		  0x8D,0x6D,0xB4,0x07,0x1B,0x08,
+//    		  0x8A,0x00,0x37,0x00,0x00,0x00,0xFC,0x18,0x03,0x30,0x14,0x27,0x37,0x01,0x04,0x00,0x00,0x00,0x00,0x03,0x02,0x00,0x00,
+//    		  0xE8,0x01,0x00,
+//    		  0x2B,0x04,0x4C,0x22,0x01,0x35,0x30,0x01,0x1C,0x31,0x01,0x0D, 0xE5};
+   // msg=  DatatypeConverter.parseHexBinary(a);
+//      String msg = "n,2,860854021358527,2018-03-21,12:00:20,31.888678,117.480155,0.025002,9942.554893,111.110001,z00575.675,s00107.600,f00000.000,t00000.000,x0062,e0,g0,m0,v0,N0,p0000,k0000000000";
+//       msg=msg+"\n";
+        msg=message.getBytes();
+        ByteBuf encoded = ctx.alloc().buffer(4 * msg.length);  
+       encoded.writeBytes(msg);  
+        ctx.write(encoded);  
+        ctx.flush();  
+    }  
+}