123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429 |
- package com.tidecloud.dataacceptance.service.impl;
- import com.accept.client.DeviceMsgClient;
- import com.accept.client.VoiceMsgClient;
- import com.accept.model.VoiceMsgVo;
- import com.alibaba.fastjson.JSON;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.entity.Advice;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.DelimiterBasedFrameDecoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.util.ByteProcessor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * 经纬AS9智能 GPS 定位终端
- *
- */
- @Sharable
- @Scope("prototype")
- @Component(JingWeiCardServerHandler.name)
- public class JingWeiCardServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- public static final String name = "JingWeiCardServerHandler";
- private static final Logger logger = LoggerFactory.getLogger(JingWeiCardServerHandler.class);
- private static ExecutorService executorService = Executors.newSingleThreadExecutor();
- private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
- private static Map<String, Channel> deviceChannelMap = new HashMap<>();
- @Autowired
- private DeviceMsgClient deviceMsgClient;
- @Autowired
- private VoiceMsgClient voiceMsgClient;
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- byte[] req = new byte[in.readableBytes()];
- in.readBytes(req);
- String msg = new String(req, "UTF-8");
- System.out.println(req.length);
- logger.info("传入数据:》》》》》》》》》" + msg);
- Advice advice = setAdvice(msg);
- if (advice == null) {
- return;
- }
- //服务器发送语音数据
- voiceReplyToClient(advice.getDeviceId(), channel);
- switch (advice.getAdviceType()) {
- case "KA": // 链路保持
- sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel);
- // 这里需要写死成这样,协议如此规定
- advice.setFacotry("IC");
- normalReply(advice, channel, "KA");
- deviceChannelMap.put(advice.getDeviceId(), channel);
- deviceMsgClient.acceptDeviceMsgParam("JingWei",
- advice.getDeviceId(),
- 1, msg, System.currentTimeMillis());
- break;
- case "UD": // 位置信息
- sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
- logger.info("位置数据上报[UD]:" + advice.toString());
- deviceMsgClient.acceptDeviceMsgParam("JingWei",
- advice.getDeviceId(),
- 2, msg, System.currentTimeMillis());
- break;
- case "UD2": // 盲点补传数据 位置数据
- logger.info("盲点补传数据[UD2]:" + advice.toString());
- sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
- break;
- case "TKQ": // 终端请求语音下发
- logger.info("终端请求语音下发[TKQ]:" + advice.toString());
- normalReply(advice, channel, "TKQ");
- break;
- case "TKQ2": // 终端请求好友语音下发
- logger.info("终端请求好友语音下发[TKQ2]:" + advice.toString());
- normalReply(advice, channel, "TKQ2");
- break;
- case "TK": // 微聊对讲终端发送内容
- if (advice.getAdviceSerial().equals("1")){
- logger.info("发送对讲数据成功!");
- // 调用接口 查询回复内容
- // VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId());
- voiceMsgClient.updateVoiceMsgSendFinish(advice.getDeviceId(), 1, 4);
- break;
- } else {
- logger.info("微聊对讲数据[TK]:" + advice.toString());
- VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId(), req);
- voiceMsgClient.insVoiceMsg(msgVo);
- normalReply(advice, channel, "TK,1");
- }
- break;
- case "UPLOAD": //数据上传间隔设置
- logger.info("数据上传间隔设置[UPLOAD]");
- setUpload(advice,channel);
- break;
- case "LZ": // 设置语言和时区
- logger.info("设置语言和时区[LZ]:" + advice.toString());
- normalReply(advice, channel, getLgZoneTime());
- break;
- default: // 其他
- logger.info("client send data without handle type ...");
- break;
- }
- }
- //[3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
- protected VoiceMsgVo splitVoiceMsg(String msg, String deviceId, byte[] req) {
- int length = req.length - 24;
- byte[] bs = new byte[length];
- System.arraycopy(req, 23, bs, 0, length);
- VoiceMsgVo voiceMsg = new VoiceMsgVo();
- SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");//设置日期格式
- String time = df.format(new Date());
- voiceMsg.setDeviceId(deviceId);
- voiceMsg.setTotal(1);
- voiceMsg.setNu(1);
- voiceMsg.setVoiceTime(time);
- voiceMsg.setMsg(bs);
- voiceMsg.setMsg(bs);
- return voiceMsg;
- }
- /**
- * 查询服务器传来的语音包并回复
- *
- * @param deviceId
- * @param channel
- */
- protected void voiceReplyToClient(String deviceId, Channel channel) {
- try {
- VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
- if (voiceF.getLag() == 1) {
- normalReply2(channel, voiceF,"");
- //发送中的数据
- voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 4);
- }
- } catch (Exception e) {
- logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
- }
- }
- private void normalReply2(Channel channel, VoiceMsgVo voiceMsg, String msg) {
- ByteBuf buffer = buildVoiceToClientBytebuf(voiceMsg, msg);
- Integer msgId = voiceMsg.getMsgId();
- String deviceId = voiceMsg.getDeviceId();
- sendVoiceToDevice(channel, buffer, msgId, deviceId);
- }
- public boolean sendVoiceToDevice(String deviceId, byte[] bytes) {
- Channel channel = deviceChannelMap.get(deviceId);
- if (channel == null) {
- logger.warn("the device[{}] is offline and send cancled", deviceId);
- return false;
- }
- ByteBuf byteBuf = buildVoiceToClientBytebuf(deviceId, bytes);
- sendVoiceToDevice(channel, byteBuf,0, deviceId);
- return true;
- }
- private void sendVoiceToDevice(Channel channel, ByteBuf buffer, Integer msgId, String deviceId) {
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future ->
- {
- logger.info("send voice[{}] to client[{}]", msgId, deviceId);
- }
- );
- }
- private ByteBuf buildVoiceToClientBytebuf(VoiceMsgVo voiceMsg, String msg) {
- String deviceId = voiceMsg.getDeviceId();
- byte[] bytes = voiceMsg.getMsg();
- return buildVoiceToClientBytebuf(deviceId, bytes);
- }
- private ByteBuf buildVoiceToClientBytebuf(String deviceId, byte[] bytes) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("[");
- replyCommand.append("3G").append("*");
- replyCommand.append(deviceId).append("*");
- replyCommand
- .append(numToHex16(bytes.length + 3))
- .append("*");
- replyCommand.append("TK").append(",");
- String replyCommandStr = replyCommand.toString();
- StringBuilder replyCommand2 = new StringBuilder();
- replyCommand2.append("]");
- String replyCommandStr2 = replyCommand2.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + bytes.length + replyCommandStr2.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- buffer.writeBytes(bytes);
- buffer.writeBytes(replyCommandStr2.getBytes());
- buffer.readerIndex(0);
- return buffer;
- }
- /**
- * 主动回复
- *
- * @param channel
- * @content 获取下一条待发送终端语音 [3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
- */
- private void normalReply(Channel channel, VoiceMsgVo voiceMsg, String msg) {
- StringBuilder replyCommand = new StringBuilder();
- String message = String.valueOf(msg);
- int startIndex = message.indexOf("[");
- int endIndex = message.indexOf("]");
- String data = message.substring(startIndex + 1, endIndex);
- String[] bodys = data.split("\\*");
- replyCommand.append("[");
- replyCommand.append("3G").append("*");
- String deviceId = voiceMsg.getDeviceId();
- // if (deviceId.equals("9513532727")) {
- // deviceId = "9513532780";
- // }
- // deviceId = "9513532727";
- replyCommand.append(deviceId).append("*");
- replyCommand
- .append(bodys[2])
- .append("*");
- replyCommand.append("TK").append(",");
- String replyCommandStr = replyCommand.toString();
- StringBuilder replyCommand2 = new StringBuilder();
- replyCommand2.append("]");
- String replyCommandStr2 = replyCommand2.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + voiceMsg.getMsg().length + replyCommandStr2.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- buffer.writeBytes(voiceMsg.getMsg());
- buffer.writeBytes(replyCommandStr2.getBytes());
- buffer.readerIndex(0);
- byte[] bs = new byte[buffer.readableBytes()];
- buffer.readBytes(bs);
- buffer.readerIndex(0);
- channel = deviceChannelMap.get(deviceId);
- if (channel == null || !channel.isActive()) {
- deviceChannelMap.remove(deviceId);
- logger.warn("the channle of device[{}] has closed", deviceId);
- return;
- }
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg()) + replyCommandStr2));
- }
- protected static String getLgZoneTime() {
- StringBuilder sb = new StringBuilder("LZ,1,8");
- return sb.toString();
- }
- protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
- dataByteBufCopy.readBytes(dataByteArray);
- dataByteBufCopy.release();
- }
- /**
- * 设置终端 上报频率
- *
- * @param advice
- * @param channel
- */
- protected void setUpload(Advice advice, Channel channel) {
- // 上报频率 一分钟一次
- String content = "UPLOAD,1";
- Date loginTime = new Date();
- executorService.execute(new Runnable() {
- public void run() {
- Date currentTime = new Date();
- try {
- Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
- if (secondsSinceLogin < 5L) {
- TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
- }
- normalReply(advice, channel, content);
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- }
- });
- }
- /**
- * 回复
- * [3G*YYYYYYYYYY*LEN*LK]
- *
- * @param advice
- * @param channel
- * @content 回复内容
- */
- private void normalReply(Advice advice, Channel channel, String content) {
- String factory = advice.getFacotry();
- String deviceId = advice.getDeviceId();
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("[");
- // 拼接厂商标识
- replyCommand.append(factory).append("*");
- // 拼接设备标识
- replyCommand.append(deviceId).append("*");
- // 内容长度
- replyCommand.append(numToHex16(content.length())).append("*");
- // 指令内容标记
- replyCommand.append(content);
- replyCommand.append("]");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- //使用1字节就可以表示b
- public static String numToHex8(int b) {
- return String.format("%02x", b);//2表示需要两个16进行数
- }
- //需要使用2字节表示b
- public static String numToHex16(int b) {
- return String.format("%04x", b);
- }
- public static String numToHex32(int b) {
- return String.format("%08x", b);
- }
- /**
- * 解析 Advice 数据
- * [厂商*设备 ID*内容长度*内容]
- * @param msg
- * @return
- */
- private Advice setAdvice(String msg) {
- Advice advice = new Advice();
- if (msg != null) {
- try {
- //厂商标识 设备ID 内容长度 指令内容
- String message = String.valueOf(msg);
- int startIndex = message.indexOf("[");
- int endIndex = message.indexOf("]");
- String data = message.substring(startIndex + 1, endIndex);
- String[] bodys = data.split("\\*");
- // 厂商
- advice.setFacotry(bodys[0]);
- // 设备Id
- advice.setDeviceId(bodys[1]);
- // 指令长度
- advice.setAdvicelength(bodys[2]);
- // 获取内容
- String[] contents = bodys[3].split(",");
- // 标识符
- advice.setAdviceType(contents[0]);
- //指令流水字段被用作判断音频是否接受成功
- if (contents.length == 2) {
- advice.setAdviceSerial(contents[1]);
- }
- return advice;
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- return null;
- }
- @Override
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
- ch.pipeline().addLast(JingWeiCardServerHandler.this);
- }
- });
- ChannelFuture f = b.bind(port).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (Exception ex) {
- logger.warn(ex.getMessage(), ex);
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
|