WatchJWServerHandler.java 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.accept.client.DeviceCronClient;
  3. import com.accept.client.VoiceMsgClient;
  4. import com.accept.model.VoiceMsgVo;
  5. import com.alibaba.fastjson.JSON;
  6. import com.tidecloud.dataacceptance.common.DateUtil;
  7. import com.tidecloud.dataacceptance.service.DelimiterJingWeiFrameDecoder;
  8. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  9. import io.netty.bootstrap.ServerBootstrap;
  10. import io.netty.buffer.ByteBuf;
  11. import io.netty.buffer.Unpooled;
  12. import io.netty.channel.*;
  13. import io.netty.channel.ChannelHandler.Sharable;
  14. import io.netty.channel.nio.NioEventLoopGroup;
  15. import io.netty.channel.socket.SocketChannel;
  16. import io.netty.channel.socket.nio.NioServerSocketChannel;
  17. import io.netty.handler.logging.LogLevel;
  18. import io.netty.handler.logging.LoggingHandler;
  19. import io.netty.util.ByteProcessor;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.slf4j.MDC;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.context.annotation.Scope;
  25. import org.springframework.stereotype.Component;
  26. import java.text.DateFormat;
  27. import java.text.SimpleDateFormat;
  28. import java.util.Date;
  29. import java.util.Map;
  30. import java.util.concurrent.ConcurrentHashMap;
  31. import java.util.concurrent.ExecutorService;
  32. import java.util.concurrent.Executors;
  33. import java.util.concurrent.TimeUnit;
  34. /**
  35. * Created by jhw on 2018/7/20.
  36. */
  37. @Sharable
  38. @Scope("prototype")
  39. @Component(WatchJWServerHandler.name)
  40. public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  41. public static final String name = "WatchJWServerHandler";
  42. private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
  43. private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  44. private static final Long INTERVAL_TIME = 300000L; // 开关时间
  45. private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
  46. /**
  47. * 紧急模式
  48. */
  49. private static final Integer URGENCY = 3;
  50. /**
  51. * 其他模式
  52. */
  53. private static final Integer OTHER = 2;
  54. /**
  55. * 省电模式开关
  56. */
  57. private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
  58. /**
  59. * 设置模式
  60. */
  61. private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
  62. /**
  63. * 心率
  64. */
  65. private static Map<String, Integer> beatMap = new ConcurrentHashMap<>();
  66. private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
  67. @Autowired
  68. private VoiceMsgClient voiceMsgClient;
  69. @Autowired
  70. private DeviceCronClient deviceCronClient;
  71. @Override
  72. protected void handle(ByteBuf in, Channel channel) throws Exception {
  73. //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
  74. byte[] req = new byte[in.readableBytes()];
  75. in.readBytes(req);
  76. String msg = new String(req, "UTF-8");
  77. try {
  78. String deviceId = channelDeviceMap.get(channel);
  79. String factory = msg.substring(0, 2);// 工厂
  80. String type = msg.substring(2, 6);// 标记
  81. if (deviceId != null) {
  82. MDC.put(MDC_DEVICEID, deviceId);
  83. } else {
  84. logger.info("该手表没有登录:传入数据为" + msg);
  85. if (!"AP00".equals(type)) {
  86. logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
  87. channel.close();
  88. return;
  89. }
  90. }
  91. logger.info("传入数据为:" + msg);
  92. Long time = System.currentTimeMillis();
  93. WorkModel workModel = null;
  94. SwitchWorkModel swm = null;
  95. if (deviceId != null) {
  96. workModel = modelMap.get(deviceId);//已登录
  97. swm = switchMap.get(deviceId);// 开关
  98. if (workModel != null && workModel.getUrgentType() == OTHER
  99. && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
  100. workModel.setUrgentTime(time);
  101. workModel.setUrgentType(OTHER);
  102. modelMap.put(deviceId, workModel);
  103. normalReplyModel(factory, deviceId, channel, 3);
  104. logger.warn("超过指定时间没有收到回复》》》 重新设置");
  105. // 更新开关切换时间
  106. swm.setSwitchTime(time);
  107. switchMap.put(deviceId, swm);
  108. }
  109. voiceReplyToClient(deviceId, channel);//语音下行发送
  110. }
  111. switch (type) {
  112. case "AP00": // 初始化登录
  113. resolveLoginMSG(msg, channel);
  114. break;
  115. case "AP03": // 连接(心跳) BP03#
  116. normalReply(factory, channel, "BP03");
  117. checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
  118. sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
  119. logger.warn("心跳报文下发 msg+" + msg);
  120. sendApthReplyToClient(deviceId, channel);
  121. break;
  122. case "AP01": // 位置信息
  123. String gpsState = msg.substring(12, 13);
  124. // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
  125. setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
  126. normalReply(factory, channel, "BP01");
  127. break;
  128. case "AP33": // 设置模式回复
  129. //IWAP33,080835,1# IWAP33,080835,03#
  130. if (deviceId != null) {
  131. Integer moderType = getInteger(msg.substring(14, msg.indexOf("#")));// 收到回复状态
  132. if (swm.getWorkType() == URGENCY) {// 当前设置的模式
  133. if (workModel != null && moderType == URGENCY) {
  134. workModel.setUrgentType(URGENCY);
  135. logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
  136. } else {
  137. logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
  138. Thread.sleep(1000);
  139. normalReplyModel(factory, deviceId, channel, URGENCY);
  140. workModel.setUrgentType(OTHER);
  141. // 更新开关时间
  142. swm.setSwitchTime(time);
  143. switchMap.put(deviceId, swm);
  144. }
  145. modelMap.put(deviceId, workModel);
  146. }
  147. }
  148. break;
  149. case "APHT": // 心率测量
  150. sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
  151. normalReply(factory, channel, "BPHT");
  152. if (beatMap.containsKey(deviceId)) {
  153. beatMap.put(deviceId, 2);
  154. logger.warn("标记APHT=2参数状态变更deviceId=" + deviceId);
  155. }
  156. break;
  157. case "AP49": // 心率上传 IWAP49,68#
  158. String heatBeat = msg.substring(7, 9);// 工厂
  159. StringBuffer sb = new StringBuffer("IWAPHT,");// 伪装 APHT 一样的数据格式
  160. sb.append(heatBeat).append(",").append("0,0,#");
  161. sendMsg2Kafka((sb.toString() + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
  162. normalReply(factory, channel, "BP49");
  163. if (beatMap.containsKey(deviceId)) {
  164. beatMap.put(deviceId, 2);
  165. logger.warn("标记AP49=2参数状态变更deviceId=" + deviceId);
  166. }
  167. break;
  168. case "APXL": // 心率主动下发,终端回复
  169. beatMap.put(deviceId, 1);// 标记下发成功
  170. logger.warn("标记APXL = 1参数状态变更deviceId=" + deviceId);
  171. break;
  172. case "AP07": // 语音上传
  173. // TODO 设置语音上传 IWAP07,20140818064408,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
  174. VoiceMsgVo ap07Msg = splitAP07VoiceMsg(msg, deviceId, in);
  175. // 调用接口 写库
  176. VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(ap07Msg);
  177. normalVoiceReply(channel, vo);
  178. break;
  179. case "AP28": // 语音下行终端回复
  180. // 调用接口 查询回复内容
  181. VoiceMsgVo ap28Msg = splitAP28VoiceMsg(msg, deviceId);
  182. VoiceMsgVo voiceMsg = null;
  183. if (ap28Msg.getLag() == 1) {
  184. if (ap28Msg.getNu() < ap28Msg.getTotal()) {
  185. // 回复下个语音包
  186. voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu() + 1);
  187. } else {
  188. // 更新数据状态为发送完毕
  189. voiceMsgClient.updateVoiceMsgSendFinish(deviceId, ap28Msg.getMsgId(), 4);
  190. }
  191. } else {
  192. // 回复上个语音包
  193. voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu());
  194. }
  195. if (voiceMsg != null && voiceMsg.getLag() == 1) {
  196. normalBP28Reply(channel, voiceMsg);
  197. }
  198. break;
  199. default: // 其他
  200. logger.info("client send data without handle type ...");
  201. break;
  202. }
  203. } finally {
  204. MDC.clear();
  205. }
  206. }
  207. /**
  208. * 语音下行发送
  209. *
  210. * @param deviceId
  211. * @param channel
  212. */
  213. protected void voiceReplyToClient(String deviceId, Channel channel) {
  214. try {
  215. VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
  216. if (voiceF.getLag() == 1) {
  217. normalBP28Reply(channel, voiceF);
  218. // 更新数据状态为发送完毕
  219. voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 3);
  220. }
  221. } catch (Exception e) {
  222. logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
  223. }
  224. }
  225. /**
  226. * 心率下行发送
  227. * beatMap (1 下发成功,2 下发完成)
  228. *
  229. * @param deviceId
  230. * @param channel
  231. */
  232. protected void sendApthReplyToClient(String deviceId, Channel channel) {
  233. try {
  234. // 时间校验 1: 是否需要下发心率获取
  235. Boolean lag = deviceCronClient.getSendApThByDeviceId(deviceId);
  236. if (lag) {
  237. if (beatMap.isEmpty()
  238. || !beatMap.containsKey(deviceId)
  239. || 1 == beatMap.get(deviceId)) {
  240. setBPXLToClient(deviceId, channel);
  241. }
  242. } else {
  243. beatMap.remove(deviceId);
  244. logger.warn("标记参数移除deviceId=" + deviceId);
  245. }
  246. } catch (Exception e) {
  247. logger.error("心率下行发送异常!!!!! deviceId=" + deviceId);
  248. }
  249. }
  250. /**
  251. * 获取心率数据 下行指令
  252. *
  253. * @param deviceId
  254. * @param channel
  255. */
  256. protected void setBPXLToClient(String deviceId, Channel channel) {
  257. StringBuilder replyCommand = new StringBuilder();
  258. replyCommand.append("IWBPXL").append(",");
  259. replyCommand.append(deviceId).append(",");
  260. replyCommand.append("080835");//指令流水
  261. replyCommand.append("#");
  262. String replyCommandStr = replyCommand.toString();
  263. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  264. buffer.writeBytes(replyCommandStr.getBytes());
  265. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  266. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  267. }
  268. /**
  269. * 语音上行回复
  270. *
  271. * @param channel
  272. * @content 回复内容 IWBP07,20140818064408,6,1,1#
  273. */
  274. private void normalVoiceReply(Channel channel, VoiceMsgVo voiceMsg) {
  275. StringBuilder replyCommand = new StringBuilder();
  276. replyCommand.append("IWBP07").append(",");
  277. replyCommand.append(voiceMsg.getVoiceTime()).append(",");
  278. replyCommand.append(voiceMsg.getTotal()).append(",");
  279. replyCommand.append(voiceMsg.getNu()).append(",");
  280. replyCommand.append(voiceMsg.getLag());// 接受成功1:成功,0 失败
  281. replyCommand.append("#");
  282. String replyCommandStr = replyCommand.toString();
  283. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  284. buffer.writeBytes(replyCommandStr.getBytes());
  285. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  286. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  287. }
  288. /**
  289. * 语音下行回复
  290. *
  291. * @param channel
  292. * @content 回复内容 获取下一条代发送终端语音 IWBP28, D3590D54,XXXX,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
  293. */
  294. private void normalBP28Reply(Channel channel, VoiceMsgVo voiceMsg) {
  295. StringBuilder replyCommand = new StringBuilder();
  296. replyCommand.append("IWBP28").append(",");
  297. replyCommand.append("D3590D54").append(",");
  298. replyCommand.append(voiceMsg.getMsgId()).append(",");
  299. replyCommand.append(voiceMsg.getTotal()).append(",");
  300. replyCommand.append(voiceMsg.getNu()).append(",");
  301. replyCommand.append(voiceMsg.getLength()).append(",");
  302. String replyCommandStr = replyCommand.toString();
  303. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  304. buffer.writeBytes(replyCommandStr.getBytes());
  305. buffer.writeBytes(voiceMsg.getMsg());
  306. buffer.readerIndex(0);
  307. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  308. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg())));
  309. }
  310. /**
  311. * 上行语音包才解析
  312. *
  313. * @param msg IWAP07,20140818064408,6,1,1024,XXXXXXXX#
  314. * @param deviceId
  315. * @return
  316. */
  317. protected VoiceMsgVo splitAP07VoiceMsg(String msg, String deviceId, ByteBuf byteBufIn) {
  318. VoiceMsgVo voiceMsg = new VoiceMsgVo();
  319. String[] msgArr = msg.split(",");
  320. voiceMsg.setDeviceId(deviceId);
  321. voiceMsg.setVoiceTime(msgArr[1]);
  322. voiceMsg.setTotal(Integer.valueOf(msgArr[2]));
  323. voiceMsg.setNu(Integer.valueOf(msgArr[3]));
  324. voiceMsg.setLength(Integer.valueOf(msgArr[4]));
  325. try {
  326. byteBufIn.readerIndex(0);
  327. // 按照逗号切割
  328. int cutLength = 0;// 存储最后一次查询条件
  329. for (int i = 0; i < 5; i++) {
  330. int commaLength = byteBufIn.forEachByte(byteBufIn.readerIndex(), 30, FIND_COMMA);
  331. cutLength = commaLength;
  332. byteBufIn.readerIndex(commaLength + 1);
  333. }
  334. byteBufIn.readerIndex(cutLength + 1);
  335. byte[] bytes = new byte[voiceMsg.getLength() + 1];
  336. byteBufIn.readBytes(bytes);
  337. String msg1 = new String(bytes, "UTF-8");
  338. voiceMsg.setMsg(bytes);
  339. } catch (Exception e) {
  340. logger.warn("语音解析错无" + JSON.toJSONString(voiceMsg) + ":" + e.getStackTrace());
  341. }
  342. return voiceMsg;
  343. }
  344. /**
  345. * 下行语音包才解析
  346. *
  347. * @param msg IWAP28,D3590D54,XXXX,6,1,1#
  348. * @param deviceId
  349. * @return
  350. */
  351. protected VoiceMsgVo splitAP28VoiceMsg(String msg, String deviceId) {
  352. VoiceMsgVo voiceMsg = new VoiceMsgVo();
  353. String[] msgArr = msg.split(",");
  354. voiceMsg.setDeviceId(deviceId);
  355. voiceMsg.setTotal(Integer.valueOf(msgArr[3]));
  356. voiceMsg.setNu(Integer.valueOf(msgArr[4]));
  357. voiceMsg.setLag(Integer.valueOf(msgArr[5].substring(0, 1)));
  358. voiceMsg.setMsgId(Integer.valueOf(msgArr[2]));
  359. return voiceMsg;
  360. }
  361. /**
  362. * 模式设置
  363. *
  364. * @param factory
  365. * @param deviceId
  366. * @param channel
  367. */
  368. protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
  369. Long nowTime = System.currentTimeMillis();// 当前时间戳
  370. if (deviceId == null || !switchMap.containsKey(deviceId)) {
  371. return;
  372. }
  373. SwitchWorkModel swm = switchMap.get(deviceId);
  374. logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
  375. if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
  376. Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
  377. if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
  378. workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
  379. }
  380. if (URGENCY == workType) {
  381. //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
  382. swm.setFirstRemove(Boolean.FALSE);
  383. }
  384. // 如果当前状态为A 紧急模式
  385. swm.setSwitchTime(nowTime);
  386. swm.setWorkType(workType);
  387. switchMap.put(deviceId, swm);
  388. logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
  389. normalReplyModel(factory, deviceId, channel, workType);
  390. if (workType == URGENCY) {// 设置紧急模式 则更新紧急模式设置状态
  391. WorkModel wm = new WorkModel();
  392. wm.setUrgentType(OTHER);//
  393. wm.setUrgentTime(nowTime);// 设置紧急模式时间
  394. modelMap.put(deviceId, wm);
  395. }
  396. } else {
  397. logger.warn("心跳检测是下发模式:工作不更改状态");
  398. }
  399. }
  400. /**
  401. * 初始化 数据记录 注册记录
  402. *
  403. * @param deviceId
  404. */
  405. protected void initSwitchMap(String deviceId) {
  406. if (deviceId == null) {
  407. return;
  408. }
  409. Long time = System.currentTimeMillis();
  410. SwitchWorkModel swm = new SwitchWorkModel();
  411. swm.setActiveTime(time);
  412. swm.setSwitchTime(time);
  413. swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
  414. swm.setGpsUpTime(0L);// GPS 上报时间接口
  415. swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
  416. switchMap.put(deviceId, swm);
  417. logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
  418. }
  419. /**
  420. * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
  421. * <p>
  422. *
  423. * @return 返回值为ture 则需要断开连接,如果返回值为false 则不需要断开连接
  424. */
  425. protected Boolean checkGpsTime(String gTime, Long time) {
  426. try {
  427. DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
  428. Date date = fmt.parse(gTime);
  429. if ((time - date.getTime()) > 3 * 3600 * 1000) {
  430. return Boolean.TRUE;
  431. }
  432. } catch (Exception e) {
  433. logger.error(e.getMessage(), e);
  434. }
  435. return Boolean.FALSE;
  436. }
  437. /**
  438. * APOI 上报数据 更新活跃时间
  439. *
  440. * @param deviceId
  441. */
  442. protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
  443. if (deviceId == null) {
  444. return null;
  445. }
  446. Long time = System.currentTimeMillis();
  447. SwitchWorkModel swm = switchMap.getOrDefault(deviceId, new SwitchWorkModel());
  448. swm.setGpsUpTime(time);
  449. if ("A".equals(gpsState)) {
  450. swm.setActiveTime(time);
  451. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  452. if (swm.getFirstRemove()) {
  453. sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
  454. } else {
  455. swm.setFirstRemove(Boolean.TRUE);
  456. logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
  457. }
  458. } else {
  459. logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
  460. }
  461. switchMap.put(deviceId, swm);
  462. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  463. return swm;
  464. }
  465. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  466. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  467. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  468. dataByteBufCopy.readBytes(dataByteArray);
  469. dataByteBufCopy.release();
  470. }
  471. /**
  472. * 登录管理
  473. *
  474. * @param channel
  475. */
  476. private void resolveLoginMSG(String msg, Channel channel) {
  477. /*IWAP00353456789012345# */
  478. String message = String.valueOf(msg);
  479. String factory = message.substring(0, 2);
  480. String deviceId = message.substring(6, 21);
  481. String deviceIdInMap = channelDeviceMap.get(channel);
  482. MDC.put(MDC_DEVICEID, deviceId);
  483. if (!deviceId.equals(deviceIdInMap)) {
  484. manageChannel(channel, deviceId);
  485. }
  486. String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
  487. normalReply(factory, channel, "BP00," + date + ",8");
  488. Date loginTime = new Date();
  489. initSwitchMap(deviceId);
  490. executorService.execute(new Runnable() {
  491. @Override
  492. public void run() {
  493. Date currentTime = new Date();
  494. try {
  495. Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
  496. if (secondsSinceLogin < 5L) {
  497. TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
  498. }
  499. normalReplyModel(factory, deviceId, channel, URGENCY);
  500. WorkModel wm = new WorkModel();
  501. wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
  502. wm.setUrgentType(OTHER);// 默认指令模式为正常模式
  503. modelMap.put(deviceId, wm);
  504. } catch (InterruptedException e) {
  505. logger.error(e.getMessage());
  506. }
  507. }
  508. });
  509. }
  510. // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
  511. private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
  512. StringBuilder replyCommand = new StringBuilder();
  513. replyCommand.append(factory).append("BP33").append(",")
  514. .append(deviceId).append(",")
  515. .append("080835").append(",")
  516. .append(workType).append("#");
  517. String replyCommandStr = replyCommand.toString();
  518. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  519. buffer.writeBytes(replyCommandStr.getBytes());
  520. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  521. channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
  522. }
  523. /**
  524. * 回复
  525. *
  526. * @param channel
  527. * @content 回复内容
  528. */
  529. private void normalReply(String factory, Channel channel, String content) {
  530. // gps ==== >IW BP01#
  531. // 登录 ==== >IW BP00,20150101125223,8#
  532. // 心跳 ==== >IW BP03#
  533. StringBuilder replyCommand = new StringBuilder();
  534. replyCommand.append(factory);
  535. replyCommand.append(content);
  536. replyCommand.append("#");
  537. String replyCommandStr = replyCommand.toString();
  538. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  539. buffer.writeBytes(replyCommandStr.getBytes());
  540. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  541. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  542. }
  543. @Override
  544. public void startAcceptor() {
  545. EventLoopGroup bossGroup = new NioEventLoopGroup();
  546. EventLoopGroup workerGroup = new NioEventLoopGroup();
  547. try {
  548. ServerBootstrap b = new ServerBootstrap();
  549. b.group(bossGroup, workerGroup)
  550. .channel(NioServerSocketChannel.class)
  551. .option(ChannelOption.SO_BACKLOG, 1024)
  552. .handler(new LoggingHandler(LogLevel.INFO))
  553. .childHandler(new ChannelInitializer<SocketChannel>() {
  554. @Override
  555. protected void initChannel(SocketChannel ch) {
  556. ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
  557. // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
  558. ch.pipeline().addLast(new DelimiterJingWeiFrameDecoder(65535, false, delimiter));
  559. ch.pipeline().addLast(WatchJWServerHandler.this);
  560. }
  561. });
  562. ChannelFuture f = b.bind(port).sync();
  563. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  564. this.getPort());
  565. f.channel().closeFuture().sync();
  566. } catch (Exception ex) {
  567. logger.warn(ex.getMessage(), ex);
  568. } finally {
  569. cleanRedisLinkData();
  570. workerGroup.shutdownGracefully();
  571. bossGroup.shutdownGracefully();
  572. }
  573. }
  574. /**
  575. * 移除 失效的 deviceId
  576. *
  577. * @param ctx
  578. * @throws Exception
  579. */
  580. @Override
  581. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  582. Channel channel = ctx.channel();
  583. if (!channel.isActive()) {
  584. String deviceId = channelDeviceMap.get(channel);
  585. if (deviceId != null) {
  586. switchMap.remove(deviceId);
  587. modelMap.remove(deviceId);
  588. }
  589. }
  590. super.channelInactive(ctx);
  591. }
  /**
   * State of the most recent urgent-mode command sent to one device: when it was sent and the
   * mode code currently recorded for it.
   */
  class WorkModel {

    /** Time the urgent-mode command was sent, in epoch milliseconds. */
    private Long urgentTime;

    /** Mode code recorded for the urgent-mode command. */
    private Integer urgentType;

    /** @return time the urgent-mode command was sent, or {@code null} if never set */
    public Long getUrgentTime() {
      return this.urgentTime;
    }

    /** @return recorded mode code, or {@code null} if never set */
    public Integer getUrgentType() {
      return this.urgentType;
    }

    /** @param urgentTime time the urgent-mode command was sent, in epoch milliseconds */
    public void setUrgentTime(Long urgentTime) {
      this.urgentTime = urgentTime;
    }

    /** @param urgentType mode code to record */
    public void setUrgentType(Integer urgentType) {
      this.urgentType = urgentType;
    }
  }
  /**
   * Mode-switch bookkeeping for one device: timestamps of the last valid fix, the last mode
   * switch and the last position report, the current mode code, and the flag that controls
   * whether the next valid position report is discarded.
   */
  class SwitchWorkModel {

    /** Time of the last valid data, in epoch milliseconds. */
    private Long activeTime;

    /** Current work-mode code. */
    private Integer workType;

    /** Time of the last mode switch, in epoch milliseconds. */
    private Long switchTime;

    /** Time of the last AP01 position upload, in epoch milliseconds. */
    private Long gpsUpTime;

    /**
     * Whether the first report after a mode switch is kept: {@code true} means nothing needs
     * to be discarded, {@code false} means the next one must be discarded.
     */
    private Boolean firstRemove = Boolean.TRUE;

    /** @return time of the last valid data, or {@code null} if never set */
    public Long getActiveTime() {
      return this.activeTime;
    }

    /** @return current work-mode code, or {@code null} if never set */
    public Integer getWorkType() {
      return this.workType;
    }

    /** @return time of the last mode switch, or {@code null} if never set */
    public Long getSwitchTime() {
      return this.switchTime;
    }

    /** @return time of the last position upload, or {@code null} if never set */
    public Long getGpsUpTime() {
      return this.gpsUpTime;
    }

    /** @return the discard flag; {@code TRUE} unless changed */
    public Boolean getFirstRemove() {
      return this.firstRemove;
    }

    /** @param activeTime time of the last valid data, in epoch milliseconds */
    public void setActiveTime(Long activeTime) {
      this.activeTime = activeTime;
    }

    /** @param workType work-mode code */
    public void setWorkType(Integer workType) {
      this.workType = workType;
    }

    /** @param switchTime time of the last mode switch, in epoch milliseconds */
    public void setSwitchTime(Long switchTime) {
      this.switchTime = switchTime;
    }

    /** @param gpsUpTime time of the last position upload, in epoch milliseconds */
    public void setGpsUpTime(Long gpsUpTime) {
      this.gpsUpTime = gpsUpTime;
    }

    /** @param firstRemove {@code false} to discard the next valid report, {@code true} to keep it */
    public void setFirstRemove(Boolean firstRemove) {
      this.firstRemove = firstRemove;
    }
  }
  669. }