WatchDYJServerHandler.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package com.tidecloud.dataacceptance.service.impl;
  2. import com.tidecloud.dataacceptance.common.DateUtil;
  3. import com.tidecloud.dataacceptance.entity.Advice;
  4. import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
  5. import io.netty.buffer.ByteBuf;
  6. import io.netty.buffer.Unpooled;
  7. import io.netty.channel.Channel;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelHandler.Sharable;
  10. import io.netty.channel.ChannelHandlerContext;
  11. import io.netty.util.concurrent.Future;
  12. import io.netty.util.concurrent.GenericFutureListener;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.context.annotation.Scope;
  16. import org.springframework.stereotype.Component;
  17. import java.util.Date;
  18. /**
  19. * Created by jhw on 2018/7/20.
  20. */
  21. @Sharable
  22. @Scope("prototype")
  23. @Component(WatchDYJServerHandler.name)
  24. public class WatchDYJServerHandler extends HexBinaryAcceptanceHandlerAdapter {
  25. public static final String name = "WatchDYJServerHandler";
  26. private static final Logger logger = LoggerFactory.getLogger(WatchDYJServerHandler.class);
  27. @Override
  28. protected void handle(ByteBuf in, Channel channel) throws Exception {
  29. String msg = byteBufferToString(in.nioBuffer());
  30. Advice advice = setAdvice(msg);
  31. switch (advice.getAdviceType()) {
  32. case "INIT": // 初始化
  33. String init = "INIT,1";
  34. normalReply(advice, channel, init);
  35. break;
  36. case "LK": // 连接
  37. Date date = new Date();
  38. String ymd = DateUtil.formatDate2StringYMD(date);
  39. String hms = DateUtil.formatDate2StringHms(date);
  40. StringBuilder sb = new StringBuilder("LK,").append(ymd).append(",").append(hms);
  41. normalReply(advice, channel, sb.toString());
  42. break;
  43. case "UD": // 位置信息
  44. byte[] dataByteArray = new byte[in.readableBytes()];
  45. in.readBytes(dataByteArray);
  46. sendMsg2Kafka(dataByteArray, advice.getDeviceId(), channel);
  47. logger.info("正常存储设备信息:" +advice.toString());
  48. normalReply(advice, channel, "UD");
  49. break;
  50. default: // 其他
  51. normalReply(advice, channel, advice.getAdviceType());
  52. break;
  53. }
  54. }
  55. protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
  56. ByteBuf dataByteBufCopy = dataByteBuf.copy();
  57. byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
  58. dataByteBufCopy.readBytes(dataByteArray);
  59. dataByteBufCopy.release();
  60. }
  61. /**
  62. * 回复
  63. *
  64. * @param advice
  65. * @param channel
  66. * @content 回复内容
  67. */
  68. private void normalReply(Advice advice, Channel channel, String content) {
  69. String facotry = advice.getFacotry();
  70. String adviceType = advice.getAdviceType();
  71. String deviceId = advice.getDeviceId();
  72. StringBuilder replyCommand = new StringBuilder();
  73. replyCommand.append("[");
  74. replyCommand.append(facotry).append("*");
  75. replyCommand.append(deviceId).append("*");
  76. replyCommand.append("0002").append("*");// 指令流水
  77. replyCommand.append( Integer.toHexString(content.length())).append("*");// 内容长度
  78. replyCommand.append(content);// 指令内容标记
  79. replyCommand.append("]");
  80. String replyCommandStr = replyCommand.toString();
  81. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  82. buffer.writeBytes(replyCommandStr.getBytes());
  83. ChannelFuture channelFuture = channel.writeAndFlush(buffer);
  84. channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
  85. }
  86. /**
  87. * 解析 Advice 数据
  88. *
  89. * @param msg
  90. * @return
  91. */
  92. private Advice setAdvice(String msg) {
  93. Advice advice = new Advice();
  94. try {
  95. String message = String.valueOf(msg);
  96. int startIndex = message.indexOf("[");
  97. int endIndex = message.indexOf("]");
  98. String data = message.substring(startIndex + 1, endIndex);
  99. String[] bodys = data.split("\\*");
  100. advice.setFacotry(bodys[0]);// 厂商
  101. advice.setDeviceId(bodys[1]);// 设备Id
  102. advice.setAdviceSerial(bodys[2]);//指令流水
  103. advice.setAdvicelength(bodys[3]);//指令长度
  104. String[] contents = bodys[4].split(",");//获取内容
  105. advice.setAdviceType(contents[0]);// 标记
  106. return advice;
  107. } catch (Exception e) {
  108. e.printStackTrace();
  109. }
  110. return null;
  111. }
  112. }