Ver Fonte

恒盛新版协议工牌数据接入

liuguokai_cloud há 5 anos atrás
pai
commit
94fb86342e

+ 208 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/HengShengWatchServerHandler.java

@@ -0,0 +1,208 @@
+package com.tidecloud.dataacceptance.service.impl;
+
+import com.accept.client.DeviceMsgClient;
+import com.tidecloud.dataacceptance.entity.Advice;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 恒盛互通工牌
+ */
+@Sharable
+@Scope("prototype")
+@Component(HengShengWatchServerHandler.name)
+public class HengShengWatchServerHandler extends HexBinaryAcceptanceHandlerAdapter {
+
+    public static final String name = "HengShengWatchServerHandler";
+    private static final Logger logger = LoggerFactory.getLogger(HengShengWatchServerHandler.class);
+    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    @Autowired
+    private DeviceMsgClient deviceMsgClient;
+
+    @Override
+    protected void handle(ByteBuf in, Channel channel) throws Exception {
+        byte[] req = new byte[in.readableBytes()];
+        in.readBytes(req);
+        String msg = new String(req, "UTF-8");
+        logger.info("传入数据:》》》》》》》》》" + msg);
+        Advice advice = setAdvice(msg);
+        if (advice == null) {
+            return;
+        }
+        switch (advice.getAdviceType()) {
+            case "V1": // 位置上传(GPS)
+                sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
+                logger.info("位置上传[V1]:" + advice.toString());
+                break;
+            case "TJJB": //上传计步器
+                sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
+                logger.info("计步信息[TJJB]:" + advice.toString());
+                break;
+            case "H8": // 心跳协议
+                String h8 = "H8,1";
+                normalReply(advice, channel, h8);
+                break;
+            case "WIFIP": // 位置上传(多WIFI加基站)这个设备使用不到WIFIP功能
+                sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
+                logger.info("位置上传[WIFIP]:" + advice.toString());
+                break;
+            default: // 其他
+                logger.info("client send data without handle type ...");
+                //normalReply(advice, channel, advice.getAdviceType());
+                break;
+        }
+    }
+
+    protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
+        ByteBuf dataByteBufCopy = dataByteBuf.copy();
+        byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
+        dataByteBufCopy.readBytes(dataByteArray);
+        dataByteBufCopy.release();
+    }
+
+    /**
+     * 设置终端 上报频率
+     *
+     * @param advice
+     * @param channel
+     */
+    protected void setUpload(Advice advice, Channel channel) {
+        String content = "UPLOAD,1";// 一分钟一次
+        Date loginTime = new Date();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                Date currentTime = new Date();
+                try {
+                    Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
+                    if (secondsSinceLogin < 5L) {
+                        TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
+                    }
+                    normalReply(advice, channel, content);
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        });
+    }
+
+    /**
+     * 回复
+     * *HQ,7502000001,H8#
+     * *HQ,7502000001,V8#
+     */
+    private void normalReply(Advice advice, Channel channel, String content) {
+        String factory = advice.getFacotry();
+        String deviceId = advice.getDeviceId();
+        String adviceType = "V8";
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("*");
+        replyCommand.append(factory).append(",");
+        replyCommand.append(deviceId).append(",");
+        replyCommand.append(adviceType);// 指令内容标记
+        replyCommand.append("#");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+    }
+
+    //使用1字节就可以表示b
+    public static String numToHex8(int b) {
+        return String.format("%02x", b);//2表示需要两个16进行数
+    }
+
+    //需要使用2字节表示b
+    public static String numToHex16(int b) {
+        return String.format("%04x", b);
+    }
+
+    public static String numToHex32(int b) {
+        return String.format("%08x", b);
+    }
+
+    /**
+     * 解析 Advice 数据
+     *
+     * @param msg
+     * @return
+     */
+    private Advice setAdvice(String msg) {
+
+        Advice advice = new Advice();
+        if (msg != null) {
+            try {
+                //厂商标识	设备ID	指令流水号	内容长度	指令内容
+                String message = String.valueOf(msg);
+                int startIndex = message.indexOf("*");
+                int endIndex = message.indexOf("#");
+                String data = message.substring(startIndex + 1, endIndex);
+                String[] bodys = data.split("\\,");
+                advice.setFacotry(bodys[0]);// 厂商
+                advice.setDeviceId(bodys[1]);// 设备Id
+                String[] contents = bodys[2].split(",");//获取内容
+                advice.setAdviceType(contents[0]);// 标记
+                return advice;
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void startAcceptor() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .handler(new LoggingHandler(LogLevel.INFO))
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        protected void initChannel(SocketChannel ch) {
+                            ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
+                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
+                            ch.pipeline().addLast(HengShengWatchServerHandler.this);
+                        }
+                    });
+            ChannelFuture f = b.bind(port).sync();
+            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+                    this.getPort());
+            f.channel().closeFuture().sync();
+        } catch (Exception ex) {
+            logger.warn(ex.getMessage(), ex);
+        } finally {
+            cleanRedisLinkData();
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+}

+ 8 - 0
src/main/resources/dev/application.yml

@@ -152,6 +152,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.TianXiGPServerHandler
     enable: true
+   -
+    name: hengsheng_watch
+    topic: device-hengsheng-watch
+    ip: 10.25.19.87
+    port: 7528
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml

+ 9 - 0
src/main/resources/prod/application.yml

@@ -152,6 +152,15 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.TianXiGPServerHandler
     enable: true
+   -
+    name: hengsheng_watch
+    topic: device-hengsheng-watch
+    ip: 10.25.19.87
+    port: 7528
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
+    enable: true
+
 logging:
   config:
     classpath: logback.xml

+ 9 - 1
src/main/resources/test/application.yml

@@ -6,7 +6,7 @@ spring:
   profiles:
     active: all
   redis:
-    host: 10.25.48.128
+    host: 192.168.0.119
     password: tidecloudredis
     timeout: 10000
     port: 6379
@@ -152,6 +152,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.TianXiGPServerHandler
     enable: true
+   -
+    name: hengsheng_watch
+    topic: device-hengsheng-watch
+    ip: 10.25.19.87
+    port: 7528
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml