package com.tidecloud.dataacceptance.service.impl; import com.accept.client.DeviceMsgClient; import com.accept.client.VoiceMsgClient; import com.accept.model.VoiceMsgVo; import com.alibaba.fastjson.JSON; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.entity.Advice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.util.ByteProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * * 经纬AS9智能 GPS 定位终端 * */ @Sharable @Scope("prototype") @Component(JingWeiCardServerHandler.name) public class JingWeiCardServerHandler extends HexBinaryAcceptanceHandlerAdapter { public static final String name = "JingWeiCardServerHandler"; private static final Logger logger = LoggerFactory.getLogger(JingWeiCardServerHandler.class); private static ExecutorService executorService = Executors.newSingleThreadExecutor(); private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ','); private static Map deviceChannelMap = new HashMap<>(); @Autowired private DeviceMsgClient deviceMsgClient; @Autowired private VoiceMsgClient voiceMsgClient; @Override protected void handle(ByteBuf in, Channel channel) throws Exception { byte[] req = new byte[in.readableBytes()]; in.readBytes(req); String msg = new String(req, "UTF-8"); System.out.println(req.length); logger.info("传入数据:》》》》》》》》》" + msg); Advice advice = setAdvice(msg); if (advice == null) { return; } //服务器发送语音数据 voiceReplyToClient(advice.getDeviceId(), channel); switch (advice.getAdviceType()) { case "KA": // 链路保持 sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel); // 这里需要写死成这样,协议如此规定 advice.setFacotry("IC"); normalReply(advice, channel, "KA"); deviceChannelMap.put(advice.getDeviceId(), channel); deviceMsgClient.acceptDeviceMsgParam("JingWei", advice.getDeviceId(), 1, msg, System.currentTimeMillis()); break; case "UD": // 位置信息 sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel); logger.info("位置数据上报[UD]:" + advice.toString()); deviceMsgClient.acceptDeviceMsgParam("JingWei", advice.getDeviceId(), 2, msg, System.currentTimeMillis()); break; case "UD2": // 盲点补传数据 位置数据 logger.info("盲点补传数据[UD2]:" + advice.toString()); sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel); break; case "TKQ": // 终端请求语音下发 logger.info("终端请求语音下发[TKQ]:" + advice.toString()); normalReply(advice, channel, "TKQ"); break; case "TKQ2": // 终端请求好友语音下发 logger.info("终端请求好友语音下发[TKQ2]:" + advice.toString()); normalReply(advice, channel, "TKQ2"); break; case "TK": // 微聊对讲终端发送内容 if (advice.getAdviceSerial().equals("1")){ logger.info("发送对讲数据成功!"); // 调用接口 查询回复内容 // VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId()); voiceMsgClient.updateVoiceMsgSendFinish(advice.getDeviceId(), 1, 4); break; } else { logger.info("微聊对讲数据[TK]:" + advice.toString()); VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId(), req); voiceMsgClient.insVoiceMsg(msgVo); normalReply(advice, channel, "TK,1"); } break; case "UPLOAD": //数据上传间隔设置 logger.info("数据上传间隔设置[UPLOAD]"); setUpload(advice,channel); break; case "LZ": // 设置语言和时区 logger.info("设置语言和时区[LZ]:" + advice.toString()); normalReply(advice, channel, getLgZoneTime()); break; default: // 其他 logger.info("client send data without handle type ..."); break; } } //[3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据] protected VoiceMsgVo splitVoiceMsg(String msg, String deviceId, byte[] req) { int length = req.length - 24; byte[] bs = new byte[length]; System.arraycopy(req, 23, bs, 0, length); VoiceMsgVo voiceMsg = new VoiceMsgVo(); SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");//设置日期格式 String time = df.format(new Date()); voiceMsg.setDeviceId(deviceId); voiceMsg.setTotal(1); voiceMsg.setNu(1); voiceMsg.setVoiceTime(time); voiceMsg.setMsg(bs); voiceMsg.setMsg(bs); return voiceMsg; } /** * 查询服务器传来的语音包并回复 * * @param deviceId * @param channel */ protected void voiceReplyToClient(String deviceId, Channel channel) { try { VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1); if (voiceF.getLag() == 1) { normalReply2(channel, voiceF,""); //发送中的数据 voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 4); } } catch (Exception e) { logger.error("语音下行发送异常!!!!! deviceId=" + deviceId); } } private void normalReply2(Channel channel, VoiceMsgVo voiceMsg, String msg) { ByteBuf buffer = buildVoiceToClientBytebuf(voiceMsg, msg); Integer msgId = voiceMsg.getMsgId(); String deviceId = voiceMsg.getDeviceId(); sendVoiceToDevice(channel, buffer, msgId, deviceId); } public boolean sendVoiceToDevice(String deviceId, byte[] bytes) { Channel channel = deviceChannelMap.get(deviceId); if (channel == null) { logger.warn("the device[{}] is offline and send cancled", deviceId); return false; } ByteBuf byteBuf = buildVoiceToClientBytebuf(deviceId, bytes); sendVoiceToDevice(channel, byteBuf,0, deviceId); return true; } private void sendVoiceToDevice(Channel channel, ByteBuf buffer, Integer msgId, String deviceId) { ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> { logger.info("send voice[{}] to client[{}]", msgId, deviceId); } ); } private ByteBuf buildVoiceToClientBytebuf(VoiceMsgVo voiceMsg, String msg) { String deviceId = voiceMsg.getDeviceId(); byte[] bytes = voiceMsg.getMsg(); return buildVoiceToClientBytebuf(deviceId, bytes); } private ByteBuf buildVoiceToClientBytebuf(String deviceId, byte[] bytes) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append("["); replyCommand.append("3G").append("*"); replyCommand.append(deviceId).append("*"); replyCommand .append(numToHex16(bytes.length + 3)) .append("*"); replyCommand.append("TK").append(","); String replyCommandStr = replyCommand.toString(); StringBuilder replyCommand2 = new StringBuilder(); replyCommand2.append("]"); String replyCommandStr2 = replyCommand2.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + bytes.length + replyCommandStr2.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); buffer.writeBytes(bytes); buffer.writeBytes(replyCommandStr2.getBytes()); buffer.readerIndex(0); return buffer; } /** * 主动回复 * * @param channel * @content 获取下一条待发送终端语音 [3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据] */ private void normalReply(Channel channel, VoiceMsgVo voiceMsg, String msg) { StringBuilder replyCommand = new StringBuilder(); String message = String.valueOf(msg); int startIndex = message.indexOf("["); int endIndex = message.indexOf("]"); String data = message.substring(startIndex + 1, endIndex); String[] bodys = data.split("\\*"); replyCommand.append("["); replyCommand.append("3G").append("*"); String deviceId = voiceMsg.getDeviceId(); // if (deviceId.equals("9513532727")) { // deviceId = "9513532780"; // } // deviceId = "9513532727"; replyCommand.append(deviceId).append("*"); replyCommand .append(bodys[2]) .append("*"); replyCommand.append("TK").append(","); String replyCommandStr = replyCommand.toString(); StringBuilder replyCommand2 = new StringBuilder(); replyCommand2.append("]"); String replyCommandStr2 = replyCommand2.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + voiceMsg.getMsg().length + replyCommandStr2.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); buffer.writeBytes(voiceMsg.getMsg()); buffer.writeBytes(replyCommandStr2.getBytes()); buffer.readerIndex(0); byte[] bs = new byte[buffer.readableBytes()]; buffer.readBytes(bs); buffer.readerIndex(0); channel = deviceChannelMap.get(deviceId); if (channel == null || !channel.isActive()) { deviceChannelMap.remove(deviceId); logger.warn("the channle of device[{}] has closed", deviceId); return; } ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg()) + replyCommandStr2)); } protected static String getLgZoneTime() { StringBuilder sb = new StringBuilder("LZ,1,8"); return sb.toString(); } protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); dataByteBufCopy.release(); } /** * 设置终端 上报频率 * * @param advice * @param channel */ protected void setUpload(Advice advice, Channel channel) { // 上报频率 一分钟一次 String content = "UPLOAD,1"; Date loginTime = new Date(); executorService.execute(new Runnable() { public void run() { Date currentTime = new Date(); try { Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000; if (secondsSinceLogin < 5L) { TimeUnit.SECONDS.sleep(5 - secondsSinceLogin); } normalReply(advice, channel, content); } catch (InterruptedException e) { logger.error(e.getMessage()); } } }); } /** * 回复 * [3G*YYYYYYYYYY*LEN*LK] * * @param advice * @param channel * @content 回复内容 */ private void normalReply(Advice advice, Channel channel, String content) { String factory = advice.getFacotry(); String deviceId = advice.getDeviceId(); StringBuilder replyCommand = new StringBuilder(); replyCommand.append("["); // 拼接厂商标识 replyCommand.append(factory).append("*"); // 拼接设备标识 replyCommand.append(deviceId).append("*"); // 内容长度 replyCommand.append(numToHex16(content.length())).append("*"); // 指令内容标记 replyCommand.append(content); replyCommand.append("]"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } //使用1字节就可以表示b public static String numToHex8(int b) { return String.format("%02x", b);//2表示需要两个16进行数 } //需要使用2字节表示b public static String numToHex16(int b) { return String.format("%04x", b); } public static String numToHex32(int b) { return String.format("%08x", b); } /** * 解析 Advice 数据 * [厂商*设备 ID*内容长度*内容] * @param msg * @return */ private Advice setAdvice(String msg) { Advice advice = new Advice(); if (msg != null) { try { //厂商标识 设备ID 内容长度 指令内容 String message = String.valueOf(msg); int startIndex = message.indexOf("["); int endIndex = message.indexOf("]"); String data = message.substring(startIndex + 1, endIndex); String[] bodys = data.split("\\*"); // 厂商 advice.setFacotry(bodys[0]); // 设备Id advice.setDeviceId(bodys[1]); // 指令长度 advice.setAdvicelength(bodys[2]); // 获取内容 String[] contents = bodys[3].split(","); // 标识符 advice.setAdviceType(contents[0]); //指令流水字段被用作判断音频是否接受成功 if (contents.length == 2) { advice.setAdviceSerial(contents[1]); } return advice; } catch (Exception e) { logger.error(e.getMessage(), e); } } return null; } @Override public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter)); ch.pipeline().addLast(JingWeiCardServerHandler.this); } }); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (Exception ex) { logger.warn(ex.getMessage(), ex); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }