Ver Fonte

卡牌接入

jianghouwei há 7 anos atrás
pai
commit
0a6a6840b7

+ 3 - 0
src/main/java/com/tidecloud/dataacceptance/service/HexBinaryAcceptanceHandlerAdapter.java

@@ -10,6 +10,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 
+/**
+ *  16 进制 2进制  抽象
+ */
 public abstract class HexBinaryAcceptanceHandlerAdapter extends AcceptanceInboundHandlerAdapter {
 	private static final Logger logger = LoggerFactory.getLogger(HexBinaryAcceptanceHandlerAdapter.class);
 	@Override

+ 256 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/GK309GpsServerHandler.java

@@ -0,0 +1,256 @@
+package com.tidecloud.dataacceptance.service.impl;
+
+import com.tidecloud.dataacceptance.codec.HeaderTailDelimiterFrameDecoder;
+import com.tidecloud.dataacceptance.common.CRCUtil;
+import com.tidecloud.dataacceptance.common.DateUtil;
+import com.tidecloud.dataacceptance.common.NumUtil;
+import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
+import com.tidecloud.dataacceptance.entity.YiTongGpsForWarnDevice;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import com.tidecloud.dataacceptance.util.FileUtils;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import javax.xml.bind.DatatypeConverter;
+import java.util.Date;
+
+/**
+ * @author jhw
+ * GK309 卡牌数据
+ */
+@Component
+@ChannelHandler.Sharable
+@Scope("prototype")
+public class GK309GpsServerHandler extends HexBinaryAcceptanceHandlerAdapter {
+
+    /**
+     * 协议信息数据
+     * [ 起始位|包长度|协议号|信息内容|信息序列号|错误校验|停止位 ]
+     */
+    private static final Logger logger = LoggerFactory.getLogger(GK309GpsServerHandler.class);
+
+    // 起始位
+    private static final Integer START_BITS = 2;// 起始位长度
+    private static final Byte START_BIT = 0x78; //单字节长度数据包
+    private static final Byte START_BIT2 = 0X79;// 双字节长度数据包
+
+//    private static final int START_BIT = 120; //单字节长度数据包
+//    private static final int START_BIT2 = 121;// 双字节长度数据包
+
+    // 协议号
+    private static final byte LOGIN_MSG = 0x01;// 登录 (必须回复)
+    private static final byte STATUS_MSG = 0x13;// 心跳,状态信息包  (必须回复)
+    private static final byte LOCATION_MSG = 0x10;// 一般GPS 位置信息上传
+    private static final byte CORRECT_TIME_MSG = 0x1F;// 校验时间
+
+    private static final Integer DATA_SIZE = 6;
+
+    @Override
+    protected void handle(ByteBuf in, Channel channel) throws Exception {
+        if (in.isReadable()) {
+            in.markReaderIndex();
+            try {
+                byte b = 0;
+                int index = 0;
+                while (index < START_BITS) {
+                    b = in.readByte();
+                    if (START_BIT2 != b && START_BIT != b) {
+                        channel.close();
+                    }
+                    index++;
+                }
+                /*  单字节长度或者双字节长度 */
+                int length = 0;
+                if (START_BIT == b) {
+                    length = in.readByte() & 0xff; // 一个字节
+                } else  {
+                    length = in.readShort() & 0xffff;// 两个字节
+                }
+                handle(in, length, channel);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    private void handle(ByteBuf in, int length, Channel channel) throws Exception {
+        if (in.isReadable()) {
+            String deviceId = channelDeviceMap.get(channel);
+            if (deviceId != null) {
+                MDC.put(MDC_DEVICEID, deviceId);
+            }
+            byte msgType = in.readByte();// 协议号 1个字节
+            if (LOGIN_MSG == msgType) {
+                resolveLoginMSG(in, channel);
+            } else if (STATUS_MSG == msgType) {
+                reply(channel, STATUS_MSG, "心跳包");
+            } else if (LOCATION_MSG == msgType) {
+                logger.info("GPS 定位包");
+               // resolveLocationMSG(in,deviceId);
+                sendMsg2Kafka(getOriginalData(in, deviceId), deviceId, channel);
+            } else if (CORRECT_TIME_MSG == msgType) {
+                reply(channel, CORRECT_TIME_MSG, "校时包");
+            } else {
+                logger.info("client send data without handle type ...");
+            }
+        }
+
+    }
+
+    /**
+     * 登录管理
+     *
+     * @param in
+     * @param channel
+     */
+    private void resolveLoginMSG(ByteBuf in, Channel channel) {
+        /**  终端ID|类型识别码|扩展位*/
+        byte[] deviceIdBytes = new byte[8];// 终端ID [8位]
+        in.readBytes(deviceIdBytes);
+        String deviceId = DatatypeConverter.printHexBinary(deviceIdBytes);
+        String deviceIdInMap = channelDeviceMap.get(channel);
+        MDC.put(MDC_DEVICEID, deviceId);
+        if (!deviceId.equals(deviceIdInMap)) {
+            manageChannel(channel, deviceId);
+        }
+        reply(channel, LOGIN_MSG, "登录包");
+    }
+
+
+    /**
+     * 回复包
+     *
+     * @param channel
+     * @param msgType
+     */
+    private void reply(Channel channel, byte msgType, String msg) {
+
+        /**
+         * 起始位[]长度[] 协议号[]	序列号[]CRC校验[]停止位[2]
+         */
+        ByteBuf buffer = Unpooled.buffer();
+        byte[] crcBytes = new byte[]{0x05, msgType, 0x00, 0x05}; // CRC校验数据
+        int doCrc = CRCUtil.do_crc(65535, crcBytes);//CRC-ITU
+        byte[] intToByte = NumUtil.intToByte(doCrc, 2);
+        byte[] bytes = new byte[]{0x78, 0x78, 0x05, msgType, 0x00, 0x05, intToByte[0], intToByte[1], 0x0D, 0x0A};
+        buffer.writeBytes(bytes);
+        ChannelFuture channelFuture = channel.write(buffer);
+        channelFuture.addListener(future -> {
+            String deviceId = channelDeviceMap.get(channel);
+            logger.info("server reply [{}] to client device [{}] success:[{}] ", msg, deviceId, DatatypeConverter.printHexBinary(bytes));
+        });
+    }
+
+    /**
+     * 原始数据重新组装
+     *
+     * @param in
+     * @param deviceId
+     * @return
+     */
+    private byte[] getOriginalData(ByteBuf in, String deviceId) {
+        if (StringUtils.isNotBlank(deviceId)) {
+            in.resetReaderIndex();
+            byte[] deviceArr = deviceId.getBytes();
+            int length = in.readableBytes();
+            byte[] dataByteArray = new byte[length + deviceArr.length];
+            in.readBytes(dataByteArray, 0, in.readableBytes());
+            System.arraycopy(deviceArr, 0, dataByteArray, length, deviceArr.length);
+            return dataByteArray;
+        } else
+            return null;
+    }
+
+    private void resolveLocationMSG(ByteBuf in, String deviceId) throws Exception {
+        YiTongGPSDevice yiTongGPSDevice = new YiTongGPSDevice();
+        StringBuffer dateTimeStrBuf = new StringBuffer();
+        int indexOfDateTime = 0;
+        while (indexOfDateTime < DATA_SIZE) {
+            byte b = in.readByte();
+            dateTimeStrBuf.append(NumUtil.byte2String(b));
+            indexOfDateTime++;
+        }
+        // 日期
+        yiTongGPSDevice.setDate(dateTimeStrBuf.toString());
+        // gps信息卫星数
+        yiTongGPSDevice.setGpsCount(in.readByte());
+        // 维度
+        yiTongGPSDevice.setLat(in.readInt());
+        // 经度
+        yiTongGPSDevice.setLng(in.readInt());
+        // 速度
+        yiTongGPSDevice.setSpeedbyte(in.readByte());
+        // 航向
+        yiTongGPSDevice.setCourseStatus(in.readShort());
+        // 国家代号
+        yiTongGPSDevice.setMcc(in.readShort());
+        // 移动网号码
+        yiTongGPSDevice.setMnc(in.readByte());
+        // 位置区码
+        yiTongGPSDevice.setLac(in.readShort());
+        // 移动基站Cell Tower ID
+        yiTongGPSDevice.setCellId(in.readMedium());
+        yiTongGPSDevice.setAcc(in.readByte());
+        // 数据上报模式 0x00:定时上报,0x01:定距上报,0x02:拐点上传,0x03:ACC状态改变上传,0X08:开机上报位置信息
+        yiTongGPSDevice.setReportModel(in.readByte());
+        // 0x01:实时 0x00:补传
+        yiTongGPSDevice.setIsmendMsg(in.readByte());
+        double mileage = NumUtil.toFixed2Place((double) in.readInt() / 1000);
+        // 里程设备默认是关闭的,需要指令,设备端才发送
+        yiTongGPSDevice.setDeviceId(deviceId);
+        yiTongGPSDevice.setMileage(mileage);
+        yiTongGPSDevice.setDataType(1);
+        // 写文件操作
+        String deviceStr = YiTongGPSDevice.buildDeviceStr(yiTongGPSDevice);
+        FileUtils.dataStorage(deviceStr.getBytes(), dataPath, prefixName);
+    }
+
+    /**
+     * 按照开始,结束标记位读取数据
+     */
+    @Override
+    public void startAcceptor() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        byte[] tailSplitBytes = new byte[]{0x0D, 0x0A};
+        byte[] headerSplitBytes = new byte[]{0x78, 0x78};
+        byte[] headerSplitBytes1 = new byte[]{0x79, 0x79};
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
+                                    Unpooled.copiedBuffer(tailSplitBytes), Unpooled.copiedBuffer(headerSplitBytes),
+                                    Unpooled.copiedBuffer(headerSplitBytes1)));
+                            ch.pipeline().addLast(GK309GpsServerHandler.this);
+                        }
+                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+            ChannelFuture f = b.bind(port).sync();
+            f.channel().closeFuture().sync();
+            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+                    this.getPort());
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        } finally {
+            cleanRedisLinkData();
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+
+}

