Browse Source

经纬工牌接入段

liuguokai_cloud 5 years ago
parent
commit
4a5b0d5bb2

+ 375 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/JingWeiCardServerHandler.java

@@ -0,0 +1,375 @@
+package com.tidecloud.dataacceptance.service.impl;
+
+import com.accept.client.DeviceMsgClient;
+import com.accept.client.VoiceMsgClient;
+import com.accept.model.VoiceMsgVo;
+import com.alibaba.fastjson.JSON;
+import com.tidecloud.dataacceptance.common.DateUtil;
+import com.tidecloud.dataacceptance.entity.Advice;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * 经纬AS9智能 GPS 定位终端
+ *
+ */
+@Sharable
+@Scope("prototype")
+@Component(JingWeiCardServerHandler.name)
+public class JingWeiCardServerHandler extends HexBinaryAcceptanceHandlerAdapter {
+
+    public static final String name = "JingWeiCardServerHandler";
+    private static final Logger logger = LoggerFactory.getLogger(JingWeiCardServerHandler.class);
+    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    @Autowired
+    private DeviceMsgClient deviceMsgClient;
+
+    @Autowired
+    private VoiceMsgClient voiceMsgClient;
+
+    @Override
+    protected void handle(ByteBuf in, Channel channel) throws Exception {
+        byte[] req = new byte[in.readableBytes()];
+        in.readBytes(req);
+        String msg = new String(req, "UTF-8");
+        logger.info("传入数据:》》》》》》》》》" + msg);
+        Advice advice = setAdvice(msg);
+        if (advice == null) {
+            return;
+        }
+        //服务器发送语音数据
+        voiceReplyToClient(advice.getDeviceId(), channel);
+
+        switch (advice.getAdviceType()) {
+            case "LK": // 链路保持
+                sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel);
+                normalReply(advice, channel, "LK");
+                deviceMsgClient.acceptDeviceMsgParam("JingWei",
+                        advice.getDeviceId(),
+                        1, msg, System.currentTimeMillis());
+                break;
+            case "UD": // 位置信息
+                sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
+                logger.info("位置数据上报[UD]:" + advice.toString());
+                deviceMsgClient.acceptDeviceMsgParam("JingWei",
+                        advice.getDeviceId(),
+                        2, msg, System.currentTimeMillis());
+                break;
+            case "UD2": // 盲点补传数据 位置数据
+                logger.info("盲点补传数据[UD2]:" + advice.toString());
+                sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
+                break;
+            case "TK": // 微聊对讲终端发送内容
+                if (advice.getAdviceSerial().equals("1")){
+                    logger.info("发送对讲数据成功!");
+                    // 调用接口  查询回复内容
+//                    VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId());
+                    voiceMsgClient.updateVoiceMsgSendFinish(advice.getDeviceId(), 1, 4);
+                    break;
+                } else {
+                    logger.info("微聊对讲数据[TK]:" + advice.toString());
+                    VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId());
+                    voiceMsgClient.insVoiceMsg(msgVo);
+                    normalReply(advice, channel, "TK,1");
+                }
+                break;
+            case "UPLOAD": //数据上传间隔设置
+                logger.info("数据上传间隔设置[UPLOAD]");
+                setUpload(advice,channel);
+                break;
+            case "LZ": // 设置语言和时区
+                logger.info("设置语言和时区[LZ]:" + advice.toString());
+                normalReply(advice, channel, getLgZoneTime());
+                break;
+            default: // 其他
+                logger.info("client send data without handle type ...");
+                break;
+        }
+    }
+    //[CS*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
+
+    protected VoiceMsgVo splitVoiceMsg(String msg, String deviceId) {
+        VoiceMsgVo voiceMsg = new VoiceMsgVo();
+        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");//设置日期格式
+        String time = df.format(new Date());
+        voiceMsg.setDeviceId(deviceId);
+        voiceMsg.setTotal(1);
+        voiceMsg.setNu(1);
+        voiceMsg.setVoiceTime(time);
+        try {
+            String[] deviceMsg = msg.split("\\,|]");
+            StringBuilder replyCommand = new StringBuilder();
+            for (int i = 1; i <= (deviceMsg.length - 1) / 2; i++) {
+                replyCommand.append(amrToHex16(deviceMsg[(i*2)-1],deviceMsg[i*2])).append(",");
+            }
+            replyCommand.deleteCharAt(replyCommand.length()-1);
+            String replyCommandStr = replyCommand.toString();
+            byte[] bytes = replyCommandStr.getBytes();
+            voiceMsg.setMsg(bytes);
+            voiceMsg.setLength(Integer.valueOf(bytes.length));
+        } catch (Exception e) {
+            logger.warn("语音解析错误" + JSON.toJSONString(voiceMsg) + ":" + e.getStackTrace());
+        }
+        return voiceMsg;
+    }
+
+    public static String amrToHex16(String a,String b) {
+        String exp = "0X7D";
+        if (a.equals(exp) && b.equals("0X01")){
+            return "0X7D";
+        } else if (a.equals(exp) && b.equals("0X02")){
+            return "0X5B";
+        } else if (a.equals(exp) && b.equals("0X03")){
+            return "0X5D";
+        } else if (a.equals(exp) && b.equals("0X04")){
+            return "0X2C";
+        } else if (a.equals(exp) && b.equals("0X05")){
+            return "0X2A";
+        }
+        return "";
+    }
+    public static String amrToReverse(String a) {
+        if (a.equals("0X7D")){
+            return "0X7D 0X01";
+        } else if (a.equals("0X5B")){
+            return "0X7D 0X02";
+        } else if (a.equals("0X5D")){
+            return "0X7D 0X03";
+        } else if (a.equals("0X2C")){
+            return "0X7D 0X04";
+        } else if (a.equals("0X2A")){
+            return "0X7D 0X05";
+        }
+        return "";
+    }
+
+    /**
+     * 回复内容
+     *
+     * @param msg      [CS*YYYYYYYYYY*LEN*TK,1]
+     * @param deviceId
+     * @return
+     */
+/*    protected VoiceMsgVo splitVoiceMsg(String msg, String deviceId) {
+        VoiceMsgVo voiceMsg = new VoiceMsgVo();
+        String[] msgArr = msg.split("\\*");
+        // 获取内容
+        String[] contents = msgArr[3].split(",");
+        voiceMsg.setDeviceId(deviceId);
+        voiceMsg.setLag(Integer.valueOf(contents[1]));
+        return voiceMsg;
+    }*/
+
+    /**
+     * 查询服务器传来的语音包并回复
+     *
+     * @param deviceId
+     * @param channel
+     */
+    protected void voiceReplyToClient(String deviceId, Channel channel) {
+        try {
+            VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
+            normalReply(channel, voiceF);
+            //发送中的数据
+            voiceMsgClient.updateVoiceMsgSendFinish(deviceId, 1, 3);
+        } catch (Exception e) {
+            logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
+        }
+    }
+
+    /**
+     *  主动回复
+     *
+     * @param channel
+     * @content    获取下一条待发送终端语音  [CS*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
+     */
+    private void normalReply(Channel channel, VoiceMsgVo voiceMsg) {
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("CS").append("*");
+        replyCommand.append(voiceMsg.getDeviceId()).append("*");
+        replyCommand.append(numToHex16(voiceMsg.getLength())).append("*");
+        String replyCommandStr = replyCommand.toString();
+
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        buffer.writeBytes(voiceMsg.getMsg());
+        buffer.readerIndex(0);
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg())));
+    }
+
+
+    protected static String getLgZoneTime() {
+        StringBuilder sb = new StringBuilder("LZ,1,8");
+        return sb.toString();
+    }
+
+    protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
+        ByteBuf dataByteBufCopy = dataByteBuf.copy();
+        byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
+        dataByteBufCopy.readBytes(dataByteArray);
+        dataByteBufCopy.release();
+    }
+
+    /**
+     * 设置终端 上报频率
+     *
+     * @param advice
+     * @param channel
+     */
+    protected void setUpload(Advice advice, Channel channel) {
+        //  上报频率 一分钟一次
+        String content = "UPLOAD,1";
+        Date loginTime = new Date();
+        executorService.execute(new Runnable() {
+            public void run() {
+                Date currentTime = new Date();
+                try {
+                    Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
+                    if (secondsSinceLogin < 5L) {
+                        TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
+                    }
+                    normalReply(advice, channel, content);
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage());
+                }
+            }
+        });
+    }
+
+    /**
+     * 回复
+     * [ZJ*YYYYYYYYYY*LEN*LK]
+     *
+     * @param advice
+     * @param channel
+     * @content 回复内容
+     */
+    private void normalReply(Advice advice, Channel channel, String content) {
+        String factory = advice.getFacotry();
+        String deviceId = advice.getDeviceId();
+
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("[");
+        //  拼接厂商标识
+        replyCommand.append(factory).append("*");
+        //  拼接设备标识
+        replyCommand.append(deviceId).append("*");
+        //  内容长度
+        replyCommand.append(numToHex16(content.length())).append("*");
+        // 指令内容标记
+        replyCommand.append(content);
+        replyCommand.append("]");
+
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+    }
+
+    //使用1字节就可以表示b
+    public static String numToHex8(int b) {
+        return String.format("%02x", b);//2表示需要两个16进行数
+    }
+
+    //需要使用2字节表示b
+    public static String numToHex16(int b) {
+        return String.format("%04x", b);
+    }
+
+    public static String numToHex32(int b) {
+        return String.format("%08x", b);
+    }
+
+    /**
+     * 解析 Advice 数据
+     * [厂商*设备 ID*内容长度*内容]
+     * @param msg
+     * @return
+     */
+    private Advice setAdvice(String msg) {
+
+        Advice advice = new Advice();
+        if (msg != null) {
+            try {
+                //厂商标识   设备ID   内容长度   指令内容
+                String message = String.valueOf(msg);
+                int startIndex = message.indexOf("[");
+                int endIndex = message.indexOf("]");
+                String data = message.substring(startIndex + 1, endIndex);
+                String[] bodys = data.split("\\*");
+                // 厂商
+                advice.setFacotry(bodys[0]);
+                // 设备Id
+                advice.setDeviceId(bodys[1]);
+                // 指令长度
+                advice.setAdvicelength(bodys[2]);
+                // 获取内容
+                String[] contents = bodys[3].split(",");
+                // 标识符
+                advice.setAdviceType(contents[0]);
+                //指令流水字段被用作判断音频是否接受成功
+                advice.setAdviceSerial(contents[1]);
+                return advice;
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void startAcceptor() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .handler(new LoggingHandler(LogLevel.INFO))
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        protected void initChannel(SocketChannel ch) {
+                            ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
+                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
+                            ch.pipeline().addLast(JingWeiCardServerHandler.this);
+                        }
+                    });
+            ChannelFuture f = b.bind(port).sync();
+            logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+                    this.getPort());
+            f.channel().closeFuture().sync();
+        } catch (Exception ex) {
+            logger.warn(ex.getMessage(), ex);
+        } finally {
+            cleanRedisLinkData();
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+}

+ 8 - 0
src/main/resources/dev/application.yml

@@ -160,6 +160,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
     enable: true
+   -
+    name: jingwei_card
+    topic: device-jingwei-work-card
+    ip: 10.25.19.87
+    port: 7530
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.JingWeiCardServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml

+ 8 - 1
src/main/resources/prod/application.yml

@@ -160,7 +160,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
     enable: true
-
+   -
+    name: jingwei_card
+    topic: device-jingwei-work-card
+    ip: 10.25.19.87
+    port: 7530
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.JingWeiCardServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml

+ 8 - 0
src/main/resources/test/application.yml

@@ -160,6 +160,14 @@ acceptance:
     dataFileDir: /home/service/collector_7527/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.HengShengWatchServerHandler
     enable: true
+   -
+    name: jingwei_card
+    topic: device-jingwei-work-card
+    ip: 10.25.19.87
+    port: 7530
+    dataFileDir: /home/service/collector_7527/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.JingWeiCardServerHandler
+    enable: true
 logging:
   config:
     classpath: logback.xml