WatchJWServerHandler.java 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.tidecloud.dataacceptance.DataAcceptanceApplication;
  3. import com.tidecloud.dataacceptance.common.DateUtil;
  4. import com.tidecloud.dataacceptance.entity.Advice;
  5. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  6. import io.netty.buffer.ByteBuf;
  7. import io.netty.buffer.Unpooled;
  8. import io.netty.channel.Channel;
  9. import io.netty.channel.ChannelFuture;
  10. import io.netty.channel.ChannelHandler.Sharable;
  11. import io.netty.channel.ChannelHandlerContext;
  12. import org.apache.commons.lang.StringUtils;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.slf4j.MDC;
  16. import org.springframework.boot.SpringApplication;
  17. import org.springframework.context.annotation.Scope;
  18. import org.springframework.stereotype.Component;
  19. import javax.xml.bind.DatatypeConverter;
  20. import java.text.SimpleDateFormat;
  21. import java.util.Date;
  22. /**
  23. * Created by jhw on 2018/7/20.
  24. */
  25. @Sharable
  26. @Scope("prototype")
  27. @Component(WatchJWServerHandler.name)
  28. public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  29. public static final String name = "WatchJWServerHandler";
  30. private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
  31. @Override
  32. protected void handle(ByteBuf in, Channel channel) throws Exception {
  33. String msg = byteBufferToString(in.nioBuffer());
  34. String deviceId = channelDeviceMap.get(channel);
  35. if (deviceId != null) {
  36. MDC.put(MDC_DEVICEID, deviceId);
  37. }
  38. logger.info("传入数据为:" + msg);
  39. String factory = msg.substring(0, 2);// 工厂
  40. String type = msg.substring(2, 6);// 标记
  41. switch (type) {
  42. case "AP00": // 初始化登录
  43. resolveLoginMSG(msg, channel);
  44. break;
  45. case "AP03": // 连接(心跳) BP03#
  46. normalReply(factory, channel, "BP03");
  47. break;
  48. case "AP01": // 位置信息
  49. sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
  50. normalReply(factory, channel, "BP01");
  51. break;
  52. default: // 其他
  53. logger.info("client send data without handle type ...");
  54. break;
  55. }
  56. }
  57. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  58. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  59. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  60. dataByteBufCopy.readBytes(dataByteArray);
  61. dataByteBufCopy.release();
  62. }
  63. /**
  64. * 登录管理
  65. *
  66. * @param channel
  67. */
  68. private void resolveLoginMSG(String msg, Channel channel) {
  69. /*IWAP00353456789012345# */
  70. String message = String.valueOf(msg);
  71. String factory = message.substring(0, 2);
  72. String deviceId = message.substring(6, 21);
  73. String deviceIdInMap = channelDeviceMap.get(channel);
  74. MDC.put(MDC_DEVICEID, deviceId);
  75. if (!deviceId.equals(deviceIdInMap)) {
  76. manageChannel(channel, deviceId);
  77. }
  78. String date =DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
  79. normalReply(factory, channel, "BP00," + date + ",8");
  80. // normalReplyModel(factory,deviceId,channel); TODO (无效)
  81. }
  82. // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
  83. private void normalReplyModel(String factory,String deviceId, Channel channel){
  84. StringBuilder replyCommand = new StringBuilder();
  85. replyCommand.append(factory).append("BP33").append(",")
  86. .append(deviceId).append(",")
  87. .append("080835").append(",")
  88. .append("3").append("#");
  89. String replyCommandStr = replyCommand.toString();
  90. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  91. buffer.writeBytes(replyCommandStr.getBytes());
  92. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  93. channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
  94. }
  95. /**
  96. * 回复
  97. *
  98. * @param channel
  99. * @content 回复内容
  100. */
  101. private void normalReply(String factory, Channel channel, String content) {
  102. // gps ==== >IW BP01#
  103. // 登录 ==== >IW BP00,20150101125223,8#
  104. // 心跳 ==== >IW BP03#
  105. StringBuilder replyCommand = new StringBuilder();
  106. replyCommand.append(factory);
  107. replyCommand.append(content);
  108. replyCommand.append("#");
  109. String replyCommandStr = replyCommand.toString();
  110. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  111. buffer.writeBytes(replyCommandStr.getBytes());
  112. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  113. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  114. }
  115. }