123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- package com.tidecloud.dataacceptance.service;
- import java.io.BufferedWriter;
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetDecoder;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
- import java.util.UUID;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import org.apache.commons.lang.ArrayUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.SendResult;
- import org.springframework.util.concurrent.FailureCallback;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.util.concurrent.SuccessCallback;
- import com.smartsanitation.common.util.StringUtil;
- import com.tidecloud.dataacceptance.common.NumUtil;
- import com.tidecloud.dataacceptance.entity.ConnectMsg;
- import com.tidecloud.dataacceptance.entity.Session;
- import com.tidecloud.dataacceptance.entity.SessionManager;
- import com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import io.netty.handler.timeout.IdleStateHandler;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
- protected String ip;
- protected Integer port;
- protected String dataPath;
- protected String prefixName;
- private static final Logger logger = LoggerFactory.getLogger(AcceptanceInboundHandlerAdapter.class);
- private static final Long TEN_M = 10485760l;
- public static String PREFIX_LINK = "s.";
- public static String PREFIX_LINK_BACK = "s.b.";
- public static String PREFIX_DEVICE = "d.";
- public static final Integer REDIS_INDEX_LINK = 15;
- public static Map<String, String> channelMap = new HashMap<String, String>();
- public static Map<String, Channel> manageChannelMap = new HashMap<>();
- public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
- public static Map<String, String> commandCopy = new HashMap<>();
- public static Map<String, Channel> socketyChannelMap = new HashMap<>();
- public static Map<Channel, String> channelDeviceMap = new HashMap<>();
- private final SessionManager sessionManager = SessionManager.getInstance();
- private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
- private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
- @Autowired
- private JedisPool jedisPool;
- @Autowired
- private KafkaTemplate<Integer, String> kafkaTemplate;
- private String topic;
-
- protected void receiveMsg(String msg, String deviceId, Channel channel) {
- manageChannel(channel, deviceId);
- kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
- //singleThreadPool.execute(() -> dataStorage(msg));
- }
- protected void manageChannel(Channel channel, String deviceId) {
- String socketkey = channel.id().asLongText();
- Channel channelInMap = socketyChannelMap.get(socketkey);
- String deviceIdInMap = channelDeviceMap.get(channel);
- if (channelInMap != null && deviceIdInMap != null) {
- logger.debug("device [{}] has link [{}]", deviceId, socketkey);
- return;
- }
- socketyChannelMap.put(socketkey, channel);
- channelDeviceMap.put(channel, deviceId);
- String addressStr = ConnectMsg.ipToLong(ip);
- ConnectMsg cMsg = new ConnectMsg(ip, socketkey);
-
- try (Jedis jedis = jedisPool.getResource()){
- jedis.select(REDIS_INDEX_LINK);
- String insertKey = PREFIX_LINK + addressStr;
- String selectKey = PREFIX_DEVICE + deviceId;
- String insertBackupKey = PREFIX_LINK_BACK + addressStr;
- jedis.sadd(insertKey, socketkey);
- jedis.sadd(insertBackupKey, deviceId);
- jedis.set(selectKey, StringUtil.convert2String(cMsg));
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- protected void sendKakfaMsg(String msg, String deviceId) {
- Integer key = new Integer(deviceId.hashCode());
- ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
- // 发送成功回调
- SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
- @Override
- public void onSuccess(SendResult<Integer, String> result) {
- // 成功业务逻辑
- logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);
- }
- };
- // 发送失败回调
- FailureCallback failureCallback = new FailureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- // 失败业务逻辑
- logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
- }
- };
- listenableFuture.addCallback(successCallback, failureCallback);
- }
- public void dataStorage(String deviceStr) {
- File path = new File(dataPath);
- File[] listFiles = path.listFiles();
- boolean isTouch = true;
- if (listFiles != null) {
- for (File sonFile : listFiles) {
- long len = sonFile.length();
- if (len < TEN_M) {
- // String fileName = dataPath + prefixName + new
- // SimpleDateFormat("yyMMddHHmmss").format(new Date());
- // File file = new File(fileName);
- writeDevice2File(sonFile, deviceStr);
- logger.info("正在写入数据: " + deviceStr);
- isTouch = false;
- }
- }
- }
- if (isTouch) {
- String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
- File file = new File(fileName);
- writeDevice2File(file, deviceStr);
- logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:"
- + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
- }
- }
- public static void writeDevice2File(File file, String deviceStr) {
- Integer length = deviceStr.getBytes().length;
- try {
- OutputStream outputStream = new FileOutputStream(file, true);
- byte[] outPutBytes = ArrayUtils.addAll(NumUtil.int2bytes(length), deviceStr.getBytes());
- outputStream.write(outPutBytes);
- outputStream.flush();
- outputStream.close();
- } catch (IOException e) {
- logger.error(e.getMessage());
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- Session session = Session.buildSession(ctx.channel());
- sessionManager.put(session.getId(), session);
- logger.debug("client linking server session : [{}]", StringUtil.convert2String(session));
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- if (!channel.isActive()) {
- String deviceId = channelDeviceMap.get(channel);
- if (deviceId != null) {
- channelDeviceMap.remove(channel);
- deleteLinkFromRedis(deviceId);
- }
- }
- super.channelInactive(ctx);
- ctx.close();
- final String sessionId = ctx.channel().id().asLongText();
- Session session = sessionManager.findBySessionId(sessionId);
- this.sessionManager.removeBySessionId(sessionId);
- logger.debug("client disconnect server session is : [{}]", StringUtil.convert2String(session));
- }
- protected void deleteLinkFromRedis(String deviceId) {
- String deleteKey = PREFIX_DEVICE + deviceId;
- try (Jedis jedis = jedisPool.getResource()) {
- jedis.select(REDIS_INDEX_LINK);
- String connectMsg = jedis.get(deleteKey);
- if (connectMsg != null) {
- ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
- String socketId = cmsg.getSocketId();
- socketyChannelMap.remove(socketId);
- String socketQueryKey = PREFIX_LINK + this.ip;
- jedis.srem(socketQueryKey, socketId);
- jedis.del(deleteKey);
- logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
- }
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
- logger.error("server breaking connect session : [{}]", StringUtil.convert2String(session));
- ctx.close();
- }
- }
- }
- public static String byteBufferToString(ByteBuffer buffer) {
- CharBuffer charBuffer = null;
- try {
- Charset charset = Charset.forName("UTF-8");
- CharsetDecoder decoder = charset.newDecoder();
- charBuffer = decoder.decode(buffer);
- buffer.flip();
- return charBuffer.toString();
- } catch (Exception ex) {
- ex.printStackTrace();
- return null;
- }
- }
- protected Integer getInteger(String str) {
- if (str == null) {
- return null;
- }
- return Integer.valueOf(str);
- }
- protected Double getDouble(String str) {
- if (str == null) {
- return null;
- }
- return Double.valueOf(str);
- }
- static byte[] int2bytes(int num) {
- byte[] b = new byte[4];
- // int mask=0xff;
- for (int i = 0; i < 4; i++) {
- b[3 - i] = (byte) (num >>> (24 - i * 8));
- }
- return b;
- }
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES));
- ch.pipeline().addLast(AcceptanceInboundHandlerAdapter.this);
- }
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture f = b.bind(this.getPort()).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- protected void cleanRedisLinkData() {
- try (Jedis jedis = jedisPool.getResource()) {
- jedis.select(YiTongGpsServerHandler.REDIS_INDEX_LINK);
- String addressStr = ConnectMsg.ipToLong(this.ip);
- String selectKey = YiTongGpsServerHandler.PREFIX_LINK_BACK + addressStr;
- Set<String> values = jedis.smembers(selectKey);
- for (String deviceId : values) {
- String deleteKeyOfDevice = YiTongGpsServerHandler.PREFIX_DEVICE + deviceId;
- String deleteKeyOfLink = YiTongGpsServerHandler.PREFIX_LINK + addressStr;
- String connectMsgStr = jedis.get(deleteKeyOfDevice);
- if (connectMsgStr != null) {
- ConnectMsg connectMsg = StringUtil.convert2Object(connectMsgStr, ConnectMsg.class);
- String socketId = connectMsg.getSocketId();
- jedis.del(deleteKeyOfDevice);
- jedis.srem(deleteKeyOfLink, socketId);
- } else {
- logger.error("error deviceId [{}] in select [{}] key [{}]", deviceId, 15, deleteKeyOfDevice);
- }
- }
- jedis.del(selectKey);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- public String getDataPath() {
- return dataPath;
- }
- public void setDataPath(String dataPath) {
- this.dataPath = dataPath;
- }
- public String getTopic() {
- return topic;
- }
- public void setTopic(String topic) {
- this.topic = topic;
- }
- public String getIp() {
- return ip;
- }
- public void setIp(String ip) {
- this.ip = ip;
- }
- public Integer getPort() {
- return port;
- }
- public void setPort(Integer port) {
- this.port = port;
- }
- public String getPrefixName() {
- return prefixName;
- }
- public void setPrefixName(String prefixName) {
- this.prefixName = prefixName;
- }
- }
|