DiscardServerHandler.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package com.tidecloud.datacceptance.service.impl;
  2. import java.io.DataOutputStream;
  3. import java.io.File;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.io.OutputStream;
  7. import java.nio.ByteBuffer;
  8. import java.nio.CharBuffer;
  9. import java.nio.charset.Charset;
  10. import java.nio.charset.CharsetDecoder;
  11. import java.text.SimpleDateFormat;
  12. import java.util.ArrayList;
  13. import java.util.Date;
  14. import java.util.HashMap;
  15. import java.util.List;
  16. import java.util.Map;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.annotation.Value;
  20. import org.springframework.stereotype.Component;
  21. import com.tidecloud.dataacceptance.entity.Advice;
  22. import com.tidecloud.dataacceptance.entity.Device;
  23. import io.netty.buffer.ByteBuf;
  24. import io.netty.buffer.Unpooled;
  25. import io.netty.channel.Channel;
  26. import io.netty.channel.ChannelHandlerContext;
  27. import io.netty.channel.ChannelInboundHandlerAdapter;
  28. /**
  29. * Created by vinson on 2017/9/7.
  30. */
  31. @Component
  32. public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
  33. private String dataPath = "/home/service/collector_watch/rawdata/";
  34. // private String dataPath = "D:\\work\\rawdata1\\";
  35. private static final Logger logger = LoggerFactory.getLogger(DiscardServerHandler.class);
  36. private static final Long TEN_M = 10485760l;
  37. private static final String prefixName = "watch";
  38. private Map<String, Channel> channelMap = new HashMap<String, Channel>();
  39. private List<Channel> channelList = new ArrayList<Channel>();
  40. @Override
  41. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  42. ByteBuf byteBuf = (ByteBuf)msg;
  43. String str = byteBufferToString(byteBuf.nioBuffer());
  44. try {
  45. reply(ctx, str);
  46. } catch (Exception e) {
  47. logger.error(e.getMessage());
  48. }
  49. }
  50. private void reply(ChannelHandlerContext ctx, String msg) throws Exception {
  51. logger.info("设备上传数据:" + msg);
  52. Advice advice = getAdevice(msg);
  53. String deviceId = advice.getDeviceId();
  54. String adviceType = advice.getAdviceType();
  55. Channel channel = ctx.channel();
  56. Channel channelInMap = channelMap.get(deviceId);
  57. if (channelInMap == null) {
  58. channelMap.put(deviceId, channel);
  59. }
  60. switch (adviceType) {
  61. case "UD":
  62. Device deviceUD = getDevice(msg);
  63. deviceUD.setDeviceId(deviceId);
  64. dataStorage(deviceUD);
  65. logger.info("正常存储设备信息:" + deviceUD.toString());
  66. break;
  67. case "UD2":
  68. Device deviceUD2 = getDevice(msg);
  69. deviceUD2.setDeviceId(deviceId);
  70. dataStorage(deviceUD2);
  71. logger.info("正常存储设备信息:" + getDevice(msg).toString());
  72. break;
  73. case "LK":
  74. normalReply(advice);
  75. break;
  76. default:
  77. break;
  78. }
  79. }
  80. private void normalReply(Advice advice) {
  81. String facotry = advice.getFacotry();
  82. String adviceType = advice.getAdviceType();
  83. String deviceId = advice.getDeviceId();
  84. StringBuilder replyCommand = new StringBuilder();
  85. replyCommand.append("[");
  86. replyCommand.append(facotry).append("*");
  87. replyCommand.append(deviceId).append("*");
  88. replyCommand.append("0002").append("*");
  89. replyCommand.append(adviceType);
  90. replyCommand.append("]");
  91. String replyCommandStr = replyCommand.toString();
  92. logger.info("Normal reply :" + replyCommandStr);
  93. ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
  94. buffer.writeBytes(replyCommandStr.getBytes());
  95. Channel channel = channelMap.get(deviceId);
  96. channel.write(buffer);
  97. }
  98. private Advice getAdevice(Object msg) {
  99. Advice advice = new Advice();
  100. try {
  101. String message = String.valueOf(msg); // "【Receive from
  102. // 223.104.255.118
  103. // :61922】:[3G*3918197044*000D*LK,12642,0,93]";
  104. int startIndex = message.indexOf("[");
  105. int endIndex = message.indexOf("]");
  106. String data = message.substring(startIndex + 1, endIndex); // [3G*3918197044*000D*LK,12642,0,93]
  107. String[] bodys = data.split(",");
  108. String headers = bodys[0];
  109. String[] headersBodys = headers.split("\\*");
  110. advice.setFacotry(headersBodys[0]);
  111. advice.setDeviceId(headersBodys[1]);
  112. advice.setAdvicelength(headersBodys[2]);
  113. advice.setAdviceType(headersBodys[3]);
  114. logger.info("设备上传头信息:" + advice.toString());
  115. return advice;
  116. } catch (Exception e) {
  117. e.printStackTrace();
  118. }
  119. return null;
  120. }
  121. /* 数据[手表具体数据] [对应位数] [数据内容]
  122. * 3G*3918197044*0107*UD [0] [厂商*设备 ID*内容长度*内容类型]
  123. 200917 [1] 日期
  124. 054140 [2] 时间
  125. V [3] gps定位是否有效[A]有效,[V]无效
  126. 31.171093 [4] 纬度
  127. N [5] 纬度表示
  128. 121.4215967 [6] 经度
  129. E [7] 经度表示
  130. 0.00 [8] 速度
  131. 0.0 [9] 方向
  132. 0.0 [10] 海拔
  133. 0 [11] 卫星个数
  134. 42,50 [12] gsm信号强度
  135. 14420 [13] 电量
  136. 0 [14] 计步数
  137. 00000010 [15] 终端状态
  138. 7,255 [16]
  139. 460 [17]
  140. 0 [18]
  141. ... ...
  142. */
  143. private Device getDevice(String msg) throws Exception {
  144. int startIndex = msg.indexOf("[");
  145. int endIndex = msg.indexOf("]");
  146. String data = msg.substring(startIndex + 1, endIndex);
  147. String[] bodys = data.split(",");
  148. Device device = new Device();
  149. String gpsState = bodys[3];
  150. logger.info("正在解析device,gpsState:" + gpsState);
  151. /*if (!"A".equals(gpsState)) {
  152. logger.info("gps定位为:" + gpsState + "无效");
  153. return null;
  154. }*/
  155. String date = bodys[1];
  156. String time = bodys[2];
  157. Date timestamp = new SimpleDateFormat("ddMMyyHHmmss").parse(date + time);
  158. device.setTimestamp(timestamp);
  159. device.setLat(getDouble(bodys[4]));
  160. device.setLng(getDouble(bodys[6]));
  161. device.setSpeed(getDouble(bodys[8]));
  162. device.setElectric(getDouble(bodys[13]));
  163. device.setStep(getInteger(bodys[14]));
  164. device.setItemState(bodys[16]);
  165. // getDouble()
  166. logger.info("设备上传具体监控项信息:" + device.toString());
  167. return device;
  168. }
  169. private void dataStorage(Device device) {
  170. String deviceStr = Device.buildDeviceStr(device);
  171. File path = new File(dataPath);
  172. File[] listFiles = path.listFiles();
  173. boolean isTouch = true;
  174. for (File sonFile : listFiles) {
  175. long len = sonFile.length();
  176. if (len < TEN_M) {
  177. // String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
  178. // File file = new File(fileName);
  179. writeDevice2File(sonFile, deviceStr);
  180. logger.info("正在写入数据: " + deviceStr);
  181. isTouch = false;
  182. }
  183. }
  184. if (isTouch) {
  185. String fileName = dataPath + prefixName + new SimpleDateFormat("yyMMddHHmmss").format(new Date());
  186. File file = new File(fileName);
  187. writeDevice2File(file, deviceStr);
  188. logger.info("满10M,创建新的文件 正在写入数据:" + deviceStr + "timestamp:" + new SimpleDateFormat("yyMMddHHmmss").format(new Date()));
  189. }
  190. }
  191. public void writeDevice2File(File file, String deviceStr){
  192. Integer length = deviceStr.getBytes().length;
  193. try {
  194. OutputStream outputStream = new FileOutputStream(file, true);
  195. outputStream.write(int2bytes(length));;
  196. outputStream.write(deviceStr.getBytes());
  197. outputStream.flush();
  198. outputStream.close();
  199. } catch (IOException e) {
  200. e.printStackTrace();
  201. }
  202. }
  203. @Override
  204. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  205. cause.printStackTrace();
  206. ctx.close();
  207. }
  208. @Override
  209. public void channelActive(final ChannelHandlerContext ctx) throws Exception {
  210. saveChannel(ctx);
  211. }
  212. private void saveChannel(ChannelHandlerContext ctx) {
  213. // 注册
  214. // 认证
  215. // 保存channel
  216. Channel channel = ctx.channel();
  217. if (!channelList.contains(channel)) {
  218. channelList.add(channel);
  219. }
  220. }
  221. /**
  222. * 【Receive from 223.104.255.118 :61922】:[3G*3918197044*000D*LK,12642,0,93]
  223. */
  224. public static void main(String[] args) {
  225. for (int i = 0; i < 100; i++) {
  226. Device device = new Device();
  227. device.setDeviceId("3918197044");
  228. device.setElectric(11.2d);
  229. device.setItemState("125");
  230. device.setLat(24.4441);
  231. device.setLng(114.223);
  232. device.setSpeed(21.2);
  233. device.setStep(12);
  234. device.setTerminalState(12);
  235. device.setTimestamp(new Date());
  236. new DiscardServerHandler().dataStorage(device);
  237. }
  238. }
  239. public static String byteBufferToString(ByteBuffer buffer) {
  240. CharBuffer charBuffer = null;
  241. try {
  242. Charset charset = Charset.forName("UTF-8");
  243. CharsetDecoder decoder = charset.newDecoder();
  244. charBuffer = decoder.decode(buffer);
  245. buffer.flip();
  246. return charBuffer.toString();
  247. } catch (Exception ex) {
  248. ex.printStackTrace();
  249. return null;
  250. }
  251. }
  252. private Integer getInteger(String str) {
  253. if (str == null) {
  254. return null;
  255. }
  256. return Integer.valueOf(str);
  257. }
  258. private Double getDouble(String str) {
  259. if (str == null) {
  260. return null;
  261. }
  262. return Double.valueOf(str);
  263. }
  264. static byte[] int2bytes(int num)
  265. {
  266. byte[] b=new byte[4];
  267. //int mask=0xff;
  268. for(int i=0;i<4;i++){
  269. b[3-i]=(byte)(num>>>(24-i*8));
  270. }
  271. return b;
  272. }
  273. }