BaseMsgProcessService.java 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package com.tidecloud.dataacceptance.service;
  2. import javax.xml.bind.DatatypeConverter;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import com.tidecloud.dataacceptance.entity.Session;
  6. import com.tidecloud.dataacceptance.entity.SessionManager;
  7. import io.netty.buffer.ByteBuf;
  8. import io.netty.buffer.PooledByteBufAllocator;
  9. import io.netty.buffer.Unpooled;
  10. import io.netty.channel.Channel;
  11. import io.netty.channel.ChannelFuture;
  12. /**
  13. * @author: chudk
  14. * @date: 2017年11月8日 下午4:03:23
  15. */
  16. public class BaseMsgProcessService {
  17. protected final Logger log = LoggerFactory.getLogger(getClass());
  18. protected SessionManager sessionManager;
  19. public BaseMsgProcessService() {
  20. this.sessionManager = SessionManager.getInstance();
  21. }
  22. protected ByteBuf getByteBuf(byte[] arr) {
  23. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(arr.length);
  24. byteBuf.writeBytes(arr);
  25. return byteBuf;
  26. }
  27. public void send2Client(Channel channel, byte[] arr) throws InterruptedException {
  28. ChannelFuture future = channel.writeAndFlush(Unpooled.copiedBuffer(arr)).sync();
  29. String copyStr = DatatypeConverter.printHexBinary(arr);
  30. log.info("send copy message [{}] >>>> client", copyStr);
  31. if (!future.isSuccess()) {
  32. log.error("发送数据出错:{}", future.cause());
  33. }
  34. }
  35. protected int getFlowId(Channel channel, int defaultValue) {
  36. Session session = this.sessionManager.findBySessionId(Session.buildId(channel));
  37. if (session == null) {
  38. return defaultValue;
  39. }
  40. return session.currentFlowId();
  41. }
  42. protected int getFlowId(Channel channel) {
  43. return this.getFlowId(channel, 0);
  44. }
  45. }