+ 103 - 0
src/main/resources/application.yml

@@ -0,0 +1,103 @@
+spring:
+  application:
+    name: device-acceptance
+  kafka:
+    bootstrap-servers: 192.168.0.119:9092
+  redis:
+    host: 192.168.0.119
+    password: tidecloudredis
+    timeout: 10000
+    port: 6379
+    max: 100
+    maxIdle: 10
+    minIdle: 3
+    maxWaitMills: 10000
+eureka:
+  client:
+    service-url:
+      defaultZone: http://192.168.0.118:12000/eureka
+  instance:
+    prefer-ip-address: true
+  port: 16666
+  localaddress: 10.27.118.76
+server:
+  port: ${random.int(16000,19999)}
+acceptance:
+ device:
+  deviceList:
+   -
+    name: watch
+    topic: device-watch
+    ip: 10.27.118.76
+    port: 7009
+    dataFileDir: /home/service/collector_watch/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.WatchServerHandler
+    enable: true
+    
+   -
+    name: bingshui
+    topic: device-bingshui
+    ip: 10.27.118.76
+    port: 7510
+    dataFileDir: /home/service/collector_7510/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.BingShuiGpsServerHandler
+    enable: false
+    
+   -
+    name: bsj
+    topic: device-bsj
+    ip: 10.25.19.87
+    port: 6707
+    dataFileDir: /home/service/collector_6707/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.BSJGpsServerHandler
+    enable: false
+    
+   -
+    name: yitong
+    topic: device-yitong
+    ip: 10.27.118.76
+    port: 7011
+    dataFileDir: /home/service/collector_yitong/rawdata-car/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler
+    enable: false    
+   -
+    name: yuguang
+    topic: device-yuguang
+    ip: 10.27.118.76
+    port: 7510
+    dataFileDir: /home/service/collector_7510/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.YuGuangGpsServerHandler
+    enable: false       
+
+   -
+    name: vorgea
+    topic: device-vorgea
+    ip: 10.27.118.76
+    port: 7511
+    dataFileDir: /home/service/collector_vorgea/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.VorgeaUR0401ServerHandler
+    enable: false
+#   -
+#    name: deyijia_watch
+#    topic: device-deyijia-watch
+#    ip: 192.168.0.133
+#    port: 7518
+#    dataFileDir: /home/service/collector_6727/rawdata/
+#    handlerClass: com.tidecloud.dataacceptance.service.impl.WatchDYJServerHandler
+#    enable: true
+   -
+    name: kds_gp_gps
+    topic: device-kds-gps
+    ip: 192.168.0.133
+    port: 7518
+    dataFileDir: /home/service/collector_6727/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.GK309GpsServerHandler
+    enable: true
+      
+logging:
+  config:
+    classpath: logback.xml
+
+
+
+