123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- package com.tidecloud.datacceptance.service.impl;
- import java.io.DataOutputStream;
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetDecoder;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import com.tidecloud.dataacceptance.entity.Advice;
- import com.tidecloud.dataacceptance.entity.Device;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- /**
- * Created by vinson on 2017/9/7.
- */
- @Component
- public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
-
- private String dataPath = "/home/service/collector_watch/rawdata/";
- // private String dataPath = "D:\\work\\rawdata1\\";
- private static final Logger logger = LoggerFactory.getLogger(DiscardServerHandler.class);
- private static final Long TEN_M = 10485760l;
- private static final String prefixName = "watch";
- private Map<String, Channel> channelMap = new HashMap<String, Channel>();
- private List<Channel> channelList = new ArrayList<Channel>();
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf)msg;
- String str = byteBufferToString(byteBuf.nioBuffer());
- try {
- reply(ctx, str);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- private void reply(ChannelHandlerContext ctx, String msg) throws Exception {
- logger.info("设备上传数据:" + msg);
- Advice advice = getAdevice(msg);
- String deviceId = advice.getDeviceId();
- String adviceType = advice.getAdviceType();
- Channel channel = ctx.channel();
- Channel channelInMap = channelMap.get(deviceId);
- if (channelInMap == null) {
- channelMap.put(deviceId, channel);
- }
- switch (adviceType) {
-
- case "UD":
- Device deviceUD = getDevice(msg);
- deviceUD.setDeviceId(deviceId);
- dataStorage(deviceUD);
- logger.info("正常存储设备信息:" + deviceUD.toString());
- break;
- case "UD2":
- Device deviceUD2 = getDevice(msg);
- deviceUD2.setDeviceId(deviceId);
- dataStorage(deviceUD2);
- logger.info("正常存储设备信息:" + getDevice(msg).toString());
- break;
- case "LK":
- normalReply(advice);
- break;
- default:
- break;
- }
- }
- private void normalReply(Advice advice) {
- String facotry = advice.getFacotry();
- String adviceType = advice.getAdviceType();
- String deviceId = advice.getDeviceId();
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("[");
- replyCommand.append(facotry).append("*");
- replyCommand.append(deviceId).append("*");
- replyCommand.append("0002").append("*");
- replyCommand.append(adviceType);
- replyCommand.append("]");
- String replyCommandStr = replyCommand.toString();
- logger.info("Normal reply :" + replyCommandStr);
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- Channel channel = channelMap.get(deviceId);
- channel.write(buffer);
- }
- private Advice getAdevice(Object msg) {
- Advice advice = new Advice();
- try {
- String message = String.valueOf(msg); // "【Receive from
- // 223.104.255.118
- // :61922】:[3G*3918197044*000D*LK,12642,0,93]";
- int startIndex = message.indexOf("[");
- int endIndex = message.indexOf("]");
- String data = message.substring(startIndex + 1, endIndex); // [3G*3918197044*000D*LK,12642,0,93]
- String[] bodys = data.split(",");
- String headers = bodys[0];
- String[] headersBodys = headers.split("\\*");
- advice.setFacotry(headersBodys[0]);
- advice.setDeviceId(headersBodys[1]);
- advice.setAdvicelength(headersBodys[2]);
- advice.setAdviceType(headersBodys[3]);
- logger.info("设备上传头信息:" + advice.toString());
- return advice;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /* 数据[手表具体数据] [对应位数] [数据内容]
- * 3G*3918197044*0107*UD [0] [厂商*设备 ID*内容长度*内容类型]
- 200917 [1] 日期
- 054140 [2] 时间
- V [3] gps定位是否有效[A]有效,[V]无效
- 31.171093 [4] 纬度
- N [5] 纬度表示
- 121.4215967 [6] 经度
- E [7] 经度表示
- 0.00 [8] 速度
- 0.0 [9] 方向
- 0.0 [10] 海拔
- 0 [11] 卫星个数
- 42,50 [12] gsm信号强度
- 14420 [13] 电量
- 0 [14] 计步数
- 00000010 [15] 终端状态
- 7,255 [16]
- 460 [17]
- 0 [18]
- ... ...
- */
- private Device getDevice(String msg) throws Exception {
- int startIndex = msg.indexOf("[");
- int endIndex = msg.indexOf("]");
- String data = msg.substring(startIndex + 1, endIndex);
- String[] bodys = data.split(",");
- Device device = new Device();
- String gpsState = bodys[3];
- logger.info("正在解析device,gpsState:" + gpsState);
- /*if (!"A".equals(gpsState)) {
- logger.info("gps定位为:" + gpsState + "无效");
- return null;
- }*/
- String date = bodys[1];
- String time = bodys[2];
- Date timestamp = new SimpleDateFormat("ddMMyyHHmmss").parse(date + time);
- device.setTimestamp(timestamp);
- device.setLat(getDouble(bodys[4]));
- device.setLng(getDouble(bodys[6]));
- device.setSpeed(getDouble(bodys[8]));
- device.setElectric(getDouble(bodys[13]));
- device.setStep(getInteger(bodys[14]));
- device.setItemState(bodys[16]);
- // getDouble()
- logger.info("设备上传具体监控项信息:" + device.toString());
- return device;
- }
-
- private void dataStorage(Device device) {
- String deviceStr = Device.buildDeviceStr(device);
- File path = new File(dataPath);
- File[] listFiles = path.listFiles();
- boolean isTouch = true;
- for (File sonFile : listFiles) {
- long len = sonFile.length();
- if (len < TEN_M) {
- // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
- // File file = new File(fileName);
- writeDevice2File(sonFile, deviceStr);
- logger.info("正在写入数据: " + deviceStr);
- isTouch = false;
- }
- }
- if (isTouch) {
- String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
- File file = new File(fileName);
- writeDevice2File(file, deviceStr);
- logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
- }
- }
-
- public void writeDevice2File(File file, String deviceStr){
- Integer length = deviceStr.getBytes().length;
- try {
- OutputStream outputStream = new FileOutputStream(file, true);
- outputStream.write(int2bytes(length));;
- outputStream.write(deviceStr.getBytes());
- outputStream.flush();
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- @Override
- public void channelActive(final ChannelHandlerContext ctx) throws Exception {
- saveChannel(ctx);
- }
- private void saveChannel(ChannelHandlerContext ctx) {
- // 注册
- // 认证
- // 保存channel
- Channel channel = ctx.channel();
- if (!channelList.contains(channel)) {
- channelList.add(channel);
- }
- }
- /**
- * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
- */
- public static void main(String[] args) {
- for (int i = 0; i < 100; i++) {
- Device device = new Device();
- device.setDeviceId("3918197044");
- device.setElectric(11.2d);
- device.setItemState("125");
- device.setLat(24.4441);
- device.setLng(114.223);
- device.setSpeed(21.2);
- device.setStep(12);
- device.setTerminalState(12);
- device.setTimestamp(new Date());
- new DiscardServerHandler().dataStorage(device);
- }
- }
- public static String byteBufferToString(ByteBuffer buffer) {
- CharBuffer charBuffer = null;
- try {
- Charset charset = Charset.forName("UTF-8");
- CharsetDecoder decoder = charset.newDecoder();
- charBuffer = decoder.decode(buffer);
- buffer.flip();
- return charBuffer.toString();
- } catch (Exception ex) {
- ex.printStackTrace();
- return null;
- }
- }
- private Integer getInteger(String str) {
- if (str == null) {
- return null;
- }
- return Integer.valueOf(str);
- }
- private Double getDouble(String str) {
- if (str == null) {
- return null;
- }
- return Double.valueOf(str);
- }
-
- static byte[] int2bytes(int num)
- {
- byte[] b=new byte[4];
- //int mask=0xff;
- for(int i=0;i<4;i++){
- b[3-i]=(byte)(num>>>(24-i*8));
- }
- return b;
- }
- }
|