WatchJWServerHandler.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.tidecloud.dataacceptance.common.DateUtil;
  4. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  5. import io.netty.bootstrap.ServerBootstrap;
  6. import io.netty.buffer.ByteBuf;
  7. import io.netty.buffer.Unpooled;
  8. import io.netty.channel.*;
  9. import io.netty.channel.ChannelHandler.Sharable;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.SocketChannel;
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;
  13. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  14. import io.netty.handler.logging.LogLevel;
  15. import io.netty.handler.logging.LoggingHandler;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.slf4j.MDC;
  19. import org.springframework.context.annotation.Scope;
  20. import org.springframework.stereotype.Component;
  21. import java.util.Date;
  22. import java.util.Map;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.TimeUnit;
  27. /**
  28. * Created by jhw on 2018/7/20.
  29. */
  30. @Sharable
  31. @Scope("prototype")
  32. @Component(WatchJWServerHandler.name)
  33. public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  34. public static final String name = "WatchJWServerHandler";
  35. private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
  36. private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  37. private static final Long INTERVAL_TIME = 300000L; // 开关时间
  38. private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
  39. /**
  40. * 省电模式开关
  41. */
  42. private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
  43. /**
  44. * 设置模式
  45. */
  46. private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
  47. @Override
  48. protected void handle(ByteBuf in, Channel channel) throws Exception {
  49. // String msg = byteBufferToString(in.nioBuffer());TODO (注意)
  50. byte[] req = new byte[in.readableBytes()];
  51. in.readBytes(req);
  52. String msg = new String(req, "UTF-8");
  53. String deviceId = channelDeviceMap.get(channel);
  54. if (deviceId != null) {
  55. MDC.put(MDC_DEVICEID, deviceId);
  56. }
  57. logger.info("传入数据为:" + msg);
  58. try {
  59. String factory = msg.substring(0, 2);// 工厂
  60. String type = msg.substring(2, 6);// 标记
  61. Long time = System.currentTimeMillis();
  62. WorkModel workModel = null;
  63. SwitchWorkModel swm = null;
  64. if (deviceId != null) {
  65. workModel = modelMap.get(deviceId);//已登录
  66. swm = switchMap.get(deviceId);// 开关
  67. if (workModel != null && workModel.getUrgentType() == 2
  68. && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
  69. workModel.setUrgentTime(time);
  70. workModel.setUrgentType(2);
  71. modelMap.put(deviceId, workModel);
  72. normalReplyModel(factory, deviceId, channel, 3);
  73. logger.warn("超过指定时间没有收到回复》》》 重新设置");
  74. // 更新开关切换时间
  75. swm.setSwitchTime(time);
  76. switchMap.put(deviceId,swm);
  77. }
  78. }
  79. switch (type) {
  80. case "AP00": // 初始化登录
  81. resolveLoginMSG(msg, channel);
  82. break;
  83. case "AP03": // 连接(心跳) BP03#
  84. normalReply(factory, channel, "BP03");
  85. checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
  86. break;
  87. case "AP01": // 位置信息
  88. String gpsState = msg.substring(12, 13);
  89. setSwitchMap(deviceId, gpsState);// 采集数据
  90. sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
  91. normalReply(factory, channel, "BP01");
  92. break;
  93. case "AP33": // 设置模式回复
  94. //IWAP33,080835,1#
  95. if (deviceId != null) {
  96. Integer moderType = getInteger(msg.substring(14, 16));// 收到回复状态
  97. if (swm.getWorkType() == 3) {// 当前设置的模式
  98. if (workModel != null && moderType == 3) {
  99. workModel.setUrgentType(3);
  100. logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" +msg);
  101. } else {
  102. logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>" );
  103. Thread.sleep(1000);
  104. normalReplyModel(factory, deviceId, channel, 3);
  105. workModel.setUrgentType(2);
  106. // 更新开关时间
  107. swm.setSwitchTime(time);
  108. switchMap.put(deviceId,swm);
  109. }
  110. modelMap.put(deviceId, workModel);
  111. }
  112. }
  113. break;
  114. default: // 其他
  115. logger.info("client send data without handle type ...");
  116. break;
  117. }
  118. } finally {
  119. MDC.clear();
  120. }
  121. }
  122. /**
  123. * 模式设置
  124. *
  125. * @param factory
  126. * @param deviceId
  127. * @param channel
  128. */
  129. protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
  130. Long nowTime = System.currentTimeMillis();// 当前时间戳
  131. if (deviceId == null || !switchMap.containsKey(deviceId)) {
  132. return;
  133. }
  134. SwitchWorkModel swm = switchMap.get(deviceId);
  135. logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
  136. if (nowTime - swm.getActiveTime() > INTERVAL_TIME
  137. && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
  138. Integer workType = (3 == swm.getWorkType()) ? 2 : 3;
  139. if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
  140. workType = 3; // APO1上传间隔大于5分种再次设置为紧急模式
  141. }
  142. // 如果当前状态为A 紧急模式
  143. swm.setSwitchTime(nowTime);
  144. swm.setWorkType(workType);
  145. switchMap.put(deviceId, swm);
  146. logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
  147. normalReplyModel(factory, deviceId, channel, workType);
  148. if (workType == 3) {// 设置紧急模式 则更新紧急模式设置状态
  149. WorkModel wm = new WorkModel();
  150. wm.setUrgentType(2);//
  151. wm.setUrgentTime(nowTime);// 设置紧急模式时间
  152. modelMap.put(deviceId, wm);
  153. }
  154. }else{
  155. logger.warn("心跳检测是下发模式:工作不更改状态");
  156. }
  157. }
  158. /**
  159. * 初始化 数据记录 注册记录
  160. *
  161. * @param deviceId
  162. */
  163. protected void initSwitchMap(String deviceId) {
  164. if (deviceId == null) {
  165. return;
  166. }
  167. Long time = System.currentTimeMillis();
  168. SwitchWorkModel swm = new SwitchWorkModel();
  169. swm.setActiveTime(time);
  170. swm.setSwitchTime(time);
  171. swm.setWorkType(3);//1:正常模式,2:省电模式,3:紧急模式
  172. swm.setGpsUpTime(0L);// GPS 上报时间接口
  173. switchMap.put(deviceId, swm);
  174. logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
  175. }
  176. /**
  177. * APOI 上报数据 更新活跃时间
  178. *
  179. * @param deviceId
  180. */
  181. protected void setSwitchMap(String deviceId, String gpsState) {
  182. if (deviceId == null) {
  183. return;
  184. }
  185. Long time = System.currentTimeMillis();
  186. SwitchWorkModel swm = switchMap.get(deviceId);
  187. swm.setGpsUpTime(time);
  188. if ("A".equals(gpsState)) {
  189. swm.setActiveTime(time);
  190. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  191. } else {
  192. logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
  193. }
  194. switchMap.put(deviceId, swm);
  195. logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
  196. }
  197. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  198. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  199. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  200. dataByteBufCopy.readBytes(dataByteArray);
  201. dataByteBufCopy.release();
  202. }
  203. /**
  204. * 登录管理
  205. *
  206. * @param channel
  207. */
  208. private void resolveLoginMSG(String msg, Channel channel) {
  209. /*IWAP00353456789012345# */
  210. String message = String.valueOf(msg);
  211. String factory = message.substring(0, 2);
  212. String deviceId = message.substring(6, 21);
  213. String deviceIdInMap = channelDeviceMap.get(channel);
  214. MDC.put(MDC_DEVICEID, deviceId);
  215. if (!deviceId.equals(deviceIdInMap)) {
  216. manageChannel(channel, deviceId);
  217. }
  218. String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
  219. normalReply(factory, channel, "BP00," + date + ",8");
  220. Date loginTime = new Date();
  221. initSwitchMap(deviceId);
  222. executorService.execute(new Runnable() {
  223. @Override
  224. public void run() {
  225. Date currentTime = new Date();
  226. try {
  227. Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
  228. if (secondsSinceLogin < 5L) {
  229. TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
  230. }
  231. normalReplyModel(factory, deviceId, channel, 3);
  232. WorkModel wm = new WorkModel();
  233. wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
  234. wm.setUrgentType(2);// 默认指令模式为正常模式
  235. modelMap.put(deviceId, wm);
  236. } catch (InterruptedException e) {
  237. logger.error(e.getMessage());
  238. }
  239. }
  240. });
  241. }
  242. // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
  243. private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
  244. StringBuilder replyCommand = new StringBuilder();
  245. replyCommand.append(factory).append("BP33").append(",")
  246. .append(deviceId).append(",")
  247. .append("080835").append(",")
  248. .append(workType).append("#");
  249. String replyCommandStr = replyCommand.toString();
  250. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  251. buffer.writeBytes(replyCommandStr.getBytes());
  252. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  253. channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
  254. }
  255. /**
  256. * 回复
  257. *
  258. * @param channel
  259. * @content 回复内容
  260. */
  261. private void normalReply(String factory, Channel channel, String content) {
  262. // gps ==== >IW BP01#
  263. // 登录 ==== >IW BP00,20150101125223,8#
  264. // 心跳 ==== >IW BP03#
  265. StringBuilder replyCommand = new StringBuilder();
  266. replyCommand.append(factory);
  267. replyCommand.append(content);
  268. replyCommand.append("#");
  269. String replyCommandStr = replyCommand.toString();
  270. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  271. buffer.writeBytes(replyCommandStr.getBytes());
  272. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  273. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  274. }
  275. @Override
  276. public void startAcceptor() {
  277. EventLoopGroup bossGroup = new NioEventLoopGroup();
  278. EventLoopGroup workerGroup = new NioEventLoopGroup();
  279. try {
  280. ServerBootstrap b = new ServerBootstrap();
  281. b.group(bossGroup, workerGroup)
  282. .channel(NioServerSocketChannel.class)
  283. .option(ChannelOption.SO_BACKLOG, 1024)
  284. .handler(new LoggingHandler(LogLevel.INFO))
  285. .childHandler(new ChannelInitializer<SocketChannel>() {
  286. @Override
  287. protected void initChannel(SocketChannel ch) {
  288. ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
  289. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
  290. ch.pipeline().addLast(WatchJWServerHandler.this);
  291. }
  292. });
  293. ChannelFuture f = b.bind(port).sync();
  294. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  295. this.getPort());
  296. f.channel().closeFuture().sync();
  297. } catch (Exception ex) {
  298. logger.warn(ex.getMessage(), ex);
  299. } finally {
  300. cleanRedisLinkData();
  301. workerGroup.shutdownGracefully();
  302. bossGroup.shutdownGracefully();
  303. }
  304. }
  305. /**
  306. * 移除 失效的 deviceId
  307. *
  308. * @param ctx
  309. * @throws Exception
  310. */
  311. @Override
  312. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  313. Channel channel = ctx.channel();
  314. if (!channel.isActive()) {
  315. String deviceId = channelDeviceMap.get(channel);
  316. if (deviceId != null) {
  317. switchMap.remove(deviceId);
  318. modelMap.remove(deviceId);
  319. }
  320. }
  321. super.channelInactive(ctx);
  322. }
  323. class WorkModel {
  324. /**
  325. * 设置紧急模式时间
  326. */
  327. private Long urgentTime;
  328. /**
  329. * 设置紧急模式状态
  330. */
  331. private Integer urgentType;
  332. public Long getUrgentTime() {
  333. return urgentTime;
  334. }
  335. public void setUrgentTime(Long urgentTime) {
  336. this.urgentTime = urgentTime;
  337. }
  338. public Integer getUrgentType() {
  339. return urgentType;
  340. }
  341. public void setUrgentType(Integer urgentType) {
  342. this.urgentType = urgentType;
  343. }
  344. }
  345. /**
  346. * 开关工作模式
  347. */
  348. class SwitchWorkModel {
  349. /**
  350. * 有效数据时间 毫秒
  351. */
  352. private Long activeTime;
  353. /**
  354. * 开关 A 有效,V 无效
  355. */
  356. private Integer workType;
  357. /**
  358. * 开关切换时间 毫秒
  359. */
  360. private Long switchTime;
  361. /**
  362. * GPS APO1 上傳时间
  363. */
  364. private Long gpsUpTime;
  365. public Long getActiveTime() {
  366. return activeTime;
  367. }
  368. public void setActiveTime(Long activeTime) {
  369. this.activeTime = activeTime;
  370. }
  371. public Integer getWorkType() {
  372. return workType;
  373. }
  374. public void setWorkType(Integer workType) {
  375. this.workType = workType;
  376. }
  377. public Long getSwitchTime() {
  378. return switchTime;
  379. }
  380. public void setSwitchTime(Long switchTime) {
  381. this.switchTime = switchTime;
  382. }
  383. public Long getGpsUpTime() {
  384. return gpsUpTime;
  385. }
  386. public void setGpsUpTime(Long gpsUpTime) {
  387. this.gpsUpTime = gpsUpTime;
  388. }
  389. }
  390. }