JingWeiCardServerHandler.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package com.tidecloud.dataacceptance.service.impl;
import com.accept.client.DeviceMsgClient;
import com.accept.client.VoiceMsgClient;
import com.accept.model.VoiceMsgVo;
import com.alibaba.fastjson.JSON;
import com.tidecloud.dataacceptance.common.DateUtil;
import com.tidecloud.dataacceptance.entity.Advice;
import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.ByteProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
  33. /**
  34. *
  35. * 经纬AS9智能 GPS 定位终端
  36. *
  37. */
  38. @Sharable
  39. @Scope("prototype")
  40. @Component(JingWeiCardServerHandler.name)
  41. public class JingWeiCardServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  42. public static final String name = "JingWeiCardServerHandler";
  43. private static final Logger logger = LoggerFactory.getLogger(JingWeiCardServerHandler.class);
  44. private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  45. private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
  46. private static Map<String, Channel> deviceChannelMap = new HashMap<>();
  47. @Autowired
  48. private DeviceMsgClient deviceMsgClient;
  49. @Autowired
  50. private VoiceMsgClient voiceMsgClient;
  51. @Override
  52. protected void handle(ByteBuf in, Channel channel) throws Exception {
  53. byte[] req = new byte[in.readableBytes()];
  54. in.readBytes(req);
  55. String msg = new String(req, "UTF-8");
  56. System.out.println(req.length);
  57. logger.info("传入数据:》》》》》》》》》" + msg);
  58. Advice advice = setAdvice(msg);
  59. if (advice == null) {
  60. return;
  61. }
  62. //服务器发送语音数据
  63. voiceReplyToClient(advice.getDeviceId(), channel);
  64. switch (advice.getAdviceType()) {
  65. case "KA": // 链路保持
  66. sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel);
  67. // 这里需要写死成这样,协议如此规定
  68. advice.setFacotry("IC");
  69. normalReply(advice, channel, "KA");
  70. deviceChannelMap.put(advice.getDeviceId(), channel);
  71. deviceMsgClient.acceptDeviceMsgParam("JingWei",
  72. advice.getDeviceId(),
  73. 1, msg, System.currentTimeMillis());
  74. break;
  75. case "UD": // 位置信息
  76. sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
  77. logger.info("位置数据上报[UD]:" + advice.toString());
  78. deviceMsgClient.acceptDeviceMsgParam("JingWei",
  79. advice.getDeviceId(),
  80. 2, msg, System.currentTimeMillis());
  81. break;
  82. case "UD2": // 盲点补传数据 位置数据
  83. logger.info("盲点补传数据[UD2]:" + advice.toString());
  84. sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
  85. break;
  86. case "TKQ": // 终端请求语音下发
  87. logger.info("终端请求语音下发[TKQ]:" + advice.toString());
  88. normalReply(advice, channel, "TKQ");
  89. break;
  90. case "TKQ2": // 终端请求好友语音下发
  91. logger.info("终端请求好友语音下发[TKQ2]:" + advice.toString());
  92. normalReply(advice, channel, "TKQ2");
  93. break;
  94. case "TK": // 微聊对讲终端发送内容
  95. if (advice.getAdviceSerial().equals("1")){
  96. logger.info("发送对讲数据成功!");
  97. // 调用接口 查询回复内容
  98. // VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId());
  99. voiceMsgClient.updateVoiceMsgSendFinish(advice.getDeviceId(), 1, 4);
  100. break;
  101. } else {
  102. logger.info("微聊对讲数据[TK]:" + advice.toString());
  103. VoiceMsgVo msgVo = splitVoiceMsg(msg, advice.getDeviceId(), req);
  104. voiceMsgClient.insVoiceMsg(msgVo);
  105. normalReply(advice, channel, "TK,1");
  106. }
  107. break;
  108. case "UPLOAD": //数据上传间隔设置
  109. logger.info("数据上传间隔设置[UPLOAD]");
  110. setUpload(advice,channel);
  111. break;
  112. case "LZ": // 设置语言和时区
  113. logger.info("设置语言和时区[LZ]:" + advice.toString());
  114. normalReply(advice, channel, getLgZoneTime());
  115. break;
  116. default: // 其他
  117. logger.info("client send data without handle type ...");
  118. break;
  119. }
  120. }
  121. //[3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
  122. protected VoiceMsgVo splitVoiceMsg(String msg, String deviceId, byte[] req) {
  123. int length = req.length - 24;
  124. byte[] bs = new byte[length];
  125. System.arraycopy(req, 23, bs, 0, length);
  126. VoiceMsgVo voiceMsg = new VoiceMsgVo();
  127. SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");//设置日期格式
  128. String time = df.format(new Date());
  129. voiceMsg.setDeviceId(deviceId);
  130. voiceMsg.setTotal(1);
  131. voiceMsg.setNu(1);
  132. voiceMsg.setVoiceTime(time);
  133. voiceMsg.setMsg(bs);
  134. voiceMsg.setMsg(bs);
  135. return voiceMsg;
  136. }
  137. /**
  138. * 查询服务器传来的语音包并回复
  139. *
  140. * @param deviceId
  141. * @param channel
  142. */
  143. protected void voiceReplyToClient(String deviceId, Channel channel) {
  144. try {
  145. VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
  146. if (voiceF.getLag() == 1) {
  147. normalReply2(channel, voiceF,"");
  148. //发送中的数据
  149. voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 4);
  150. }
  151. } catch (Exception e) {
  152. logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
  153. }
  154. }
  155. private void normalReply2(Channel channel, VoiceMsgVo voiceMsg, String msg) {
  156. ByteBuf buffer = buildVoiceToClientBytebuf(voiceMsg, msg);
  157. Integer msgId = voiceMsg.getMsgId();
  158. String deviceId = voiceMsg.getDeviceId();
  159. sendVoiceToDevice(channel, buffer, msgId, deviceId);
  160. }
  161. public boolean sendVoiceToDevice(String deviceId, byte[] bytes) {
  162. Channel channel = deviceChannelMap.get(deviceId);
  163. if (channel == null) {
  164. logger.warn("the device[{}] is offline and send cancled", deviceId);
  165. return false;
  166. }
  167. ByteBuf byteBuf = buildVoiceToClientBytebuf(deviceId, bytes);
  168. sendVoiceToDevice(channel, byteBuf,0, deviceId);
  169. return true;
  170. }
  171. private void sendVoiceToDevice(Channel channel, ByteBuf buffer, Integer msgId, String deviceId) {
  172. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  173. channelFuture.addListener(future ->
  174. {
  175. logger.info("send voice[{}] to client[{}]", msgId, deviceId);
  176. }
  177. );
  178. }
  179. private ByteBuf buildVoiceToClientBytebuf(VoiceMsgVo voiceMsg, String msg) {
  180. String deviceId = voiceMsg.getDeviceId();
  181. byte[] bytes = voiceMsg.getMsg();
  182. return buildVoiceToClientBytebuf(deviceId, bytes);
  183. }
  184. private ByteBuf buildVoiceToClientBytebuf(String deviceId, byte[] bytes) {
  185. StringBuilder replyCommand = new StringBuilder();
  186. replyCommand.append("[");
  187. replyCommand.append("3G").append("*");
  188. replyCommand.append(deviceId).append("*");
  189. replyCommand
  190. .append(numToHex16(bytes.length + 3))
  191. .append("*");
  192. replyCommand.append("TK").append(",");
  193. String replyCommandStr = replyCommand.toString();
  194. StringBuilder replyCommand2 = new StringBuilder();
  195. replyCommand2.append("]");
  196. String replyCommandStr2 = replyCommand2.toString();
  197. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + bytes.length + replyCommandStr2.getBytes().length);
  198. buffer.writeBytes(replyCommandStr.getBytes());
  199. buffer.writeBytes(bytes);
  200. buffer.writeBytes(replyCommandStr2.getBytes());
  201. buffer.readerIndex(0);
  202. return buffer;
  203. }
  204. /**
  205. * 主动回复
  206. *
  207. * @param channel
  208. * @content 获取下一条待发送终端语音 [3G*YYYYYYYYYY*LEN*TK,AMR 格式音频数据]
  209. */
  210. private void normalReply(Channel channel, VoiceMsgVo voiceMsg, String msg) {
  211. StringBuilder replyCommand = new StringBuilder();
  212. String message = String.valueOf(msg);
  213. int startIndex = message.indexOf("[");
  214. int endIndex = message.indexOf("]");
  215. String data = message.substring(startIndex + 1, endIndex);
  216. String[] bodys = data.split("\\*");
  217. replyCommand.append("[");
  218. replyCommand.append("3G").append("*");
  219. String deviceId = voiceMsg.getDeviceId();
  220. // if (deviceId.equals("9513532727")) {
  221. // deviceId = "9513532780";
  222. // }
  223. // deviceId = "9513532727";
  224. replyCommand.append(deviceId).append("*");
  225. replyCommand
  226. .append(bodys[2])
  227. .append("*");
  228. replyCommand.append("TK").append(",");
  229. String replyCommandStr = replyCommand.toString();
  230. StringBuilder replyCommand2 = new StringBuilder();
  231. replyCommand2.append("]");
  232. String replyCommandStr2 = replyCommand2.toString();
  233. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length + voiceMsg.getMsg().length + replyCommandStr2.getBytes().length);
  234. buffer.writeBytes(replyCommandStr.getBytes());
  235. buffer.writeBytes(voiceMsg.getMsg());
  236. buffer.writeBytes(replyCommandStr2.getBytes());
  237. buffer.readerIndex(0);
  238. byte[] bs = new byte[buffer.readableBytes()];
  239. buffer.readBytes(bs);
  240. buffer.readerIndex(0);
  241. channel = deviceChannelMap.get(deviceId);
  242. if (channel == null || !channel.isActive()) {
  243. deviceChannelMap.remove(deviceId);
  244. logger.warn("the channle of device[{}] has closed", deviceId);
  245. return;
  246. }
  247. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  248. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg()) + replyCommandStr2));
  249. }
  250. protected static String getLgZoneTime() {
  251. StringBuilder sb = new StringBuilder("LZ,1,8");
  252. return sb.toString();
  253. }
  254. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  255. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  256. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  257. dataByteBufCopy.readBytes(dataByteArray);
  258. dataByteBufCopy.release();
  259. }
  260. /**
  261. * 设置终端 上报频率
  262. *
  263. * @param advice
  264. * @param channel
  265. */
  266. protected void setUpload(Advice advice, Channel channel) {
  267. // 上报频率 一分钟一次
  268. String content = "UPLOAD,1";
  269. Date loginTime = new Date();
  270. executorService.execute(new Runnable() {
  271. public void run() {
  272. Date currentTime = new Date();
  273. try {
  274. Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
  275. if (secondsSinceLogin < 5L) {
  276. TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
  277. }
  278. normalReply(advice, channel, content);
  279. } catch (InterruptedException e) {
  280. logger.error(e.getMessage());
  281. }
  282. }
  283. });
  284. }
  285. /**
  286. * 回复
  287. * [3G*YYYYYYYYYY*LEN*LK]
  288. *
  289. * @param advice
  290. * @param channel
  291. * @content 回复内容
  292. */
  293. private void normalReply(Advice advice, Channel channel, String content) {
  294. String factory = advice.getFacotry();
  295. String deviceId = advice.getDeviceId();
  296. StringBuilder replyCommand = new StringBuilder();
  297. replyCommand.append("[");
  298. // 拼接厂商标识
  299. replyCommand.append(factory).append("*");
  300. // 拼接设备标识
  301. replyCommand.append(deviceId).append("*");
  302. // 内容长度
  303. replyCommand.append(numToHex16(content.length())).append("*");
  304. // 指令内容标记
  305. replyCommand.append(content);
  306. replyCommand.append("]");
  307. String replyCommandStr = replyCommand.toString();
  308. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  309. buffer.writeBytes(replyCommandStr.getBytes());
  310. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  311. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  312. }
  313. //使用1字节就可以表示b
  314. public static String numToHex8(int b) {
  315. return String.format("%02x", b);//2表示需要两个16进行数
  316. }
  317. //需要使用2字节表示b
  318. public static String numToHex16(int b) {
  319. return String.format("%04x", b);
  320. }
  321. public static String numToHex32(int b) {
  322. return String.format("%08x", b);
  323. }
  324. /**
  325. * 解析 Advice 数据
  326. * [厂商*设备 ID*内容长度*内容]
  327. * @param msg
  328. * @return
  329. */
  330. private Advice setAdvice(String msg) {
  331. Advice advice = new Advice();
  332. if (msg != null) {
  333. try {
  334. //厂商标识 设备ID 内容长度 指令内容
  335. String message = String.valueOf(msg);
  336. int startIndex = message.indexOf("[");
  337. int endIndex = message.indexOf("]");
  338. String data = message.substring(startIndex + 1, endIndex);
  339. String[] bodys = data.split("\\*");
  340. // 厂商
  341. advice.setFacotry(bodys[0]);
  342. // 设备Id
  343. advice.setDeviceId(bodys[1]);
  344. // 指令长度
  345. advice.setAdvicelength(bodys[2]);
  346. // 获取内容
  347. String[] contents = bodys[3].split(",");
  348. // 标识符
  349. advice.setAdviceType(contents[0]);
  350. //指令流水字段被用作判断音频是否接受成功
  351. if (contents.length == 2) {
  352. advice.setAdviceSerial(contents[1]);
  353. }
  354. return advice;
  355. } catch (Exception e) {
  356. logger.error(e.getMessage(), e);
  357. }
  358. }
  359. return null;
  360. }
  361. @Override
  362. public void startAcceptor() {
  363. EventLoopGroup bossGroup = new NioEventLoopGroup();
  364. EventLoopGroup workerGroup = new NioEventLoopGroup();
  365. try {
  366. ServerBootstrap b = new ServerBootstrap();
  367. b.group(bossGroup, workerGroup)
  368. .channel(NioServerSocketChannel.class)
  369. .option(ChannelOption.SO_BACKLOG, 1024)
  370. .handler(new LoggingHandler(LogLevel.INFO))
  371. .childHandler(new ChannelInitializer<SocketChannel>() {
  372. @Override
  373. protected void initChannel(SocketChannel ch) {
  374. ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
  375. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
  376. ch.pipeline().addLast(JingWeiCardServerHandler.this);
  377. }
  378. });
  379. ChannelFuture f = b.bind(port).sync();
  380. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  381. this.getPort());
  382. f.channel().closeFuture().sync();
  383. } catch (Exception ex) {
  384. logger.warn(ex.getMessage(), ex);
  385. } finally {
  386. cleanRedisLinkData();
  387. workerGroup.shutdownGracefully();
  388. bossGroup.shutdownGracefully();
  389. }
  390. }
  391. }