|
@@ -2,8 +2,10 @@ package com.tidecloud.dataacceptance.service;
|
|
|
|
|
|
import java.io.BufferedWriter;
|
|
|
import java.io.File;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.FileWriter;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.CharBuffer;
|
|
|
import java.nio.charset.Charset;
|
|
@@ -16,6 +18,7 @@ import java.util.UUID;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
|
+import org.apache.commons.lang.ArrayUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -26,6 +29,7 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import org.springframework.util.concurrent.SuccessCallback;
|
|
|
|
|
|
import com.smartsanitation.common.util.StringUtil;
|
|
|
+import com.tidecloud.dataacceptance.common.NumUtil;
|
|
|
import com.tidecloud.dataacceptance.entity.ConnectMsg;
|
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
@@ -36,25 +40,32 @@ import redis.clients.jedis.Jedis;
|
|
|
import redis.clients.jedis.JedisPool;
|
|
|
|
|
|
public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
|
|
|
-
|
|
|
+
|
|
|
protected String ip;
|
|
|
-
|
|
|
+
|
|
|
protected Integer port;
|
|
|
|
|
|
protected String dataPath;
|
|
|
|
|
|
- protected String prefixName ;
|
|
|
-
|
|
|
+ protected String prefixName;
|
|
|
+
|
|
|
private static final Logger logger = LoggerFactory.getLogger(AcceptanceInboundHandlerAdapter.class);
|
|
|
private static final Long TEN_M = 10485760l;
|
|
|
-
|
|
|
- private static String PREFIX_LINK = "s.";
|
|
|
- private static String PREFIX_DEVICE = "d.";
|
|
|
+
|
|
|
+ public static String PREFIX_LINK = "s.";
|
|
|
+ public static String PREFIX_LINK_BACK = "s.b.";
|
|
|
+ public static String PREFIX_DEVICE = "d.";
|
|
|
+
|
|
|
+ public static final Integer REDIS_INDEX_LINK = 15;
|
|
|
|
|
|
public static Map<String, String> channelMap = new HashMap<String, String>();
|
|
|
public static Map<String, Channel> manageChannelMap = new HashMap<>();
|
|
|
public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
|
|
|
public static Map<String, String> commandCopy = new HashMap<>();
|
|
|
+
|
|
|
+ public static Map<String, Channel> socketyChannelMap = new HashMap<>();
|
|
|
+ public static Map<Channel, String> channelDeviceMap = new HashMap<>();
|
|
|
+
|
|
|
private static ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
|
|
|
|
|
|
private static ExecutorService kafkaSendThreadPool = Executors.newSingleThreadExecutor();
|
|
@@ -65,57 +76,56 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
|
|
|
|
|
|
private String topic;
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
- ByteBuf byteBuf = (ByteBuf)msg;
|
|
|
- String str = byteBufferToString(byteBuf.nioBuffer());
|
|
|
- logger.info("上传数据:{}", str);
|
|
|
-
|
|
|
- try {
|
|
|
- reply(ctx, str);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- } finally {
|
|
|
- byteBuf.release();
|
|
|
- }
|
|
|
- }
|
|
|
- abstract public void reply(ChannelHandlerContext ctx, String msg) throws Exception;
|
|
|
-
|
|
|
- protected void sendMsg(String msg,String deviceId) {
|
|
|
- kafkaSendThreadPool.execute(()->sendKakfaMsg(msg,deviceId));
|
|
|
- singleThreadPool.execute(()->dataStorage(msg));
|
|
|
- }
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
+ ByteBuf byteBuf = (ByteBuf) msg;
|
|
|
+ String str = byteBufferToString(byteBuf.nioBuffer());
|
|
|
+ logger.info("接入数据:{}", str);
|
|
|
|
|
|
+ try {
|
|
|
+ reply(ctx, str);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ } finally {
|
|
|
+ byteBuf.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ abstract public void reply(ChannelHandlerContext ctx, String msg) throws Exception;
|
|
|
+
|
|
|
|
|
|
- public void manageLink(Channel channel, String deviceId) {
|
|
|
- String channelId = UUID.randomUUID().toString();
|
|
|
- manageChannelMap.put(channelId, channel);
|
|
|
- channelMap.put(channelId, deviceId);
|
|
|
- channelMapOfChannelKey.put(deviceId, channel);
|
|
|
- logger.info("链接管理,链接id [{}]", channelId);
|
|
|
+ protected void receiveMsg(String msg, String deviceId, Channel channel) {
|
|
|
+ Channel channelInMap = channelMapOfChannelKey.get(deviceId);
|
|
|
+ if (channelInMap == null) {
|
|
|
+ manageChannel(channel, deviceId);
|
|
|
+ }
|
|
|
+ kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
|
|
|
+ singleThreadPool.execute(() -> dataStorage(msg));
|
|
|
+ }
|
|
|
|
|
|
- ConnectMsg cMsg = new ConnectMsg(this.ip, channelId);
|
|
|
+ private void manageChannel(Channel channel, String deviceId) {
|
|
|
+ String socketkey = UUID.randomUUID().toString();
|
|
|
+ socketyChannelMap.put(socketkey, channel);
|
|
|
+ channelDeviceMap.put(channel, deviceId);
|
|
|
+ String addressStr = ConnectMsg.ipToLong(this.ip);
|
|
|
+ ConnectMsg cMsg = new ConnectMsg(this.ip, socketkey);
|
|
|
try (Jedis jedis = jedisPool.getResource()) {
|
|
|
- jedis.select(15);
|
|
|
- String insertKey = PREFIX_LINK + ip;
|
|
|
+ jedis.select(REDIS_INDEX_LINK);
|
|
|
+ String insertKey = PREFIX_LINK + addressStr;
|
|
|
String selectKey = PREFIX_DEVICE + deviceId;
|
|
|
-
|
|
|
- jedis.set(insertKey, deviceId);
|
|
|
+ String insertBackupKey = PREFIX_LINK_BACK + addressStr;
|
|
|
+ jedis.sadd(insertKey, socketkey);
|
|
|
+ jedis.sadd(insertBackupKey, deviceId);
|
|
|
jedis.set(selectKey, StringUtil.convert2String(cMsg));
|
|
|
} catch (Exception e) {
|
|
|
- logger.error(e.getLocalizedMessage());
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
protected void sendKakfaMsg(String msg, String deviceId) {
|
|
|
Integer key = new Integer(deviceId.hashCode());
|
|
|
ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
|
|
|
-
|
|
|
+
|
|
|
// 发送成功回调
|
|
|
SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
|
|
|
@Override
|
|
@@ -162,24 +172,37 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // public static void writeDevice2File(File file, String deviceStr) {
|
|
|
+ // BufferedWriter bufferedWriter = null;
|
|
|
+ // try {
|
|
|
+ // bufferedWriter = new BufferedWriter(new FileWriter(file, true));
|
|
|
+ // bufferedWriter.write(deviceStr);
|
|
|
+ // bufferedWriter.newLine();
|
|
|
+ // bufferedWriter.flush();
|
|
|
+ // } catch (IOException e) {
|
|
|
+ // logger.error(e.getMessage());
|
|
|
+ // } finally {
|
|
|
+ // if (bufferedWriter != null) {
|
|
|
+ // try {
|
|
|
+ // bufferedWriter.close();
|
|
|
+ // } catch (IOException e) {
|
|
|
+ // logger.error("close file[{}] failed: {}", file.getName(), e.getMessage());
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+
|
|
|
public static void writeDevice2File(File file, String deviceStr) {
|
|
|
- BufferedWriter bufferedWriter = null;
|
|
|
+ Integer length = deviceStr.getBytes().length;
|
|
|
try {
|
|
|
- bufferedWriter = new BufferedWriter(new FileWriter(file, true));
|
|
|
- bufferedWriter.write(deviceStr);
|
|
|
- bufferedWriter.newLine();
|
|
|
- bufferedWriter.flush();
|
|
|
+ OutputStream outputStream = new FileOutputStream(file, true);
|
|
|
+ byte[] outPutBytes = ArrayUtils.addAll(NumUtil.int2bytes(length), deviceStr.getBytes());
|
|
|
+ outputStream.write(outPutBytes);
|
|
|
+ outputStream.flush();
|
|
|
+ outputStream.close();
|
|
|
} catch (IOException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
- } finally {
|
|
|
- if (bufferedWriter != null) {
|
|
|
- try {
|
|
|
- bufferedWriter.close();
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error("close file[{}] failed: {}", file.getName(), e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -194,6 +217,39 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
|
|
|
saveChannel(ctx);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ if (!channel.isActive()) {
|
|
|
+ String deviceId = channelDeviceMap.get(channel);
|
|
|
+ if (deviceId != null) {
|
|
|
+ channelDeviceMap.remove(channel);
|
|
|
+ // deleteLinkFromRedis(deviceId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ super.channelInactive(ctx);
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void deleteLinkFromRedis(String deviceId) {
|
|
|
+ String deleteKey = PREFIX_DEVICE + deviceId;
|
|
|
+ try (Jedis jedis = jedisPool.getResource()) {
|
|
|
+ jedis.select(REDIS_INDEX_LINK);
|
|
|
+ String connectMsg = jedis.get(deleteKey);
|
|
|
+ if (connectMsg != null) {
|
|
|
+ ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
|
|
|
+ String socketId = cmsg.getSocketId();
|
|
|
+ socketyChannelMap.remove(socketId);
|
|
|
+ String socketQueryKey = PREFIX_LINK + this.ip;
|
|
|
+ jedis.srem(socketQueryKey, socketId);
|
|
|
+ jedis.del(deleteKey);
|
|
|
+ logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void saveChannel(ChannelHandlerContext ctx) {
|
|
|
}
|
|
|
|
|
@@ -249,21 +305,27 @@ public abstract class AcceptanceInboundHandlerAdapter extends ChannelInboundHand
|
|
|
public void setTopic(String topic) {
|
|
|
this.topic = topic;
|
|
|
}
|
|
|
+
|
|
|
public String getIp() {
|
|
|
return ip;
|
|
|
}
|
|
|
+
|
|
|
public void setIp(String ip) {
|
|
|
this.ip = ip;
|
|
|
}
|
|
|
+
|
|
|
public Integer getPort() {
|
|
|
return port;
|
|
|
}
|
|
|
+
|
|
|
public void setPort(Integer port) {
|
|
|
this.port = port;
|
|
|
}
|
|
|
+
|
|
|
public String getPrefixName() {
|
|
|
return prefixName;
|
|
|
}
|
|
|
+
|
|
|
public void setPrefixName(String prefixName) {
|
|
|
this.prefixName = prefixName;
|
|
|
}
|