|
@@ -1,370 +1,395 @@
|
|
|
-package com.tidecloud.dataacceptance.service.impl;
|
|
|
-
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileOutputStream;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.OutputStream;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-import java.nio.CharBuffer;
|
|
|
-import java.nio.charset.Charset;
|
|
|
-import java.nio.charset.CharsetDecoder;
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.UUID;
|
|
|
-
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import com.smartsanitation.common.util.StringUtil;
|
|
|
-import com.tidecloud.dataacceptance.entity.Advice;
|
|
|
-import com.tidecloud.dataacceptance.entity.ConnectMsg;
|
|
|
-import com.tidecloud.dataacceptance.entity.Device;
|
|
|
-import com.tidecloud.dataacceptance.entity.HrtStart;
|
|
|
-
|
|
|
-import io.netty.buffer.ByteBuf;
|
|
|
-import io.netty.buffer.Unpooled;
|
|
|
-import io.netty.channel.Channel;
|
|
|
-import io.netty.channel.ChannelFuture;
|
|
|
-import io.netty.channel.ChannelHandler.Sharable;
|
|
|
-import io.netty.util.concurrent.Future;
|
|
|
-import io.netty.util.concurrent.GenericFutureListener;
|
|
|
-import io.netty.channel.ChannelHandlerContext;
|
|
|
-import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
-import redis.clients.jedis.Jedis;
|
|
|
-import redis.clients.jedis.JedisPool;
|
|
|
-
|
|
|
-/**
|
|
|
- * Created by vinson on 2017/9/7.
|
|
|
- */
|
|
|
-@Sharable
|
|
|
-@Component
|
|
|
-public class WatchServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
-
|
|
|
- private String dataPath = "/home/service/collector_watch/rawdata/";
|
|
|
- // private String dataPath = "D:\\work\\rawdata1\\";
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(WatchServerHandler.class);
|
|
|
- private static final Long TEN_M = 10485760l;
|
|
|
- private static final String prefixName = "watch";
|
|
|
- private static String PREFIX_LINK = "s.";
|
|
|
- private static String PREFIX_DEVICE = "d.";
|
|
|
-
|
|
|
- public static Map<String, String> channelMap = new HashMap<String, String>();
|
|
|
- public static Map<String, Channel> manageChannelMap = new HashMap<>();
|
|
|
- public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
|
|
|
- public static Map<String, String> commandCopy = new HashMap<>();
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private JedisPool jedisPool;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
- ByteBuf byteBuf = (ByteBuf)msg;
|
|
|
- String str = byteBufferToString(byteBuf.nioBuffer());
|
|
|
- logger.info("上传数据:", str);
|
|
|
- try {
|
|
|
- reply(ctx, str);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void reply(ChannelHandlerContext ctx, String msg) throws Exception {
|
|
|
- logger.info("设备上传数据:" + msg);
|
|
|
- Advice advice = getAdevice(msg);
|
|
|
- String deviceId = advice.getDeviceId();
|
|
|
- String adviceType = advice.getAdviceType();
|
|
|
- Channel channel = ctx.channel();
|
|
|
- Channel channelInMap = channelMapOfChannelKey.get(deviceId);
|
|
|
- if (channelInMap == null) {
|
|
|
- manageLink(channel, deviceId);
|
|
|
- }
|
|
|
- switch (adviceType) {
|
|
|
-
|
|
|
- case "UD":
|
|
|
- Device deviceUD = getDevice(msg);
|
|
|
- deviceUD.setDeviceId(deviceId);
|
|
|
- dataStorage(deviceUD);
|
|
|
- logger.info("正常存储设备信息:" + deviceUD.toString());
|
|
|
- break;
|
|
|
- case "UD2":
|
|
|
- Device deviceUD2 = getDevice(msg);
|
|
|
- deviceUD2.setDeviceId(deviceId);
|
|
|
- dataStorage(deviceUD2);
|
|
|
- logger.info("正常存储设备信息:" + getDevice(msg).toString());
|
|
|
- break;
|
|
|
- case "LK":
|
|
|
- normalReply(advice, channel);
|
|
|
- break;
|
|
|
- case "UPLOAD":
|
|
|
- logger.info("device [{}] setting copy time success [{}]", deviceId, new Date());
|
|
|
- break;
|
|
|
- case "heart":
|
|
|
- dataStorageHartstart(deviceId, msg);
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private void dataStorageHartstart(String deviceId, String msg) {
|
|
|
- int startIndex = msg.indexOf("[");
|
|
|
- int endIndex = msg.indexOf("]");
|
|
|
- String data = msg.substring(startIndex + 1, endIndex);
|
|
|
- String[] bodys = data.split(",");
|
|
|
-
|
|
|
- HrtStart hrtStart = new HrtStart();
|
|
|
- hrtStart.setDate(new Date());
|
|
|
- hrtStart.setDeviceId(deviceId);
|
|
|
- hrtStart.setHrtCount(getInteger(bodys[1]));
|
|
|
- String deviceStr = HrtStart.buildHrtStart(hrtStart);
|
|
|
-
|
|
|
- File path = new File(dataPath);
|
|
|
- File[] listFiles = path.listFiles();
|
|
|
- boolean isTouch = true;
|
|
|
- for (File sonFile : listFiles) {
|
|
|
- long len = sonFile.length();
|
|
|
- if (len < TEN_M) {
|
|
|
- // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- // File file = new File(fileName);
|
|
|
- writeDevice2File(sonFile, deviceStr);
|
|
|
- logger.info("正在写入数据: " + deviceStr);
|
|
|
- isTouch = false;
|
|
|
- }
|
|
|
- }
|
|
|
- if (isTouch) {
|
|
|
- String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- File file = new File(fileName);
|
|
|
- writeDevice2File(file, deviceStr);
|
|
|
- logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void manageLink(Channel channel, String deviceId) {
|
|
|
- String channelId = UUID.randomUUID().toString();
|
|
|
- manageChannelMap.put(channelId, channel);
|
|
|
- channelMap.put(channelId, deviceId);
|
|
|
- channelMapOfChannelKey.put(deviceId, channel);
|
|
|
- logger.info("链接管理,链接id [{}]", channelId);
|
|
|
-
|
|
|
- ConnectMsg cMsg = new ConnectMsg("10.27.118.76", channelId);
|
|
|
- try (Jedis jedis = jedisPool.getResource()) {
|
|
|
- jedis.select(15);
|
|
|
- String insertKey = PREFIX_LINK + "10.27.118.76";
|
|
|
- String selectKey = PREFIX_DEVICE + deviceId;
|
|
|
-
|
|
|
- jedis.set(insertKey, deviceId);
|
|
|
- jedis.set(selectKey, StringUtil.convert2String(cMsg));
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getLocalizedMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void normalReply(Advice advice, Channel channel) {
|
|
|
- String facotry = advice.getFacotry();
|
|
|
- String adviceType = advice.getAdviceType();
|
|
|
- String deviceId = advice.getDeviceId();
|
|
|
- StringBuilder replyCommand = new StringBuilder();
|
|
|
- replyCommand.append("[");
|
|
|
- replyCommand.append("3g").append("*");
|
|
|
- replyCommand.append(deviceId).append("*");
|
|
|
- replyCommand.append("0002").append("*");
|
|
|
- replyCommand.append(adviceType);
|
|
|
- replyCommand.append("]");
|
|
|
- String replyCommandStr = replyCommand.toString();
|
|
|
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
- buffer.writeBytes(replyCommandStr.getBytes());
|
|
|
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
|
|
|
- channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void operationComplete(Future<? super Void> future) throws Exception {
|
|
|
- logger.info("Normal reply :" + replyCommandStr);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- private Advice getAdevice(Object msg) {
|
|
|
- Advice advice = new Advice();
|
|
|
- try {
|
|
|
- String message = String.valueOf(msg); // "【Receive from
|
|
|
- // 223.104.255.118
|
|
|
- // :61922】:[3G*3918197044*000D*LK,12642,0,93]";
|
|
|
- int startIndex = message.indexOf("[");
|
|
|
- int endIndex = message.indexOf("]");
|
|
|
- String data = message.substring(startIndex + 1, endIndex); // [3G*3918197044*000D*LK,12642,0,93]
|
|
|
- String[] bodys = data.split(",");
|
|
|
- String headers = bodys[0];
|
|
|
- String[] headersBodys = headers.split("\\*");
|
|
|
- advice.setFacotry(headersBodys[0]);
|
|
|
- advice.setDeviceId(headersBodys[1]);
|
|
|
- advice.setAdvicelength(headersBodys[2]);
|
|
|
- advice.setAdviceType(headersBodys[3]);
|
|
|
- logger.info("设备上传头信息:" + advice.toString());
|
|
|
- return advice;
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- /* 数据[手表具体数据] [对应位数] [数据内容]
|
|
|
- * 3G*3918197044*0107*UD [0] [厂商*设备 ID*内容长度*内容类型]
|
|
|
- 200917 [1] 日期
|
|
|
- 054140 [2] 时间
|
|
|
- V [3] gps定位是否有效[A]有效,[V]无效
|
|
|
- 31.171093 [4] 纬度
|
|
|
- N [5] 纬度表示
|
|
|
- 121.4215967 [6] 经度
|
|
|
- E [7] 经度表示
|
|
|
- 0.00 [8] 速度
|
|
|
- 0.0 [9] 方向
|
|
|
- 0.0 [10] 海拔
|
|
|
- 0 [11] 卫星个数
|
|
|
- 42,50 [12] gsm信号强度
|
|
|
- 14420 [13] 电量
|
|
|
- 0 [14] 计步数
|
|
|
- 00000010 [15] 终端状态
|
|
|
- 7,255 [16]
|
|
|
- 460 [17]
|
|
|
- 0 [18]
|
|
|
- ... ...
|
|
|
- */
|
|
|
- private Device getDevice(String msg) throws Exception {
|
|
|
- int startIndex = msg.indexOf("[");
|
|
|
- int endIndex = msg.indexOf("]");
|
|
|
- String data = msg.substring(startIndex + 1, endIndex);
|
|
|
- String[] bodys = data.split(",");
|
|
|
-
|
|
|
- Device device = new Device();
|
|
|
- String gpsState = bodys[3];
|
|
|
- logger.info("正在解析device,gpsState:" + gpsState);
|
|
|
- /*if (!"A".equals(gpsState)) {
|
|
|
- logger.info("gps定位为:" + gpsState + "无效");
|
|
|
- return null;
|
|
|
- }*/
|
|
|
- String date = bodys[1];
|
|
|
- String time = bodys[2];
|
|
|
- Date timestamp = new SimpleDateFormat("ddMMyyHHmmss").parse(date + time);
|
|
|
- device.setTimestamp(timestamp);
|
|
|
- device.setLat(getDouble(bodys[4]));
|
|
|
- device.setLng(getDouble(bodys[6]));
|
|
|
- device.setSpeed(getDouble(bodys[8]));
|
|
|
- device.setElectric(getDouble(bodys[13]));
|
|
|
- device.setStep(getInteger(bodys[14]));
|
|
|
- device.setItemState(bodys[16]);
|
|
|
- // getDouble()
|
|
|
- logger.info("设备上传具体监控项信息:" + device.toString());
|
|
|
- return device;
|
|
|
- }
|
|
|
-
|
|
|
- private void dataStorage(Device device) {
|
|
|
- String deviceStr = Device.buildDeviceStr(device);
|
|
|
- File path = new File(dataPath);
|
|
|
- File[] listFiles = path.listFiles();
|
|
|
- boolean isTouch = true;
|
|
|
- for (File sonFile : listFiles) {
|
|
|
- long len = sonFile.length();
|
|
|
- if (len < TEN_M) {
|
|
|
- // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- // File file = new File(fileName);
|
|
|
- writeDevice2File(sonFile, deviceStr);
|
|
|
- logger.info("正在写入数据: " + deviceStr);
|
|
|
- isTouch = false;
|
|
|
- }
|
|
|
- }
|
|
|
- if (isTouch) {
|
|
|
- String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
- File file = new File(fileName);
|
|
|
- writeDevice2File(file, deviceStr);
|
|
|
- logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void writeDevice2File(File file, String deviceStr){
|
|
|
- Integer length = deviceStr.getBytes().length;
|
|
|
- try {
|
|
|
- OutputStream outputStream = new FileOutputStream(file, true);
|
|
|
- outputStream.write(int2bytes(length));;
|
|
|
- outputStream.write(deviceStr.getBytes());
|
|
|
- outputStream.flush();
|
|
|
- outputStream.close();
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
- cause.printStackTrace();
|
|
|
- ctx.close();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
|
|
- saveChannel(ctx);
|
|
|
- }
|
|
|
-
|
|
|
- private void saveChannel(ChannelHandlerContext ctx) {
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
|
|
|
- */
|
|
|
- public static void main(String[] args) {
|
|
|
- for (int i = 0; i < 100; i++) {
|
|
|
- Device device = new Device();
|
|
|
- device.setDeviceId("3918197044");
|
|
|
- device.setElectric(11.2d);
|
|
|
- device.setItemState("125");
|
|
|
- device.setLat(24.4441);
|
|
|
- device.setLng(114.223);
|
|
|
- device.setSpeed(21.2);
|
|
|
- device.setStep(12);
|
|
|
- device.setTerminalState(12);
|
|
|
- device.setTimestamp(new Date());
|
|
|
- new WatchServerHandler().dataStorage(device);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public static String byteBufferToString(ByteBuffer buffer) {
|
|
|
- CharBuffer charBuffer = null;
|
|
|
- try {
|
|
|
- Charset charset = Charset.forName("UTF-8");
|
|
|
- CharsetDecoder decoder = charset.newDecoder();
|
|
|
- charBuffer = decoder.decode(buffer);
|
|
|
- buffer.flip();
|
|
|
- return charBuffer.toString();
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private Integer getInteger(String str) {
|
|
|
- if (str == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return Integer.valueOf(str);
|
|
|
- }
|
|
|
-
|
|
|
- private Double getDouble(String str) {
|
|
|
- if (str == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return Double.valueOf(str);
|
|
|
- }
|
|
|
-
|
|
|
- static byte[] int2bytes(int num)
|
|
|
- {
|
|
|
- byte[] b=new byte[4];
|
|
|
- //int mask=0xff;
|
|
|
- for(int i=0;i<4;i++){
|
|
|
- b[3-i]=(byte)(num>>>(24-i*8));
|
|
|
- }
|
|
|
- return b;
|
|
|
- }
|
|
|
-}
|
|
|
+package com.tidecloud.dataacceptance.service.impl;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.CharBuffer;
|
|
|
+import java.nio.charset.Charset;
|
|
|
+import java.nio.charset.CharsetDecoder;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+import org.apache.commons.lang.ArrayUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import com.smartsanitation.common.util.StringUtil;
|
|
|
+import com.tidecloud.dataacceptance.entity.Advice;
|
|
|
+import com.tidecloud.dataacceptance.entity.ConnectMsg;
|
|
|
+import com.tidecloud.dataacceptance.entity.Device;
|
|
|
+import com.tidecloud.dataacceptance.entity.HrtStart;
|
|
|
+
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelHandler.Sharable;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
+import io.netty.util.concurrent.Future;
|
|
|
+import io.netty.util.concurrent.GenericFutureListener;
|
|
|
+import redis.clients.jedis.Jedis;
|
|
|
+import redis.clients.jedis.JedisPool;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Created by vinson on 2017/9/7.
|
|
|
+ */
|
|
|
+@Sharable
|
|
|
+@Component
|
|
|
+public class WatchServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
+
|
|
|
+ private String dataPath = "/home/service/collector_feidong/rawdata/";
|
|
|
+ // private String dataPath = "D:\\work\\rawdata1\\";
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(WatchServerHandler.class);
|
|
|
+ private static final Long TEN_M = 10485760l;
|
|
|
+ private static final String prefixName = "feidong";
|
|
|
+ private static String PREFIX_LINK = "s.";
|
|
|
+ private static String PREFIX_DEVICE = "d.";
|
|
|
+
|
|
|
+ public static Map<String, String> channelMap = new HashMap<String, String>();
|
|
|
+ public static Map<String, Channel> manageChannelMap = new HashMap<>();
|
|
|
+ public static Map<String, Channel> channelMapOfChannelKey = new HashMap<>();
|
|
|
+ public static Map<String, String> commandCopy = new HashMap<>();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private JedisPool jedisPool;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
+ ByteBuf byteBuf = (ByteBuf)msg;
|
|
|
+ String str = byteBufferToString(byteBuf.nioBuffer());
|
|
|
+ logger.info("上传数据:{}", str);
|
|
|
+ dataStorage(str);
|
|
|
+
|
|
|
+ //try {
|
|
|
+ // reply(ctx, str);
|
|
|
+ //} catch (Exception e) {
|
|
|
+ // logger.error(e.getMessage());
|
|
|
+ //}
|
|
|
+ }
|
|
|
+
|
|
|
+ private void reply(ChannelHandlerContext ctx, String msg) throws Exception {
|
|
|
+ logger.info("设备上传数据:" + msg);
|
|
|
+ Advice advice = getAdevice(msg);
|
|
|
+ String deviceId = advice.getDeviceId();
|
|
|
+ String adviceType = advice.getAdviceType();
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ Channel channelInMap = channelMapOfChannelKey.get(deviceId);
|
|
|
+ if (channelInMap == null) {
|
|
|
+ manageLink(channel, deviceId);
|
|
|
+ }
|
|
|
+ switch (adviceType) {
|
|
|
+
|
|
|
+ case "UD":
|
|
|
+ Device deviceUD = getDevice(msg);
|
|
|
+ deviceUD.setDeviceId(deviceId);
|
|
|
+ dataStorage(deviceUD);
|
|
|
+ logger.info("正常存储设备信息:" + deviceUD.toString());
|
|
|
+ break;
|
|
|
+ case "UD2":
|
|
|
+ Device deviceUD2 = getDevice(msg);
|
|
|
+ deviceUD2.setDeviceId(deviceId);
|
|
|
+ dataStorage(deviceUD2);
|
|
|
+ logger.info("正常存储设备信息:" + getDevice(msg).toString());
|
|
|
+ break;
|
|
|
+ case "LK":
|
|
|
+ normalReply(advice, channel);
|
|
|
+ break;
|
|
|
+ case "UPLOAD":
|
|
|
+ logger.info("device [{}] setting copy time success [{}]", deviceId, new Date());
|
|
|
+ break;
|
|
|
+ case "heart":
|
|
|
+ dataStorageHartstart(deviceId, msg);
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void dataStorageHartstart(String deviceId, String msg) {
|
|
|
+ int startIndex = msg.indexOf("[");
|
|
|
+ int endIndex = msg.indexOf("]");
|
|
|
+ String data = msg.substring(startIndex + 1, endIndex);
|
|
|
+ String[] bodys = data.split(",");
|
|
|
+
|
|
|
+ HrtStart hrtStart = new HrtStart();
|
|
|
+ hrtStart.setDate(new Date());
|
|
|
+ hrtStart.setDeviceId(deviceId);
|
|
|
+ hrtStart.setHrtCount(getInteger(bodys[1]));
|
|
|
+ String deviceStr = HrtStart.buildHrtStart(hrtStart);
|
|
|
+
|
|
|
+ File path = new File(dataPath);
|
|
|
+ File[] listFiles = path.listFiles();
|
|
|
+ boolean isTouch = true;
|
|
|
+ for (File sonFile : listFiles) {
|
|
|
+ long len = sonFile.length();
|
|
|
+ if (len < TEN_M) {
|
|
|
+ // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+ // File file = new File(fileName);
|
|
|
+ writeDevice2File(sonFile, deviceStr);
|
|
|
+ logger.info("正在写入数据: " + deviceStr);
|
|
|
+ isTouch = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isTouch) {
|
|
|
+ String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+ File file = new File(fileName);
|
|
|
+ writeDevice2File(file, deviceStr);
|
|
|
+ logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void manageLink(Channel channel, String deviceId) {
|
|
|
+ String channelId = UUID.randomUUID().toString();
|
|
|
+ manageChannelMap.put(channelId, channel);
|
|
|
+ channelMap.put(channelId, deviceId);
|
|
|
+ channelMapOfChannelKey.put(deviceId, channel);
|
|
|
+ logger.info("链接管理,链接id [{}]", channelId);
|
|
|
+
|
|
|
+ ConnectMsg cMsg = new ConnectMsg("10.27.118.76", channelId);
|
|
|
+ try (Jedis jedis = jedisPool.getResource()) {
|
|
|
+ jedis.select(15);
|
|
|
+ String insertKey = PREFIX_LINK + "10.27.118.76";
|
|
|
+ String selectKey = PREFIX_DEVICE + deviceId;
|
|
|
+
|
|
|
+ jedis.set(insertKey, deviceId);
|
|
|
+ jedis.set(selectKey, StringUtil.convert2String(cMsg));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void normalReply(Advice advice, Channel channel) {
|
|
|
+ String facotry = advice.getFacotry();
|
|
|
+ String adviceType = advice.getAdviceType();
|
|
|
+ String deviceId = advice.getDeviceId();
|
|
|
+ StringBuilder replyCommand = new StringBuilder();
|
|
|
+ replyCommand.append("[");
|
|
|
+ replyCommand.append("3g").append("*");
|
|
|
+ replyCommand.append(deviceId).append("*");
|
|
|
+ replyCommand.append("0002").append("*");
|
|
|
+ replyCommand.append(adviceType);
|
|
|
+ replyCommand.append("]");
|
|
|
+ String replyCommandStr = replyCommand.toString();
|
|
|
+ ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
+ buffer.writeBytes(replyCommandStr.getBytes());
|
|
|
+ ChannelFuture channelFuture = channel.writeAndFlush(buffer);
|
|
|
+ channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void operationComplete(Future<? super Void> future) throws Exception {
|
|
|
+ logger.info("Normal reply :" + replyCommandStr);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private Advice getAdevice(Object msg) {
|
|
|
+ Advice advice = new Advice();
|
|
|
+ try {
|
|
|
+ String message = String.valueOf(msg); // "【Receive from
|
|
|
+ // 223.104.255.118
|
|
|
+ // :61922】:[3G*3918197044*000D*LK,12642,0,93]";
|
|
|
+ int startIndex = message.indexOf("[");
|
|
|
+ int endIndex = message.indexOf("]");
|
|
|
+ String data = message.substring(startIndex + 1, endIndex); // [3G*3918197044*000D*LK,12642,0,93]
|
|
|
+ String[] bodys = data.split(",");
|
|
|
+ String headers = bodys[0];
|
|
|
+ String[] headersBodys = headers.split("\\*");
|
|
|
+ advice.setFacotry(headersBodys[0]);
|
|
|
+ advice.setDeviceId(headersBodys[1]);
|
|
|
+ advice.setAdvicelength(headersBodys[2]);
|
|
|
+ advice.setAdviceType(headersBodys[3]);
|
|
|
+ logger.info("设备上传头信息:" + advice.toString());
|
|
|
+ return advice;
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* 数据[手表具体数据] [对应位数] [数据内容]
|
|
|
+ * 3G*3918197044*0107*UD [0] [厂商*设备 ID*内容长度*内容类型]
|
|
|
+ 200917 [1] 日期
|
|
|
+ 054140 [2] 时间
|
|
|
+ V [3] gps定位是否有效[A]有效,[V]无效
|
|
|
+ 31.171093 [4] 纬度
|
|
|
+ N [5] 纬度表示
|
|
|
+ 121.4215967 [6] 经度
|
|
|
+ E [7] 经度表示
|
|
|
+ 0.00 [8] 速度
|
|
|
+ 0.0 [9] 方向
|
|
|
+ 0.0 [10] 海拔
|
|
|
+ 0 [11] 卫星个数
|
|
|
+ 42,50 [12] gsm信号强度
|
|
|
+ 14420 [13] 电量
|
|
|
+ 0 [14] 计步数
|
|
|
+ 00000010 [15] 终端状态
|
|
|
+ 7,255 [16]
|
|
|
+ 460 [17]
|
|
|
+ 0 [18]
|
|
|
+ ... ...
|
|
|
+ */
|
|
|
+ private Device getDevice(String msg) throws Exception {
|
|
|
+ int startIndex = msg.indexOf("[");
|
|
|
+ int endIndex = msg.indexOf("]");
|
|
|
+ String data = msg.substring(startIndex + 1, endIndex);
|
|
|
+ String[] bodys = data.split(",");
|
|
|
+
|
|
|
+ Device device = new Device();
|
|
|
+ String gpsState = bodys[3];
|
|
|
+ logger.info("正在解析device,gpsState:" + gpsState);
|
|
|
+ /*if (!"A".equals(gpsState)) {
|
|
|
+ logger.info("gps定位为:" + gpsState + "无效");
|
|
|
+ return null;
|
|
|
+ }*/
|
|
|
+ String date = bodys[1];
|
|
|
+ String time = bodys[2];
|
|
|
+ Date timestamp = new SimpleDateFormat("ddMMyyHHmmss").parse(date + time);
|
|
|
+ device.setTimestamp(timestamp);
|
|
|
+ device.setLat(getDouble(bodys[4]));
|
|
|
+ device.setLng(getDouble(bodys[6]));
|
|
|
+ device.setSpeed(getDouble(bodys[8]));
|
|
|
+ device.setElectric(getDouble(bodys[13]));
|
|
|
+ device.setStep(getInteger(bodys[14]));
|
|
|
+ device.setItemState(bodys[16]);
|
|
|
+ // getDouble()
|
|
|
+ logger.info("设备上传具体监控项信息:" + device.toString());
|
|
|
+ return device;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dataStorage(Device device) {
|
|
|
+ String deviceStr = Device.buildDeviceStr(device);
|
|
|
+ dataStorage(deviceStr);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void dataStorage(String deviceStr) {
|
|
|
+ File path = new File(dataPath);
|
|
|
+ File[] listFiles = path.listFiles();
|
|
|
+ boolean isTouch = true;
|
|
|
+ if (listFiles != null) {
|
|
|
+ for (File sonFile : listFiles) {
|
|
|
+ long len = sonFile.length();
|
|
|
+ if (len < TEN_M) {
|
|
|
+ // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+ // File file = new File(fileName);
|
|
|
+ writeDevice2File(sonFile, deviceStr);
|
|
|
+ logger.info("正在写入数据: " + deviceStr);
|
|
|
+ isTouch = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isTouch) {
|
|
|
+ String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
|
|
|
+ File file = new File(fileName);
|
|
|
+ writeDevice2File(file, deviceStr);
|
|
|
+ logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void writeDevice2File(File file, String deviceStr){
|
|
|
+ Integer length = deviceStr.getBytes().length;
|
|
|
+ try {
|
|
|
+ OutputStream outputStream = new FileOutputStream(file, true);
|
|
|
+ byte[] lengthBytes = int2bytes(length);
|
|
|
+ byte[] deviceBytes = deviceStr.getBytes();
|
|
|
+ byte[] dataBytes = ArrayUtils.addAll(lengthBytes, deviceBytes);
|
|
|
+ outputStream.write(dataBytes);
|
|
|
+ outputStream.flush();
|
|
|
+ outputStream.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+// public void writeDevice2File(File file, String deviceStr){
|
|
|
+// Integer length = deviceStr.getBytes().length;
|
|
|
+// try {
|
|
|
+// OutputStream outputStream = new FileOutputStream(file, true);
|
|
|
+// outputStream.write(int2bytes(length));;
|
|
|
+// outputStream.write(deviceStr.getBytes());
|
|
|
+// outputStream.flush();
|
|
|
+// outputStream.close();
|
|
|
+// } catch (IOException e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ cause.printStackTrace();
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
|
|
+ saveChannel(ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveChannel(ChannelHandlerContext ctx) {
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
|
|
|
+ */
|
|
|
+ public static void main(String[] args) {
|
|
|
+ for (int i = 0; i < 100; i++) {
|
|
|
+ Device device = new Device();
|
|
|
+ device.setDeviceId("3918197044");
|
|
|
+ device.setElectric(11.2d);
|
|
|
+ device.setItemState("125");
|
|
|
+ device.setLat(24.4441);
|
|
|
+ device.setLng(114.223);
|
|
|
+ device.setSpeed(21.2);
|
|
|
+ device.setStep(12);
|
|
|
+ device.setTerminalState(12);
|
|
|
+ device.setTimestamp(new Date());
|
|
|
+ new WatchServerHandler().dataStorage(device);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String byteBufferToString(ByteBuffer buffer) {
|
|
|
+ CharBuffer charBuffer = null;
|
|
|
+ try {
|
|
|
+ Charset charset = Charset.forName("UTF-8");
|
|
|
+ CharsetDecoder decoder = charset.newDecoder();
|
|
|
+ charBuffer = decoder.decode(buffer);
|
|
|
+ buffer.flip();
|
|
|
+ return charBuffer.toString();
|
|
|
+ } catch (Exception ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer getInteger(String str) {
|
|
|
+ if (str == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return Integer.valueOf(str);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Double getDouble(String str) {
|
|
|
+ if (str == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return Double.valueOf(str);
|
|
|
+ }
|
|
|
+
|
|
|
+ static byte[] int2bytes(int num)
|
|
|
+ {
|
|
|
+ byte[] b=new byte[4];
|
|
|
+ //int mask=0xff;
|
|
|
+ for(int i=0;i<4;i++){
|
|
|
+ b[3-i]=(byte)(num>>>(24-i*8));
|
|
|
+ }
|
|
|
+ return b;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|