WatchJWServerHandler.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. package com.tidecloud.dataacceptance.service.impl;
  import com.alibaba.fastjson.JSON;
  import com.tidecloud.dataacceptance.common.DateUtil;
  import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  import io.netty.bootstrap.ServerBootstrap;
  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;
  import io.netty.channel.*;
  import io.netty.channel.ChannelHandler.Sharable;
  import io.netty.channel.nio.NioEventLoopGroup;
  import io.netty.channel.socket.SocketChannel;
  import io.netty.channel.socket.nio.NioServerSocketChannel;
  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  import io.netty.handler.logging.LogLevel;
  import io.netty.handler.logging.LoggingHandler;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.slf4j.MDC;
  import org.springframework.context.annotation.Scope;
  import org.springframework.stereotype.Component;
  import java.nio.charset.StandardCharsets;
  import java.text.DateFormat;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  29. /**
  30. * Created by jhw on 2018/7/20.
  31. */
  32. @Sharable
  33. @Scope("prototype")
  34. @Component(WatchJWServerHandler.name)
  35. public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  36. public static final String name = "WatchJWServerHandler";
  37. private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
  38. private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  39. private static final Long INTERVAL_TIME = 300000L; // 开关时间
  40. private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
  41. /**
  42. * 紧急模式
  43. */
  44. private static final Integer URGENCY = 3;
  45. /**
  46. * 其他模式
  47. */
  48. private static final Integer OTHER = 2;
  49. /**
  50. * 省电模式开关
  51. */
  52. private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
  53. /**
  54. * 设置模式
  55. */
  56. private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
  57. @Override
  58. protected void handle(ByteBuf in, Channel channel) throws Exception {
  59. //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
  60. byte[] req = new byte[in.readableBytes()];
  61. in.readBytes(req);
  62. String msg = new String(req, "UTF-8");
  63. try {
  64. String deviceId = channelDeviceMap.get(channel);
  65. String factory = msg.substring(0, 2);// 工厂
  66. String type = msg.substring(2, 6);// 标记
  67. if (deviceId != null) {
  68. MDC.put(MDC_DEVICEID, deviceId);
  69. } else {
  70. logger.info("该手表没有登录:传入数据为" + msg);
  71. if (!"AP00".equals(type)) {
  72. logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
  73. channel.close();
  74. return;
  75. }
  76. }
  77. logger.info("传入数据为:" + msg);
  78. Long time = System.currentTimeMillis();
  79. WorkModel workModel = null;
  80. SwitchWorkModel swm = null;
  81. if (deviceId != null) {
  82. workModel = modelMap.get(deviceId);//已登录
  83. swm = switchMap.get(deviceId);// 开关
  84. if (workModel != null && workModel.getUrgentType() == OTHER
  85. && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
  86. workModel.setUrgentTime(time);
  87. workModel.setUrgentType(OTHER);
  88. modelMap.put(deviceId, workModel);
  89. normalReplyModel(factory, deviceId, channel, 3);
  90. logger.warn("超过指定时间没有收到回复》》》 重新设置");
  91. // 更新开关切换时间
  92. swm.setSwitchTime(time);
  93. switchMap.put(deviceId, swm);
  94. }
  95. }
  96. switch (type) {
  97. case "AP00": // 初始化登录
  98. resolveLoginMSG(msg, channel);
  99. break;
  100. case "AP03": // 连接(心跳) BP03#
  101. normalReply(factory, channel, "BP03");
  102. checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
  103. sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
  104. logger.warn("心跳报文下发 msg+" + msg);
  105. break;
  106. case "AP01": // 位置信息
  107. String gpsState = msg.substring(12, 13);
  108. // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
  109. String gDay = msg.substring(6, 12);
  110. String gTime = msg.substring(39, 45);
  111. // if (checkGpsTime(gDay + gTime, time)) {
  112. // channel.close();
  113. // }
  114. setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
  115. normalReply(factory, channel, "BP01");
  116. break;
  117. case "AP33": // 设置模式回复
  118. //IWAP33,080835,1#
  119. if (deviceId != null) {
  120. Integer moderType = getInteger(msg.substring(14, 16));// 收到回复状态
  121. if (swm.getWorkType() == URGENCY) {// 当前设置的模式
  122. if (workModel != null && moderType == URGENCY) {
  123. workModel.setUrgentType(URGENCY);
  124. logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
  125. } else {
  126. logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
  127. Thread.sleep(1000);
  128. normalReplyModel(factory, deviceId, channel, URGENCY);
  129. workModel.setUrgentType(OTHER);
  130. // 更新开关时间
  131. swm.setSwitchTime(time);
  132. switchMap.put(deviceId, swm);
  133. }
  134. modelMap.put(deviceId, workModel);
  135. }
  136. }
  137. break;
  138. default: // 其他
  139. logger.info("client send data without handle type ...");
  140. break;
  141. }
  142. } finally {
  143. MDC.clear();
  144. }
  145. }
  146. /**
  147. * 模式设置
  148. *
  149. * @param factory
  150. * @param deviceId
  151. * @param channel
  152. */
  153. protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
  154. Long nowTime = System.currentTimeMillis();// 当前时间戳
  155. if (deviceId == null || !switchMap.containsKey(deviceId)) {
  156. return;
  157. }
  158. SwitchWorkModel swm = switchMap.get(deviceId);
  159. logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
  160. if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
  161. Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
  162. if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
  163. workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
  164. }
  165. if (URGENCY == workType) {
  166. //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
  167. swm.setFirstRemove(Boolean.FALSE);
  168. }
  169. // 如果当前状态为A 紧急模式
  170. swm.setSwitchTime(nowTime);
  171. swm.setWorkType(workType);
  172. switchMap.put(deviceId, swm);
  173. logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
  174. normalReplyModel(factory, deviceId, channel, workType);
  175. if (workType == URGENCY) {// 设置紧急模式 则更新紧急模式设置状态
  176. WorkModel wm = new WorkModel();
  177. wm.setUrgentType(OTHER);//
  178. wm.setUrgentTime(nowTime);// 设置紧急模式时间
  179. modelMap.put(deviceId, wm);
  180. }
  181. } else {
  182. logger.warn("心跳检测是下发模式:工作不更改状态");
  183. }
  184. }
  185. /**
  186. * 初始化 数据记录 注册记录
  187. *
  188. * @param deviceId
  189. */
  190. protected void initSwitchMap(String deviceId) {
  191. if (deviceId == null) {
  192. return;
  193. }
  194. Long time = System.currentTimeMillis();
  195. SwitchWorkModel swm = new SwitchWorkModel();
  196. swm.setActiveTime(time);
  197. swm.setSwitchTime(time);
  198. swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
  199. swm.setGpsUpTime(0L);// GPS 上报时间接口
  200. swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
  201. switchMap.put(deviceId, swm);
  202. logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
  203. }
  204. /**
  205. * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
  206. * <p>
  207. *
  208. * @return 返回值为ture 则需要断开连接,如果返回值为false 则不需要断开连接
  209. */
  210. protected Boolean checkGpsTime(String gTime, Long time) {
  211. try {
  212. DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
  213. Date date = fmt.parse(gTime);
  214. if ((time - date.getTime()) > 3 * 3600 * 1000) {
  215. return Boolean.TRUE;
  216. }
  217. } catch (Exception e) {
  218. logger.error(e.getMessage(), e);
  219. }
  220. return Boolean.FALSE;
  221. }
  222. /**
  223. * APOI 上报数据 更新活跃时间
  224. *
  225. * @param deviceId
  226. */
  227. protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
  228. if (deviceId == null) {
  229. return null;
  230. }
  231. Long time = System.currentTimeMillis();
  232. SwitchWorkModel swm = switchMap.get(deviceId);
  233. swm.setGpsUpTime(time);
  234. if ("A".equals(gpsState)) {
  235. swm.setActiveTime(time);
  236. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  237. if (swm.getFirstRemove()) {
  238. sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
  239. } else {
  240. swm.setFirstRemove(Boolean.TRUE);
  241. logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
  242. }
  243. } else {
  244. logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
  245. }
  246. switchMap.put(deviceId, swm);
  247. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  248. return swm;
  249. }
  250. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  251. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  252. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  253. dataByteBufCopy.readBytes(dataByteArray);
  254. dataByteBufCopy.release();
  255. }
  256. /**
  257. * 登录管理
  258. *
  259. * @param channel
  260. */
  261. private void resolveLoginMSG(String msg, Channel channel) {
  262. /*IWAP00353456789012345# */
  263. String message = String.valueOf(msg);
  264. String factory = message.substring(0, 2);
  265. String deviceId = message.substring(6, 21);
  266. String deviceIdInMap = channelDeviceMap.get(channel);
  267. MDC.put(MDC_DEVICEID, deviceId);
  268. if (!deviceId.equals(deviceIdInMap)) {
  269. manageChannel(channel, deviceId);
  270. }
  271. String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
  272. normalReply(factory, channel, "BP00," + date + ",8");
  273. Date loginTime = new Date();
  274. initSwitchMap(deviceId);
  275. executorService.execute(new Runnable() {
  276. @Override
  277. public void run() {
  278. Date currentTime = new Date();
  279. try {
  280. Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
  281. if (secondsSinceLogin < 5L) {
  282. TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
  283. }
  284. normalReplyModel(factory, deviceId, channel, URGENCY);
  285. WorkModel wm = new WorkModel();
  286. wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
  287. wm.setUrgentType(OTHER);// 默认指令模式为正常模式
  288. modelMap.put(deviceId, wm);
  289. } catch (InterruptedException e) {
  290. logger.error(e.getMessage());
  291. }
  292. }
  293. });
  294. }
  295. // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
  296. private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
  297. StringBuilder replyCommand = new StringBuilder();
  298. replyCommand.append(factory).append("BP33").append(",")
  299. .append(deviceId).append(",")
  300. .append("080835").append(",")
  301. .append(workType).append("#");
  302. String replyCommandStr = replyCommand.toString();
  303. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  304. buffer.writeBytes(replyCommandStr.getBytes());
  305. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  306. channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
  307. }
  308. /**
  309. * 回复
  310. *
  311. * @param channel
  312. * @content 回复内容
  313. */
  314. private void normalReply(String factory, Channel channel, String content) {
  315. // gps ==== >IW BP01#
  316. // 登录 ==== >IW BP00,20150101125223,8#
  317. // 心跳 ==== >IW BP03#
  318. StringBuilder replyCommand = new StringBuilder();
  319. replyCommand.append(factory);
  320. replyCommand.append(content);
  321. replyCommand.append("#");
  322. String replyCommandStr = replyCommand.toString();
  323. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  324. buffer.writeBytes(replyCommandStr.getBytes());
  325. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  326. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  327. }
  328. @Override
  329. public void startAcceptor() {
  330. EventLoopGroup bossGroup = new NioEventLoopGroup();
  331. EventLoopGroup workerGroup = new NioEventLoopGroup();
  332. try {
  333. ServerBootstrap b = new ServerBootstrap();
  334. b.group(bossGroup, workerGroup)
  335. .channel(NioServerSocketChannel.class)
  336. .option(ChannelOption.SO_BACKLOG, 1024)
  337. .handler(new LoggingHandler(LogLevel.INFO))
  338. .childHandler(new ChannelInitializer<SocketChannel>() {
  339. @Override
  340. protected void initChannel(SocketChannel ch) {
  341. ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
  342. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
  343. ch.pipeline().addLast(WatchJWServerHandler.this);
  344. }
  345. });
  346. ChannelFuture f = b.bind(port).sync();
  347. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  348. this.getPort());
  349. f.channel().closeFuture().sync();
  350. } catch (Exception ex) {
  351. logger.warn(ex.getMessage(), ex);
  352. } finally {
  353. cleanRedisLinkData();
  354. workerGroup.shutdownGracefully();
  355. bossGroup.shutdownGracefully();
  356. }
  357. }
  358. /**
  359. * 移除 失效的 deviceId
  360. *
  361. * @param ctx
  362. * @throws Exception
  363. */
  364. @Override
  365. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  366. Channel channel = ctx.channel();
  367. if (!channel.isActive()) {
  368. String deviceId = channelDeviceMap.get(channel);
  369. if (deviceId != null) {
  370. switchMap.remove(deviceId);
  371. modelMap.remove(deviceId);
  372. }
  373. }
  374. super.channelInactive(ctx);
  375. }
  376. class WorkModel {
  377. /**
  378. * 设置紧急模式时间
  379. */
  380. private Long urgentTime;
  381. /**
  382. * 设置紧急模式状态
  383. */
  384. private Integer urgentType;
  385. public Long getUrgentTime() {
  386. return urgentTime;
  387. }
  388. public void setUrgentTime(Long urgentTime) {
  389. this.urgentTime = urgentTime;
  390. }
  391. public Integer getUrgentType() {
  392. return urgentType;
  393. }
  394. public void setUrgentType(Integer urgentType) {
  395. this.urgentType = urgentType;
  396. }
  397. }
  398. /**
  399. * 开关工作模式
  400. */
  401. class SwitchWorkModel {
  402. /**
  403. * 有效数据时间 毫秒
  404. */
  405. private Long activeTime;
  406. /**
  407. * 开关 A 有效,V 无效
  408. */
  409. private Integer workType;
  410. /**
  411. * 开关切换时间 毫秒
  412. */
  413. private Long switchTime;
  414. /**
  415. * GPS APO1 上傳时间
  416. */
  417. private Long gpsUpTime;
  418. /**
  419. * 模式切换第一条是否移除 true 不需要移除, false 需要移除
  420. */
  421. private Boolean firstRemove = Boolean.TRUE;
  422. public Long getActiveTime() {
  423. return activeTime;
  424. }
  425. public void setActiveTime(Long activeTime) {
  426. this.activeTime = activeTime;
  427. }
  428. public Integer getWorkType() {
  429. return workType;
  430. }
  431. public void setWorkType(Integer workType) {
  432. this.workType = workType;
  433. }
  434. public Long getSwitchTime() {
  435. return switchTime;
  436. }
  437. public void setSwitchTime(Long switchTime) {
  438. this.switchTime = switchTime;
  439. }
  440. public Long getGpsUpTime() {
  441. return gpsUpTime;
  442. }
  443. public void setGpsUpTime(Long gpsUpTime) {
  444. this.gpsUpTime = gpsUpTime;
  445. }
  446. public Boolean getFirstRemove() {
  447. return firstRemove;
  448. }
  449. public void setFirstRemove(Boolean firstRemove) {
  450. this.firstRemove = firstRemove;
  451. }
  452. }
  453. }