AcceptanceInboundHandlerAdapter.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. package com.tidecloud.dataacceptance.service;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileOutputStream;
  5. import java.io.FileWriter;
  6. import java.io.IOException;
  7. import java.io.OutputStream;
  8. import java.nio.ByteBuffer;
  9. import java.nio.CharBuffer;
  10. import java.nio.charset.Charset;
  11. import java.nio.charset.CharsetDecoder;
  12. import java.text.SimpleDateFormat;
  13. import java.util.Date;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. import java.util.Set;
  17. import java.util.UUID;
  18. import java.util.concurrent.ExecutorService;
  19. import java.util.concurrent.Executors;
  20. import java.util.concurrent.TimeUnit;
  21. import org.apache.commons.lang.ArrayUtils;
  22. import org.slf4j.Logger;
  23. import org.slf4j.LoggerFactory;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.kafka.core.KafkaTemplate;
  26. import org.springframework.kafka.support.SendResult;
  27. import org.springframework.util.concurrent.FailureCallback;
  28. import org.springframework.util.concurrent.ListenableFuture;
  29. import org.springframework.util.concurrent.SuccessCallback;
  30. import com.smartsanitation.common.util.StringUtil;
  31. import com.tidecloud.dataacceptance.common.NumUtil;
  32. import com.tidecloud.dataacceptance.entity.ConnectMsg;
  33. import com.tidecloud.dataacceptance.entity.Session;
  34. import com.tidecloud.dataacceptance.entity.SessionManager;
  35. import com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler;
  36. import io.netty.bootstrap.ServerBootstrap;
  37. import io.netty.buffer.ByteBuf;
  38. import io.netty.channel.Channel;
  39. import io.netty.channel.ChannelFuture;
  40. import io.netty.channel.ChannelHandlerContext;
  41. import io.netty.channel.ChannelInboundHandlerAdapter;
  42. import io.netty.channel.ChannelInitializer;
  43. import io.netty.channel.ChannelOption;
  44. import io.netty.channel.EventLoopGroup;
  45. import io.netty.channel.nio.NioEventLoopGroup;
  46. import io.netty.channel.socket.SocketChannel;
  47. import io.netty.channel.socket.nio.NioServerSocketChannel;
  48. import io.netty.handler.timeout.IdleState;
  49. import io.netty.handler.timeout.IdleStateEvent;
  50. import io.netty.handler.timeout.IdleStateHandler;
  51. import redis.clients.jedis.Jedis;
  52. import redis.clients.jedis.JedisPool;
  53. public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
  54. protected String ip;
  55. protected Integer port;
  56. protected String dataPath;
  57. protected String prefixName;
  58. private static final Logger logger = LoggerFactory.getLogger(AcceptanceInboundHandlerAdapter.class);
  59. private static final Long TEN_M = 10485760l;
  60. public static String PREFIX_LINK = "s.";
  61. public static String PREFIX_LINK_BACK = "s.b.";
  62. public static String PREFIX_DEVICE = "d.";
  63. public static final Integer REDIS_INDEX_LINK = 15;
  64. public static Map<String, String> channelMap = new HashMap<String, String>();
  65. public static Map<String, Channel> manageChannelMap = new HashMap<>();
  66. public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
  67. public static Map<String, String> commandCopy = new HashMap<>();
  68. public static Map<String, Channel> socketyChannelMap = new HashMap<>();
  69. public static Map<Channel, String> channelDeviceMap = new HashMap<>();
  70. private final SessionManager sessionManager = SessionManager.getInstance();
  71. private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
  72. private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
  73. @Autowired
  74. private JedisPool jedisPool;
  75. @Autowired
  76. private KafkaTemplate<Integer, String> kafkaTemplate;
  77. private String topic;
  78. protected void receiveMsg(String msg, String deviceId, Channel channel) {
  79. manageChannel(channel, deviceId);
  80. kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
  81. //singleThreadPool.execute(() -> dataStorage(msg));
  82. }
  83. protected void manageChannel(Channel channel, String deviceId) {
  84. String socketkey = channel.id().asLongText();
  85. Channel channelInMap = socketyChannelMap.get(socketkey);
  86. String deviceIdInMap = channelDeviceMap.get(channel);
  87. if (channelInMap != null && deviceIdInMap != null) {
  88. logger.debug("device [{}] has link [{}]", deviceId, socketkey);
  89. return;
  90. }
  91. socketyChannelMap.put(socketkey, channel);
  92. channelDeviceMap.put(channel, deviceId);
  93. String addressStr = ConnectMsg.ipToLong(ip);
  94. ConnectMsg cMsg = new ConnectMsg(ip, socketkey);
  95. try (Jedis jedis = jedisPool.getResource()){
  96. jedis.select(REDIS_INDEX_LINK);
  97. String insertKey = PREFIX_LINK + addressStr;
  98. String selectKey = PREFIX_DEVICE + deviceId;
  99. String insertBackupKey = PREFIX_LINK_BACK + addressStr;
  100. jedis.sadd(insertKey, socketkey);
  101. jedis.sadd(insertBackupKey, deviceId);
  102. jedis.set(selectKey, StringUtil.convert2String(cMsg));
  103. } catch (Exception e) {
  104. logger.error(e.getMessage());
  105. }
  106. }
  107. protected void sendKakfaMsg(String msg, String deviceId) {
  108. Integer key = new Integer(deviceId.hashCode());
  109. ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
  110. // 发送成功回调
  111. SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
  112. @Override
  113. public void onSuccess(SendResult<Integer, String> result) {
  114. // 成功业务逻辑
  115. logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);
  116. }
  117. };
  118. // 发送失败回调
  119. FailureCallback failureCallback = new FailureCallback() {
  120. @Override
  121. public void onFailure(Throwable ex) {
  122. // 失败业务逻辑
  123. logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
  124. }
  125. };
  126. listenableFuture.addCallback(successCallback, failureCallback);
  127. }
  128. public void dataStorage(String deviceStr) {
  129. File path = new File(dataPath);
  130. File[] listFiles = path.listFiles();
  131. boolean isTouch = true;
  132. if (listFiles != null) {
  133. for (File sonFile : listFiles) {
  134. long len = sonFile.length();
  135. if (len < TEN_M) {
  136. // String fileName = dataPath + prefixName + new
  137. // SimpleDateFormat("yyMMddHHmmss").format(new Date());
  138. // File file = new File(fileName);
  139. writeDevice2File(sonFile, deviceStr);
  140. logger.info("正在写入数据: " + deviceStr);
  141. isTouch = false;
  142. }
  143. }
  144. }
  145. if (isTouch) {
  146. String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
  147. File file = new File(fileName);
  148. writeDevice2File(file, deviceStr);
  149. logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:"
  150. + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
  151. }
  152. }
  153. public static void writeDevice2File(File file, String deviceStr) {
  154. Integer length = deviceStr.getBytes().length;
  155. try {
  156. OutputStream outputStream = new FileOutputStream(file, true);
  157. byte[] outPutBytes = ArrayUtils.addAll(NumUtil.int2bytes(length), deviceStr.getBytes());
  158. outputStream.write(outPutBytes);
  159. outputStream.flush();
  160. outputStream.close();
  161. } catch (IOException e) {
  162. logger.error(e.getMessage());
  163. }
  164. }
  165. @Override
  166. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  167. cause.printStackTrace();
  168. ctx.close();
  169. }
  170. @Override
  171. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  172. Session session = Session.buildSession(ctx.channel());
  173. sessionManager.put(session.getId(), session);
  174. logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
  175. }
  176. @Override
  177. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  178. Channel channel = ctx.channel();
  179. if (!channel.isActive()) {
  180. String deviceId = channelDeviceMap.get(channel);
  181. if (deviceId != null) {
  182. channelDeviceMap.remove(channel);
  183. deleteLinkFromRedis(deviceId);
  184. }
  185. }
  186. super.channelInactive(ctx);
  187. ctx.close();
  188. final String sessionId = ctx.channel().id().asLongText();
  189. Session session = sessionManager.findBySessionId(sessionId);
  190. this.sessionManager.removeBySessionId(sessionId);
  191. logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
  192. }
  193. protected void deleteLinkFromRedis(String deviceId) {
  194. String deleteKey = PREFIX_DEVICE + deviceId;
  195. try (Jedis jedis = jedisPool.getResource()) {
  196. jedis.select(REDIS_INDEX_LINK);
  197. String connectMsg = jedis.get(deleteKey);
  198. if (connectMsg != null) {
  199. ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
  200. String socketId = cmsg.getSocketId();
  201. socketyChannelMap.remove(socketId);
  202. String socketQueryKey = PREFIX_LINK + this.ip;
  203. jedis.srem(socketQueryKey, socketId);
  204. jedis.del(deleteKey);
  205. logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
  206. }
  207. } catch (Exception e) {
  208. logger.error(e.getLocalizedMessage());
  209. }
  210. }
  211. @Override
  212. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  213. if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
  214. IdleStateEvent event = (IdleStateEvent) evt;
  215. if (event.state() == IdleState.READER_IDLE) {
  216. Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
  217. logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
  218. ctx.close();
  219. }
  220. }
  221. }
  222. public static String byteBufferToString(ByteBuffer buffer) {
  223. CharBuffer charBuffer = null;
  224. try {
  225. Charset charset = Charset.forName("UTF-8");
  226. CharsetDecoder decoder = charset.newDecoder();
  227. charBuffer = decoder.decode(buffer);
  228. buffer.flip();
  229. return charBuffer.toString();
  230. } catch (Exception ex) {
  231. ex.printStackTrace();
  232. return null;
  233. }
  234. }
  235. protected Integer getInteger(String str) {
  236. if (str == null) {
  237. return null;
  238. }
  239. return Integer.valueOf(str);
  240. }
  241. protected Double getDouble(String str) {
  242. if (str == null) {
  243. return null;
  244. }
  245. return Double.valueOf(str);
  246. }
  247. static byte[] int2bytes(int num) {
  248. byte[] b = new byte[4];
  249. // int mask=0xff;
  250. for (int i = 0; i < 4; i++) {
  251. b[3 - i] = (byte) (num >>> (24 - i * 8));
  252. }
  253. return b;
  254. }
  255. public void startAcceptor() {
  256. EventLoopGroup bossGroup = new NioEventLoopGroup();
  257. EventLoopGroup workerGroup = new NioEventLoopGroup();
  258. try {
  259. ServerBootstrap b = new ServerBootstrap();
  260. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  261. .childHandler(new ChannelInitializer<SocketChannel>() {
  262. @Override
  263. protected void initChannel(SocketChannel ch) throws Exception {
  264. ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
  265. ch.pipeline().addLast(AcceptanceInboundHandlerAdapter.this);
  266. }
  267. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  268. ChannelFuture f = b.bind(this.getPort()).sync();
  269. logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
  270. this.getPort());
  271. f.channel().closeFuture().sync();
  272. } catch (Exception e) {
  273. logger.error(e.getMessage());
  274. } finally {
  275. cleanRedisLinkData();
  276. workerGroup.shutdownGracefully();
  277. bossGroup.shutdownGracefully();
  278. }
  279. }
  280. protected void cleanRedisLinkData() {
  281. try (Jedis jedis = jedisPool.getResource()) {
  282. jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
  283. String addressStr = ConnectMsg.ipToLong(this.ip);
  284. String selectKey = YiTongGpsServerHandler.PREFIX_LINK_BACK + addressStr;
  285. Set<String> values = jedis.smembers(selectKey);
  286. for (String deviceId : values) {
  287. String deleteKeyOfDevice = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
  288. String deleteKeyOfLink = YiTongGpsServerHandler.PREFIX_LINK + addressStr;
  289. String connectMsgStr = jedis.get(deleteKeyOfDevice);
  290. if (connectMsgStr != null) {
  291. ConnectMsg connectMsg = StringUtil.convert2Object(connectMsgStr, ConnectMsg.class);
  292. String socketId = connectMsg.getSocketId();
  293. jedis.del(deleteKeyOfDevice);
  294. jedis.srem(deleteKeyOfLink, socketId);
  295. } else {
  296. logger.error("error deviceId [{}] in select [{}] key [{}]", deviceId, 15, deleteKeyOfDevice);
  297. }
  298. }
  299. jedis.del(selectKey);
  300. } catch (Exception e) {
  301. logger.error(e.getLocalizedMessage());
  302. }
  303. }
  304. public String getDataPath() {
  305. return dataPath;
  306. }
  307. public void setDataPath(String dataPath) {
  308. this.dataPath = dataPath;
  309. }
  310. public String getTopic() {
  311. return topic;
  312. }
  313. public void setTopic(String topic) {
  314. this.topic = topic;
  315. }
  316. public String getIp() {
  317. return ip;
  318. }
  319. public void setIp(String ip) {
  320. this.ip = ip;
  321. }
  322. public Integer getPort() {
  323. return port;
  324. }
  325. public void setPort(Integer port) {
  326. this.port = port;
  327. }
  328. public String getPrefixName() {
  329. return prefixName;
  330. }
  331. public void setPrefixName(String prefixName) {
  332. this.prefixName = prefixName;
  333. }
  334. }