package com.tidecloud.dataacceptance.service.impl; import com.accept.client.DeviceCronClient; import com.accept.client.VoiceMsgClient; import com.accept.model.VoiceMsgVo; import com.alibaba.fastjson.JSON; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.service.DelimiterJingWeiFrameDecoder; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.util.ByteProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by jhw on 2018/7/20. */ @Sharable @Scope("prototype") @Component(WatchJWServerHandler.name) public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter { public static final String name = "WatchJWServerHandler"; private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class); private static ExecutorService executorService = Executors.newSingleThreadExecutor(); private static final Long INTERVAL_TIME = 300000L; // 开关时间 private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S /** * 紧急模式 */ private static final Integer URGENCY = 3; /** * 其他模式 */ private static final Integer OTHER = 2; /** * 省电模式开关 */ private static Map switchMap = new ConcurrentHashMap<>(); /** * 设置模式 */ private static Map modelMap = new ConcurrentHashMap<>(); /** * 心率 */ private static Map beatMap = new ConcurrentHashMap<>(); private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ','); @Autowired private VoiceMsgClient voiceMsgClient; @Autowired private DeviceCronClient deviceCronClient; @Override protected void handle(ByteBuf in, Channel channel) throws Exception { //String msg = byteBufferToString(in.nioBuffer());TODO (注意) byte[] req = new byte[in.readableBytes()]; in.readBytes(req); String msg = new String(req, "UTF-8"); try { String deviceId = channelDeviceMap.get(channel); String factory = msg.substring(0, 2);// 工厂 String type = msg.substring(2, 6);// 标记 if (deviceId != null) { MDC.put(MDC_DEVICEID, deviceId); } else { logger.info("该手表没有登录:传入数据为" + msg); if (!"AP00".equals(type)) { logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg); channel.close(); return; } } logger.info("传入数据为:" + msg); Long time = System.currentTimeMillis(); WorkModel workModel = null; SwitchWorkModel swm = null; if (deviceId != null) { workModel = modelMap.get(deviceId);//已登录 swm = switchMap.get(deviceId);// 开关 if (workModel != null && workModel.getUrgentType() == OTHER && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) { workModel.setUrgentTime(time); workModel.setUrgentType(OTHER); modelMap.put(deviceId, workModel); normalReplyModel(factory, deviceId, channel, 3); logger.warn("超过指定时间没有收到回复》》》 重新设置"); // 更新开关切换时间 swm.setSwitchTime(time); switchMap.put(deviceId, swm); } voiceReplyToClient(deviceId, channel);//语音下行发送 } switch (type) { case "AP00": // 初始化登录 resolveLoginMSG(msg, channel); break; case "AP03": // 连接(心跳) BP03# normalReply(factory, channel, "BP03"); checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文 sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发 logger.warn("心跳报文下发 msg+" + msg); sendApthReplyToClient(deviceId, channel); break; case "AP01": // 位置信息 String gpsState = msg.substring(12, 13); // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接 setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据 normalReply(factory, channel, "BP01"); break; case "AP33": // 设置模式回复 //IWAP33,080835,1# IWAP33,080835,03# if (deviceId != null) { Integer moderType = getInteger(msg.substring(14, msg.indexOf("#")));// 收到回复状态 if (swm.getWorkType() == URGENCY) {// 当前设置的模式 if (workModel != null && moderType == URGENCY) { workModel.setUrgentType(URGENCY); logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg); } else { logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>"); Thread.sleep(1000); normalReplyModel(factory, deviceId, channel, URGENCY); workModel.setUrgentType(OTHER); // 更新开关时间 swm.setSwitchTime(time); switchMap.put(deviceId, swm); } modelMap.put(deviceId, workModel); } } break; case "APHT": // 心率测量 sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发 normalReply(factory, channel, "BPHT"); if (beatMap.containsKey(deviceId)) { beatMap.put(deviceId, 2); logger.warn("标记APHT=2参数状态变更deviceId=" + deviceId); } break; case "AP49": // 心率上传 IWAP49,68# String heatBeat = msg.substring(7, 9);// 工厂 StringBuffer sb = new StringBuffer("IWAPHT,");// 伪装 APHT 一样的数据格式 sb.append(heatBeat).append(",").append("0,0,#"); sendMsg2Kafka((sb.toString() + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发 normalReply(factory, channel, "BP49"); if (beatMap.containsKey(deviceId)) { beatMap.put(deviceId, 2); logger.warn("标记AP49=2参数状态变更deviceId=" + deviceId); } break; case "APXL": // 心率主动下发,终端回复 beatMap.put(deviceId, 1);// 标记下发成功 logger.warn("标记APXL = 1参数状态变更deviceId=" + deviceId); break; case "AP07": // 语音上传 // TODO 设置语音上传 IWAP07,20140818064408,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX# VoiceMsgVo ap07Msg = splitAP07VoiceMsg(msg, deviceId, in); // 调用接口 写库 VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(ap07Msg); normalVoiceReply(channel, vo); break; case "AP28": // 语音下行终端回复 // 调用接口 查询回复内容 VoiceMsgVo ap28Msg = splitAP28VoiceMsg(msg, deviceId); VoiceMsgVo voiceMsg = null; if (ap28Msg.getLag() == 1) { if (ap28Msg.getNu() < ap28Msg.getTotal()) { // 回复下个语音包 voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu() + 1); } else { // 更新数据状态为发送完毕 voiceMsgClient.updateVoiceMsgSendFinish(deviceId, ap28Msg.getMsgId(), 4); } } else { // 回复上个语音包 voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu()); } if (voiceMsg != null && voiceMsg.getLag() == 1) { normalBP28Reply(channel, voiceMsg); } break; default: // 其他 logger.info("client send data without handle type ..."); break; } } finally { MDC.clear(); } } /** * 语音下行发送 * * @param deviceId * @param channel */ protected void voiceReplyToClient(String deviceId, Channel channel) { try { VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1); if (voiceF.getLag() == 1) { normalBP28Reply(channel, voiceF); // 更新数据状态为发送完毕 voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 3); } } catch (Exception e) { logger.error("语音下行发送异常!!!!! deviceId=" + deviceId); } } /** * 心率下行发送 * beatMap (1 下发成功,2 下发完成) * * @param deviceId * @param channel */ protected void sendApthReplyToClient(String deviceId, Channel channel) { try { // 时间校验 1: 是否需要下发心率获取 Boolean lag = deviceCronClient.getSendApThByDeviceId(deviceId); if (lag) { if (beatMap.isEmpty() || !beatMap.containsKey(deviceId) || 1 == beatMap.get(deviceId)) { setBPXLToClient(deviceId, channel); } } else { beatMap.remove(deviceId); logger.warn("标记参数移除deviceId=" + deviceId); } } catch (Exception e) { logger.error("心率下行发送异常!!!!! deviceId=" + deviceId); } } /** * 获取心率数据 下行指令 * * @param deviceId * @param channel */ protected void setBPXLToClient(String deviceId, Channel channel) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append("IWBPXL").append(","); replyCommand.append(deviceId).append(","); replyCommand.append("080835");//指令流水 replyCommand.append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } /** * 语音上行回复 * * @param channel * @content 回复内容 IWBP07,20140818064408,6,1,1# */ private void normalVoiceReply(Channel channel, VoiceMsgVo voiceMsg) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append("IWBP07").append(","); replyCommand.append(voiceMsg.getVoiceTime()).append(","); replyCommand.append(voiceMsg.getTotal()).append(","); replyCommand.append(voiceMsg.getNu()).append(","); replyCommand.append(voiceMsg.getLag());// 接受成功1:成功,0 失败 replyCommand.append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } /** * 语音下行回复 * * @param channel * @content 回复内容 获取下一条代发送终端语音 IWBP28, D3590D54,XXXX,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX# */ private void normalBP28Reply(Channel channel, VoiceMsgVo voiceMsg) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append("IWBP28").append(","); replyCommand.append("D3590D54").append(","); replyCommand.append(voiceMsg.getMsgId()).append(","); replyCommand.append(voiceMsg.getTotal()).append(","); replyCommand.append(voiceMsg.getNu()).append(","); replyCommand.append(voiceMsg.getLength()).append(","); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); buffer.writeBytes(voiceMsg.getMsg()); buffer.readerIndex(0); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg()))); } /** * 上行语音包才解析 * * @param msg IWAP07,20140818064408,6,1,1024,XXXXXXXX# * @param deviceId * @return */ protected VoiceMsgVo splitAP07VoiceMsg(String msg, String deviceId, ByteBuf byteBufIn) { VoiceMsgVo voiceMsg = new VoiceMsgVo(); String[] msgArr = msg.split(","); voiceMsg.setDeviceId(deviceId); voiceMsg.setVoiceTime(msgArr[1]); voiceMsg.setTotal(Integer.valueOf(msgArr[2])); voiceMsg.setNu(Integer.valueOf(msgArr[3])); voiceMsg.setLength(Integer.valueOf(msgArr[4])); try { byteBufIn.readerIndex(0); // 按照逗号切割 int cutLength = 0;// 存储最后一次查询条件 for (int i = 0; i < 5; i++) { int commaLength = byteBufIn.forEachByte(byteBufIn.readerIndex(), 30, FIND_COMMA); cutLength = commaLength; byteBufIn.readerIndex(commaLength + 1); } byteBufIn.readerIndex(cutLength + 1); byte[] bytes = new byte[voiceMsg.getLength() + 1]; byteBufIn.readBytes(bytes); String msg1 = new String(bytes, "UTF-8"); voiceMsg.setMsg(bytes); } catch (Exception e) { logger.warn("语音解析错无" + JSON.toJSONString(voiceMsg) + ":" + e.getStackTrace()); } return voiceMsg; } /** * 下行语音包才解析 * * @param msg IWAP28,D3590D54,XXXX,6,1,1# * @param deviceId * @return */ protected VoiceMsgVo splitAP28VoiceMsg(String msg, String deviceId) { VoiceMsgVo voiceMsg = new VoiceMsgVo(); String[] msgArr = msg.split(","); voiceMsg.setDeviceId(deviceId); voiceMsg.setTotal(Integer.valueOf(msgArr[3])); voiceMsg.setNu(Integer.valueOf(msgArr[4])); voiceMsg.setLag(Integer.valueOf(msgArr[5].substring(0, 1))); voiceMsg.setMsgId(Integer.valueOf(msgArr[2])); return voiceMsg; } /** * 模式设置 * * @param factory * @param deviceId * @param channel */ protected void checkSwitchMap(String factory, String deviceId, Channel channel) { Long nowTime = System.currentTimeMillis();// 当前时间戳 if (deviceId == null || !switchMap.containsKey(deviceId)) { return; } SwitchWorkModel swm = switchMap.get(deviceId); logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime()))); if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) { Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY; if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) { workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式 } if (URGENCY == workType) { //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据 swm.setFirstRemove(Boolean.FALSE); } // 如果当前状态为A 紧急模式 swm.setSwitchTime(nowTime); swm.setWorkType(workType); switchMap.put(deviceId, swm); logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm)); normalReplyModel(factory, deviceId, channel, workType); if (workType == URGENCY) {// 设置紧急模式 则更新紧急模式设置状态 WorkModel wm = new WorkModel(); wm.setUrgentType(OTHER);// wm.setUrgentTime(nowTime);// 设置紧急模式时间 modelMap.put(deviceId, wm); } } else { logger.warn("心跳检测是下发模式:工作不更改状态"); } } /** * 初始化 数据记录 注册记录 * * @param deviceId */ protected void initSwitchMap(String deviceId) { if (deviceId == null) { return; } Long time = System.currentTimeMillis(); SwitchWorkModel swm = new SwitchWorkModel(); swm.setActiveTime(time); swm.setSwitchTime(time); swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式 swm.setGpsUpTime(0L);// GPS 上报时间接口 swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃 switchMap.put(deviceId, swm); logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm)); } /** * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接 *

