Browse Source

增加经纬 心跳配置

jianghouwei 6 years ago
parent
commit
0debfa09b8

+ 92 - 19
src/main/java/com/tidecloud/dataacceptance/service/impl/WatchJWServerHandler.java

@@ -1,5 +1,6 @@
 package com.tidecloud.dataacceptance.service.impl;
 
+import com.accept.client.DeviceCronClient;
 import com.accept.client.VoiceMsgClient;
 import com.accept.model.VoiceMsgVo;
 import com.alibaba.fastjson.JSON;
@@ -14,7 +15,6 @@ import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.util.ByteProcessor;
@@ -70,14 +70,20 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
      */
     private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
 
-    private final ByteBuf TAG_AP07 = Unpooled.copiedBuffer("AP07".getBytes());
-    private final ByteBuf TAG_COMMA = Unpooled.copiedBuffer(",".getBytes());
+    /**
+     * 心率
+     */
+    private static Map<String, Integer> beatMap = new ConcurrentHashMap<>();
+
 
     private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
 
     @Autowired
     private VoiceMsgClient voiceMsgClient;
 
+    @Autowired
+    private DeviceCronClient deviceCronClient;
+
     @Override
     protected void handle(ByteBuf in, Channel channel) throws Exception {
         //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
@@ -126,20 +132,12 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
                     checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
                     sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
                     logger.warn("心跳报文下发 msg+" + msg);
-//                    //TODO (判断当前设备是否有需要回复的 数据)
-                    VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
-                    if (voiceF.getLag() == 1) {
-                        normalBP28Reply(channel, voiceF);
-                    }
+                    voiceReplyToClient(deviceId, channel);//语音下行发送
+                    snedApthReplyToClient(deviceId, channel);
                     break;
                 case "AP01": // 位置信息
                     String gpsState = msg.substring(12, 13);
                     // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
-                    String gDay = msg.substring(6, 12);
-                    String gTime = msg.substring(39, 45);
-//					if (checkGpsTime(gDay + gTime, time)) {
-//						channel.close();
-//					}
                     setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
                     normalReply(factory, channel, "BP01");
                     break;
@@ -166,8 +164,23 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
                     break;
                 case "APHT": // 心率测量
                     sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
+                    normalReply(factory, channel, "BPHT");
+                    if (beatMap.containsKey(deviceId)) {
+                        beatMap.put(deviceId, 2);
+                    }
+                    break;
+                case "AP49": // 心率上传  IWAP49,68#
+                    String heatBeat = msg.substring(7, 9);// 工厂
+                    StringBuffer sb = new StringBuffer("IWAPHT,");// 伪装 APHT 一样的数据格式
+                    sb.append(heatBeat).append(",").append("0,0,#");
+                    sendMsg2Kafka((sb.toString() + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
                     normalReply(factory, channel, "BP49");
-                    // TODO 设置心率周期上传还是 开机上传一次
+                    if (beatMap.containsKey(deviceId)) {
+                        beatMap.put(deviceId, 2);
+                    }
+                    break;
+                case "APXL": // 心率主动下发,终端回复
+                    beatMap.put(deviceId, 1);// 标记下发成功
                     break;
                 case "AP07": // 语音上传
                     // TODO 设置语音上传  IWAP07,20140818064408,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
@@ -206,6 +219,71 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
     }
 
 
+    /**
+     * 语音下行发送
+     *
+     * @param deviceId
+     * @param channel
+     */
+    protected void voiceReplyToClient(String deviceId, Channel channel) {
+        try {
+            VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
+            if (voiceF.getLag() == 1) {
+                normalBP28Reply(channel, voiceF);
+            }
+        } catch (Exception e) {
+            logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
+        }
+
+    }
+
+    /**
+     * 心率下行发送
+     * beatMap (1 下发成功,2 下发完成)
+     *
+     * @param deviceId
+     * @param channel
+     */
+    protected void snedApthReplyToClient(String deviceId, Channel channel) {
+        try {
+            if (!beatMap.isEmpty()
+                    || beatMap.containsKey(deviceId)
+                    || Integer.valueOf(2).equals(beatMap.get(deviceId))) {
+                return;
+            }
+            //  时间校验  1: 是否需要下发心率获取
+            Boolean lag = deviceCronClient.getSendApThByDeviceId(deviceId);
+            if (lag) {
+                setBPXLToClient(deviceId, channel);
+            } else {
+                beatMap.remove(deviceId);
+            }
+        } catch (Exception e) {
+            logger.error("心率下行发送异常!!!!! deviceId=" + deviceId);
+        }
+
+    }
+
+
+    /**
+     * 获取心率数据 下行指令
+     *
+     * @param deviceId
+     * @param channel
+     */
+    protected void setBPXLToClient(String deviceId, Channel channel) {
+        StringBuilder replyCommand = new StringBuilder();
+        replyCommand.append("IWBPXL").append(",");
+        replyCommand.append(deviceId).append(",");
+        replyCommand.append("080835");//指令流水
+        replyCommand.append("#");
+        String replyCommandStr = replyCommand.toString();
+        ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
+        buffer.writeBytes(replyCommandStr.getBytes());
+        ChannelFuture channelFuture = channel.writeAndFlush(buffer);
+        channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
+    }
+
     /**
      * 语音上行回复
      *
@@ -245,11 +323,6 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
         ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
         buffer.writeBytes(replyCommandStr.getBytes());
         buffer.writeBytes(voiceMsg.getMsg());
-//        buffer.readerIndex(0);
-//        byte[] req = new byte[buffer.readableBytes()];
-//        buffer.readBytes(req);
-//        voiceMsg.setMsg(req);
-//        VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(voiceMsg);// 调用接口 写库
         buffer.readerIndex(0);
         ChannelFuture channelFuture = channel.writeAndFlush(buffer);
         channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg())));