WatchJWServerHandler.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.tidecloud.dataacceptance.common.DateUtil;
  4. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  5. import io.netty.bootstrap.ServerBootstrap;
  6. import io.netty.buffer.ByteBuf;
  7. import io.netty.buffer.Unpooled;
  8. import io.netty.channel.*;
  9. import io.netty.channel.ChannelHandler.Sharable;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.SocketChannel;
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;
  13. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  14. import io.netty.handler.logging.LogLevel;
  15. import io.netty.handler.logging.LoggingHandler;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.slf4j.MDC;
  19. import org.springframework.context.annotation.Scope;
  20. import org.springframework.stereotype.Component;
  21. import java.util.Date;
  22. import java.util.Map;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.TimeUnit;
  27. /**
  28. * Created by jhw on 2018/7/20.
  29. */
  30. @Sharable
  31. @Scope("prototype")
  32. @Component(WatchJWServerHandler.name)
  33. public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  34. public static final String name = "WatchJWServerHandler";
  35. private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
  36. private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  37. private static final Long INTERVAL_TIME = 300000L; // 开关时间
  38. private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
  39. /**
  40. * 省电模式开关
  41. */
  42. private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
  43. /**
  44. * 设置模式
  45. */
  46. private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
  47. @Override
  48. protected void handle(ByteBuf in, Channel channel) throws Exception {
  49. // String msg = byteBufferToString(in.nioBuffer());TODO (注意)
  50. byte[] req = new byte[in.readableBytes()];
  51. in.readBytes(req);
  52. String msg = new String(req, "UTF-8");
  53. try {
  54. String deviceId = channelDeviceMap.get(channel);
  55. logger.info("传入数据为:" + msg);
  56. String factory = msg.substring(0, 2);// 工厂
  57. String type = msg.substring(2, 6);// 标记
  58. if (deviceId != null) {
  59. MDC.put(MDC_DEVICEID, deviceId);
  60. } else {
  61. logger.info("该手表没有登录:传入数据为" + msg);
  62. if (!"AP00".equals(type)) {
  63. logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
  64. return;
  65. }
  66. }
  67. Long time = System.currentTimeMillis();
  68. WorkModel workModel = null;
  69. SwitchWorkModel swm = null;
  70. if (deviceId != null) {
  71. workModel = modelMap.get(deviceId);//已登录
  72. swm = switchMap.get(deviceId);// 开关
  73. if (workModel != null && workModel.getUrgentType() == 2
  74. && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
  75. workModel.setUrgentTime(time);
  76. workModel.setUrgentType(2);
  77. modelMap.put(deviceId, workModel);
  78. normalReplyModel(factory, deviceId, channel, 3);
  79. logger.warn("超过指定时间没有收到回复》》》 重新设置");
  80. // 更新开关切换时间
  81. swm.setSwitchTime(time);
  82. switchMap.put(deviceId, swm);
  83. }
  84. }
  85. switch (type) {
  86. case "AP00": // 初始化登录
  87. resolveLoginMSG(msg, channel);
  88. break;
  89. case "AP03": // 连接(心跳) BP03#
  90. normalReply(factory, channel, "BP03");
  91. checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
  92. break;
  93. case "AP01": // 位置信息
  94. String gpsState = msg.substring(12, 13);
  95. setSwitchMap(deviceId, gpsState);// 采集数据
  96. sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
  97. normalReply(factory, channel, "BP01");
  98. break;
  99. case "AP33": // 设置模式回复
  100. //IWAP33,080835,1#
  101. if (deviceId != null) {
  102. Integer moderType = getInteger(msg.substring(14, 16));// 收到回复状态
  103. if (swm.getWorkType() == 3) {// 当前设置的模式
  104. if (workModel != null && moderType == 3) {
  105. workModel.setUrgentType(3);
  106. logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
  107. } else {
  108. logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
  109. Thread.sleep(1000);
  110. normalReplyModel(factory, deviceId, channel, 3);
  111. workModel.setUrgentType(2);
  112. // 更新开关时间
  113. swm.setSwitchTime(time);
  114. switchMap.put(deviceId, swm);
  115. }
  116. modelMap.put(deviceId, workModel);
  117. }
  118. }
  119. break;
  120. default: // 其他
  121. logger.info("client send data without handle type ...");
  122. break;
  123. }
  124. } finally {
  125. MDC.clear();
  126. }
  127. }
  128. /**
  129. * 模式设置
  130. *
  131. * @param factory
  132. * @param deviceId
  133. * @param channel
  134. */
  135. protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
  136. Long nowTime = System.currentTimeMillis();// 当前时间戳
  137. if (deviceId == null || !switchMap.containsKey(deviceId)) {
  138. return;
  139. }
  140. SwitchWorkModel swm = switchMap.get(deviceId);
  141. logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
  142. if (nowTime - swm.getActiveTime() > INTERVAL_TIME
  143. && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
  144. Integer workType = (3 == swm.getWorkType()) ? 2 : 3;
  145. if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
  146. workType = 3; // APO1上传间隔大于5分种再次设置为紧急模式
  147. }
  148. // 如果当前状态为A 紧急模式
  149. swm.setSwitchTime(nowTime);
  150. swm.setWorkType(workType);
  151. switchMap.put(deviceId, swm);
  152. logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
  153. normalReplyModel(factory, deviceId, channel, workType);
  154. if (workType == 3) {// 设置紧急模式 则更新紧急模式设置状态
  155. WorkModel wm = new WorkModel();
  156. wm.setUrgentType(2);//
  157. wm.setUrgentTime(nowTime);// 设置紧急模式时间
  158. modelMap.put(deviceId, wm);
  159. }
  160. } else {
  161. logger.warn("心跳检测是下发模式:工作不更改状态");
  162. }
  163. }
  164. /**
  165. * 初始化 数据记录 注册记录
  166. *
  167. * @param deviceId
  168. */
  169. protected void initSwitchMap(String deviceId) {
  170. if (deviceId == null) {
  171. return;
  172. }
  173. Long time = System.currentTimeMillis();
  174. SwitchWorkModel swm = new SwitchWorkModel();
  175. swm.setActiveTime(time);
  176. swm.setSwitchTime(time);
  177. swm.setWorkType(3);//1:正常模式,2:省电模式,3:紧急模式
  178. swm.setGpsUpTime(0L);// GPS 上报时间接口
  179. switchMap.put(deviceId, swm);
  180. logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
  181. }
  182. /**
  183. * APOI 上报数据 更新活跃时间
  184. *
  185. * @param deviceId
  186. */
  187. protected void setSwitchMap(String deviceId, String gpsState) {
  188. if (deviceId == null) {
  189. return;
  190. }
  191. Long time = System.currentTimeMillis();
  192. SwitchWorkModel swm = switchMap.get(deviceId);
  193. swm.setGpsUpTime(time);
  194. if ("A".equals(gpsState)) {
  195. swm.setActiveTime(time);
  196. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  197. } else {
  198. logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
  199. }
  200. switchMap.put(deviceId, swm);
  201. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  202. }
  203. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  204. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  205. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  206. dataByteBufCopy.readBytes(dataByteArray);
  207. dataByteBufCopy.release();
  208. }
  209. /**
  210. * 登录管理
  211. *
  212. * @param channel
  213. */
  214. private void resolveLoginMSG(String msg, Channel channel) {
  215. /*IWAP00353456789012345# */
  216. String message = String.valueOf(msg);
  217. String factory = message.substring(0, 2);
  218. String deviceId = message.substring(6, 21);
  219. String deviceIdInMap = channelDeviceMap.get(channel);
  220. MDC.put(MDC_DEVICEID, deviceId);
  221. if (!deviceId.equals(deviceIdInMap)) {
  222. manageChannel(channel, deviceId);
  223. }
  224. String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
  225. normalReply(factory, channel, "BP00," + date + ",8");
  226. Date loginTime = new Date();
  227. initSwitchMap(deviceId);
  228. executorService.execute(new Runnable() {
  229. @Override
  230. public void run() {
  231. Date currentTime = new Date();
  232. try {
  233. Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
  234. if (secondsSinceLogin < 5L) {
  235. TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
  236. }
  237. normalReplyModel(factory, deviceId, channel, 3);
  238. WorkModel wm = new WorkModel();
  239. wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
  240. wm.setUrgentType(2);// 默认指令模式为正常模式
  241. modelMap.put(deviceId, wm);
  242. } catch (InterruptedException e) {
  243. logger.error(e.getMessage());
  244. }
  245. }
  246. });
  247. }
  248. // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
  249. private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
  250. StringBuilder replyCommand = new StringBuilder();
  251. replyCommand.append(factory).append("BP33").append(",")
  252. .append(deviceId).append(",")
  253. .append("080835").append(",")
  254. .append(workType).append("#");
  255. String replyCommandStr = replyCommand.toString();
  256. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  257. buffer.writeBytes(replyCommandStr.getBytes());
  258. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  259. channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
  260. }
  261. /**
  262. * 回复
  263. *
  264. * @param channel
  265. * @content 回复内容
  266. */
  267. private void normalReply(String factory, Channel channel, String content) {
  268. // gps ==== >IW BP01#
  269. // 登录 ==== >IW BP00,20150101125223,8#
  270. // 心跳 ==== >IW BP03#
  271. StringBuilder replyCommand = new StringBuilder();
  272. replyCommand.append(factory);
  273. replyCommand.append(content);
  274. replyCommand.append("#");
  275. String replyCommandStr = replyCommand.toString();
  276. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  277. buffer.writeBytes(replyCommandStr.getBytes());
  278. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  279. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  280. }
  281. @Override
  282. public void startAcceptor() {
  283. EventLoopGroup bossGroup = new NioEventLoopGroup();
  284. EventLoopGroup workerGroup = new NioEventLoopGroup();
  285. try {
  286. ServerBootstrap b = new ServerBootstrap();
  287. b.group(bossGroup, workerGroup)
  288. .channel(NioServerSocketChannel.class)
  289. .option(ChannelOption.SO_BACKLOG, 1024)
  290. .handler(new LoggingHandler(LogLevel.INFO))
  291. .childHandler(new ChannelInitializer<SocketChannel>() {
  292. @Override
  293. protected void initChannel(SocketChannel ch) {
  294. ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
  295. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
  296. ch.pipeline().addLast(WatchJWServerHandler.this);
  297. }
  298. });
  299. ChannelFuture f = b.bind(port).sync();
  300. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  301. this.getPort());
  302. f.channel().closeFuture().sync();
  303. } catch (Exception ex) {
  304. logger.warn(ex.getMessage(), ex);
  305. } finally {
  306. cleanRedisLinkData();
  307. workerGroup.shutdownGracefully();
  308. bossGroup.shutdownGracefully();
  309. }
  310. }
  311. /**
  312. * 移除 失效的 deviceId
  313. *
  314. * @param ctx
  315. * @throws Exception
  316. */
  317. @Override
  318. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  319. Channel channel = ctx.channel();
  320. if (!channel.isActive()) {
  321. String deviceId = channelDeviceMap.get(channel);
  322. if (deviceId != null) {
  323. switchMap.remove(deviceId);
  324. modelMap.remove(deviceId);
  325. }
  326. }
  327. super.channelInactive(ctx);
  328. }
  /**
   * State of the most recent urgent-mode command sent to one device.
   *
   * <p>NOTE(review): this is an inner (non-static) class, so every instance kept in the static
   * maps also references the handler that created it; consider making it a static nested class.
   */
  class WorkModel {
    /** Timestamp in milliseconds at which the urgent-mode command was stamped. */
    private Long urgentTime;
    /** State of the urgent-mode command; this file uses 2 and 3. */
    private Integer urgentType;

    /** Returns the command timestamp in milliseconds, or {@code null} if never set. */
    public Long getUrgentTime() {
      return this.urgentTime;
    }

    /** Stores the command timestamp in milliseconds. */
    public void setUrgentTime(Long urgentTime) {
      this.urgentTime = urgentTime;
    }

    /** Returns the command state, or {@code null} if never set. */
    public Integer getUrgentType() {
      return this.urgentType;
    }

    /** Stores the command state. */
    public void setUrgentType(Integer urgentType) {
      this.urgentType = urgentType;
    }
  }
  /**
   * Mode-switch bookkeeping for one device.
   *
   * <p>NOTE(review): this is an inner (non-static) class, so every instance kept in the static
   * maps also references the handler that created it; consider making it a static nested class.
   */
  class SwitchWorkModel {
    /** Time in milliseconds of the last upload whose GPS state was {@code "A"}. */
    private Long activeTime;
    /** Work mode last recorded for the device (1: normal, 2: power-saving, 3: urgent). */
    private Integer workType;
    /** Time in milliseconds of the last work-mode switch. */
    private Long switchTime;
    /** Time in milliseconds of the last AP01 GPS upload. */
    private Long gpsUpTime;

    /** Returns the last active time in milliseconds, or {@code null} if never set. */
    public Long getActiveTime() {
      return this.activeTime;
    }

    /** Stores the last active time in milliseconds. */
    public void setActiveTime(Long activeTime) {
      this.activeTime = activeTime;
    }

    /** Returns the recorded work mode, or {@code null} if never set. */
    public Integer getWorkType() {
      return this.workType;
    }

    /** Stores the work mode. */
    public void setWorkType(Integer workType) {
      this.workType = workType;
    }

    /** Returns the last switch time in milliseconds, or {@code null} if never set. */
    public Long getSwitchTime() {
      return this.switchTime;
    }

    /** Stores the last switch time in milliseconds. */
    public void setSwitchTime(Long switchTime) {
      this.switchTime = switchTime;
    }

    /** Returns the last GPS upload time in milliseconds, or {@code null} if never set. */
    public Long getGpsUpTime() {
      return this.gpsUpTime;
    }

    /** Stores the last GPS upload time in milliseconds. */
    public void setGpsUpTime(Long gpsUpTime) {
      this.gpsUpTime = gpsUpTime;
    }
  }
  396. }