* * @return 返回值为ture 则需要断开连接,如果返回值为false 则不需要断开连接 */ protected Boolean checkGpsTime(String gTime, Long time) { try { DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss"); Date date = fmt.parse(gTime); if ((time - date.getTime()) > 3 * 3600 * 1000) { return Boolean.TRUE; } } catch (Exception e) { logger.error(e.getMessage(), e); } return Boolean.FALSE; } /** * APOI 上报数据 更新活跃时间 * * @param deviceId */ protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) { if (deviceId == null) { return null; } Long time = System.currentTimeMillis(); SwitchWorkModel swm = switchMap.getOrDefault(deviceId, new SwitchWorkModel()); swm.setGpsUpTime(time); if ("A".equals(gpsState)) { swm.setActiveTime(time); logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm)); if (swm.getFirstRemove()) { sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel); } else { swm.setFirstRemove(Boolean.TRUE); logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId); } } else { logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>"); } switchMap.put(deviceId, swm); logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm)); return swm; } protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); dataByteBufCopy.release(); } /** * 登录管理 * * @param channel */ private void resolveLoginMSG(String msg, Channel channel) { /*IWAP00353456789012345# */ String message = String.valueOf(msg); String factory = message.substring(0, 2); String deviceId = message.substring(6, 21); String deviceIdInMap = channelDeviceMap.get(channel); MDC.put(MDC_DEVICEID, deviceId); if (!deviceId.equals(deviceIdInMap)) { manageChannel(channel, deviceId); } String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss"); normalReply(factory, channel, "BP00," + date + ",8"); Date loginTime = new Date(); initSwitchMap(deviceId); executorService.execute(new Runnable() { @Override public void run() { Date currentTime = new Date(); try { Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000; if (secondsSinceLogin < 5L) { TimeUnit.SECONDS.sleep(5 - secondsSinceLogin); } normalReplyModel(factory, deviceId, channel, URGENCY); WorkModel wm = new WorkModel(); wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间 wm.setUrgentType(OTHER);// 默认指令模式为正常模式 modelMap.put(deviceId, wm); } catch (InterruptedException e) { logger.error(e.getMessage()); } } }); } // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)# private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory).append("BP33").append(",") .append(deviceId).append(",") .append("080835").append(",") .append(workType).append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr)); } /** * 回复 * * @param channel * @content 回复内容 */ private void normalReply(String factory, Channel channel, String content) { // gps ==== >IW BP01# // 登录 ==== >IW BP00,20150101125223,8# // 心跳 ==== >IW BP03# StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory); replyCommand.append(content); replyCommand.append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } @Override public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes()); // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter)); ch.pipeline().addLast(new DelimiterJingWeiFrameDecoder(65535, false, delimiter)); ch.pipeline().addLast(WatchJWServerHandler.this); } }); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (Exception ex) { logger.warn(ex.getMessage(), ex); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 移除 失效的 deviceId * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); if (!channel.isActive()) { String deviceId = channelDeviceMap.get(channel); if (deviceId != null) { switchMap.remove(deviceId); modelMap.remove(deviceId); } } super.channelInactive(ctx); } class WorkModel { /** * 设置紧急模式时间 */ private Long urgentTime; /** * 设置紧急模式状态 */ private Integer urgentType; public Long getUrgentTime() { return urgentTime; } public void setUrgentTime(Long urgentTime) { this.urgentTime = urgentTime; } public Integer getUrgentType() { return urgentType; } public void setUrgentType(Integer urgentType) { this.urgentType = urgentType; } } /** * 开关工作模式 */ class SwitchWorkModel { /** * 有效数据时间 毫秒 */ private Long activeTime; /** * 开关 A 有效,V 无效 */ private Integer workType; /** * 开关切换时间 毫秒 */ private Long switchTime; /** * GPS APO1 上傳时间 */ private Long gpsUpTime; /** * 模式切换第一条是否移除 true 不需要移除, false 需要移除 */ private Boolean firstRemove = Boolean.TRUE; public Long getActiveTime() { return activeTime; } public void setActiveTime(Long activeTime) { this.activeTime = activeTime; } public Integer getWorkType() { return workType; } public void setWorkType(Integer workType) { this.workType = workType; } public Long getSwitchTime() { return switchTime; } public void setSwitchTime(Long switchTime) { this.switchTime = switchTime; } public Long getGpsUpTime() { return gpsUpTime; } public void setGpsUpTime(Long gpsUpTime) { this.gpsUpTime = gpsUpTime; } public Boolean getFirstRemove() { return firstRemove; } public void setFirstRemove(Boolean firstRemove) { this.firstRemove = firstRemove; } } }