|
@@ -1,28 +1,18 @@
|
|
|
package com.tidecloud.dataacceptance.service;
|
|
|
|
|
|
-import java.io.BufferedWriter;
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileOutputStream;
|
|
|
-import java.io.FileWriter;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.OutputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.CharBuffer;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.nio.charset.CharsetDecoder;
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
-import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
-import java.util.UUID;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import javax.xml.bind.DatatypeConverter;
|
|
|
|
|
|
-import org.apache.commons.lang.ArrayUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.slf4j.MDC;
|
|
@@ -34,14 +24,13 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import org.springframework.util.concurrent.SuccessCallback;
|
|
|
|
|
|
import com.smartsanitation.common.util.StringUtil;
|
|
|
-import com.tidecloud.dataacceptance.common.NumUtil;
|
|
|
import com.tidecloud.dataacceptance.entity.ConnectMsg;
|
|
|
import com.tidecloud.dataacceptance.entity.Session;
|
|
|
import com.tidecloud.dataacceptance.entity.SessionManager;
|
|
|
import com.tidecloud.dataacceptance.service.impl.YiTongGpsServerHandler;
|
|
|
+import com.tidecloud.dataacceptance.util.FileUtils;
|
|
|
|
|
|
import io.netty.bootstrap.ServerBootstrap;
|
|
|
-import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.channel.Channel;
|
|
|
import io.netty.channel.ChannelFuture;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
@@ -162,52 +151,67 @@ public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
|
|
|
MDC.put("strategyName", AcceptanceInboundHandlerAdapter.this.getPrefixName());
|
|
|
// 失败业务逻辑
|
|
|
logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
|
|
|
- singleThreadPool.execute(() -> dataStorage(msg));
|
|
|
+ singleThreadPool.execute(() -> FileUtils.dataStorage(dataByteArray,dataPath,prefixName));
|
|
|
}
|
|
|
};
|
|
|
listenableFuture.addCallback(successCallback, failureCallback);
|
|
|
|
|
|
}
|
|
|
-
|
|
|
- public void dataStorage(String deviceStr) {
|
|
|
- File path = new File(dataPath);
|
|
|
- File[] listFiles = path.listFiles();
|
|
|
- boolean isTouch = true;
|
|
|
- if (listFiles != null) {
|
|
|
- for (File sonFile : listFiles) {
|
|
|
- long len = sonFile.length();
|
|
|
- if (len < TEN_M) {
|
|
|
- // String fileName = dataPath + prefixName + new
|
|
|
- // SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- // File file = new File(fileName);
|
|
|
- writeDevice2File(sonFile, deviceStr);
|
|
|
- logger.info("正在写入数据: " + deviceStr);
|
|
|
- isTouch = false;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (isTouch) {
|
|
|
- String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- File file = new File(fileName);
|
|
|
- writeDevice2File(file, deviceStr);
|
|
|
- logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:"
|
|
|
- + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public static void writeDevice2File(File file, String deviceStr) {
|
|
|
- Integer length = deviceStr.getBytes().length;
|
|
|
- try {
|
|
|
- OutputStream outputStream = new FileOutputStream(file, true);
|
|
|
- byte[] outPutBytes = ArrayUtils.addAll(NumUtil.int2bytes(length), deviceStr.getBytes());
|
|
|
- outputStream.write(outPutBytes);
|
|
|
- outputStream.flush();
|
|
|
- outputStream.close();
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+// public void dataStorage(byte[] deviceByteArr) {
|
|
|
+//
|
|
|
+// File path = new File(dataPath);
|
|
|
+// File[] listFiles = path.listFiles();
|
|
|
+// boolean isTouch = true;
|
|
|
+// if (listFiles != null) {
|
|
|
+// for (File sonFile : listFiles) {
|
|
|
+// long len = sonFile.length();
|
|
|
+// if (len < TEN_M) {
|
|
|
+// // String fileName = dataPath + prefixName + new
|
|
|
+// // SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+// // File file = new File(fileName);
|
|
|
+// writeDevice2File(sonFile, deviceByteArr);
|
|
|
+// logger.info("正在写入数据: " + deviceByteArr);
|
|
|
+// isTouch = false;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if (isTouch) {
|
|
|
+// String fileName = dataPath +File.separator+ prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+// File file = new File(fileName);
|
|
|
+// writeDevice2File(file, deviceByteArr);
|
|
|
+// logger.info("满10M,创建新的文件 正在写入数据:" + deviceByteArr + "timestamp:"
|
|
|
+// + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// public static void writeDevice2File(File file, byte[] deviceByteArr){
|
|
|
+// FileOutputStream fos = null;
|
|
|
+// try {
|
|
|
+// fos = new FileOutputStream(file, true);
|
|
|
+// Integer length = deviceByteArr.length;
|
|
|
+// byte[] lengthBytes = NumUtil.int2bytes(length);
|
|
|
+// byte[] deviceBytes = deviceByteArr;
|
|
|
+// byte[] dataBytes = ArrayUtils.addAll(lengthBytes, deviceBytes);
|
|
|
+//
|
|
|
+// FileChannel fc = fos.getChannel();
|
|
|
+//
|
|
|
+// ByteBuffer bbf = ByteBuffer.wrap(dataBytes);
|
|
|
+// // bbf.flip();
|
|
|
+// fc.write(bbf) ;
|
|
|
+// fc.close();
|
|
|
+// fos.flush();
|
|
|
+// } catch (IOException e) {
|
|
|
+// logger.error(e.getMessage());
|
|
|
+// } finally {
|
|
|
+// if (fos != null) {
|
|
|
+// try {
|
|
|
+// fos.close();
|
|
|
+// } catch (IOException e) {
|
|
|
+// logger.error(e.getMessage());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
@Override
|
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
cause.printStackTrace();
|