|
@@ -0,0 +1,298 @@
|
|
|
+package com.tidecloud.dataacceptance.service.impl;
|
|
|
+import java.io.File;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+import javax.xml.bind.DatatypeConverter;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import com.smartsanitation.common.util.StringUtil;
|
|
|
+import com.tidecloud.dataacceptance.common.NumUtil;
|
|
|
+import com.tidecloud.dataacceptance.entity.ConnectMsg;
|
|
|
+import com.tidecloud.dataacceptance.entity.YiTongGPSDevice;
|
|
|
+import com.tidecloud.dataacceptance.entity.YuGuangGPSDevice;
|
|
|
+import com.tidecloud.dataacceptance.service.AcceptanceInboundHandlerAdapter;
|
|
|
+
|
|
|
+import io.netty.bootstrap.ServerBootstrap;
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInitializer;
|
|
|
+import io.netty.channel.ChannelOption;
|
|
|
+import io.netty.channel.EventLoopGroup;
|
|
|
+import io.netty.channel.nio.NioEventLoopGroup;
|
|
|
+import io.netty.channel.socket.SocketChannel;
|
|
|
+import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
|
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
|
|
+import redis.clients.jedis.Jedis;
|
|
|
+import redis.clients.jedis.JedisPool;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author cdk
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@ChannelHandler.Sharable
|
|
|
+public class YuGuangGpsServerHandler extends AcceptanceInboundHandlerAdapter {
|
|
|
+
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(YuGuangGpsServerHandler.class);
|
|
|
+ public static String PREFIX_LINK = "s.";
|
|
|
+ public static String PREFIX_LINK_BACK = "s.b.";
|
|
|
+ public static String PREFIX_DEVICE = "d.";
|
|
|
+ private static Boolean ISINIT = true;
|
|
|
+
|
|
|
+ private static final Integer START_BITS = 2;
|
|
|
+ private static final Byte START_BIT = (byte)0xA3;
|
|
|
+
|
|
|
+ private static final Integer TEN_M = 10485760;
|
|
|
+
|
|
|
+ private static final byte LOGIN_MSG = 0x01;
|
|
|
+ private static final byte LOCATION_MSG = (byte)0xA3;
|
|
|
+ private static final byte OIL_MSG = (byte)0x18;
|
|
|
+ private static final byte STATUS_MSG = (byte)0xb1;
|
|
|
+
|
|
|
+
|
|
|
+ private static final String DATA_PATH = "/home/service/collector_7510/rawdata/";
|
|
|
+ private static final String PREFIX_NAME = "yuguang";
|
|
|
+ public static final Integer REDIS_INDEX_LINK = 15;
|
|
|
+ private static File WRITE_FILE = null;
|
|
|
+
|
|
|
+
|
|
|
+ public static Map<String, Channel> socketyChannelMap = new HashMap<>();
|
|
|
+ public static Map<Channel, String> channelDeviceMap = new HashMap<>();
|
|
|
+ public static Map<String, String> commandCopy = new HashMap<>();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private JedisPool jedisPool;
|
|
|
+
|
|
|
+ @Value("${server.localaddress}")
|
|
|
+ private String address;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
+ ByteBuf in = (ByteBuf) msg;
|
|
|
+ ByteBuf copy = in.copy();
|
|
|
+ byte[] bytes = new byte[copy.readableBytes()];
|
|
|
+ copy.readBytes(bytes);
|
|
|
+ String printHexBinary = DatatypeConverter.printHexBinary(bytes);
|
|
|
+ logger.info("传入数据为:" + printHexBinary);
|
|
|
+ try {
|
|
|
+ byte type = in.readByte();
|
|
|
+ short length = in.readShort();
|
|
|
+ handle(in, length, ctx.channel(), type);
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ ctx.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handle(ByteBuf in, int length, Channel channel, Byte type) throws Exception{
|
|
|
+ if (in.isReadable()) {
|
|
|
+ in = in.readBytes(length);
|
|
|
+ if (LOCATION_MSG == type) {
|
|
|
+ resolveLoginMSG(in, channel);
|
|
|
+ } else if (STATUS_MSG == type) {
|
|
|
+ reply(channel);
|
|
|
+ } else {
|
|
|
+ logger.info("client send data without handle type ...");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void resolveLoginMSG(ByteBuf in, Channel channel) {
|
|
|
+ byte[] deviceIdBytes = new byte[4];
|
|
|
+ in.readBytes(deviceIdBytes);
|
|
|
+
|
|
|
+ byte fistDeviceIdByte = deviceIdBytes[0];
|
|
|
+ byte secondDeviceIdByte = (byte) (deviceIdBytes[1] - 0x80);
|
|
|
+ byte thirdDeviceIdByte = (byte) (deviceIdBytes[2] - 0x80);
|
|
|
+ byte lastByte = deviceIdBytes[3];
|
|
|
+ StringBuilder stringBuilder = new StringBuilder();
|
|
|
+ stringBuilder.append(fistDeviceIdByte).append(secondDeviceIdByte)
|
|
|
+ .append(thirdDeviceIdByte).append(lastByte);
|
|
|
+
|
|
|
+ String deviceId = stringBuilder.toString();
|
|
|
+ logger.info("deviceId: [{}]", deviceId);
|
|
|
+
|
|
|
+ byte[] dateBytes = new byte[6];
|
|
|
+ in.readBytes(dateBytes);
|
|
|
+ String date = DatatypeConverter.printHexBinary(dateBytes);
|
|
|
+ logger.info("date: [{}]", date);
|
|
|
+
|
|
|
+ byte[] latBytes = new byte[4];
|
|
|
+ in.readBytes(latBytes);
|
|
|
+ String lat = DatatypeConverter.printHexBinary(latBytes);
|
|
|
+ logger.info("lat: [{}]", lat);
|
|
|
+
|
|
|
+ byte[] lngBytes = new byte[4];
|
|
|
+ in.readBytes(lngBytes);
|
|
|
+ String lng = DatatypeConverter.printHexBinary(lngBytes);
|
|
|
+ logger.info("lng: [{}]", lng);
|
|
|
+
|
|
|
+ byte[] speedBytes = new byte[2];
|
|
|
+ in.readBytes(speedBytes);
|
|
|
+ String speed = DatatypeConverter.printHexBinary(speedBytes);
|
|
|
+ logger.info("speed: [{}]", speed);
|
|
|
+
|
|
|
+ byte[] directionBytes = new byte[2];
|
|
|
+ in.readBytes(directionBytes);
|
|
|
+ String direction = DatatypeConverter.printHexBinary(directionBytes);
|
|
|
+ logger.info("direction: [{}]", direction);
|
|
|
+
|
|
|
+ byte state = in.readByte();
|
|
|
+ logger.info("state: [{}]", state);
|
|
|
+
|
|
|
+ byte[] mileageBytes = new byte[3];
|
|
|
+ in.readBytes(mileageBytes);
|
|
|
+ int mileage = NumUtil.threeBytesToInteger(mileageBytes);
|
|
|
+ logger.info("mileage: [{}]", mileage);
|
|
|
+
|
|
|
+ byte[] Bytes = new byte[12];
|
|
|
+ in.readBytes(Bytes);
|
|
|
+
|
|
|
+
|
|
|
+ YuGuangGPSDevice yiTongGPSDevice = new YuGuangGPSDevice();
|
|
|
+ if (in.isReadable(2)) {
|
|
|
+ int oilLength = in.readShort();
|
|
|
+ ByteBuf oilByteBuf = in.readBytes(oilLength);
|
|
|
+ byte sonType = oilByteBuf.readByte();
|
|
|
+
|
|
|
+ if (OIL_MSG == sonType) {
|
|
|
+ byte length = oilByteBuf.readByte();
|
|
|
+ int oilAD1 = oilByteBuf.readShort();
|
|
|
+ int oilAD2 = oilByteBuf.readShort();
|
|
|
+ int oilAD3 = oilByteBuf.readShort();
|
|
|
+ int oilAD4 = oilByteBuf.readShort();
|
|
|
+ yiTongGPSDevice.setOil(oilAD4);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ yiTongGPSDevice.setDate(date);
|
|
|
+ yiTongGPSDevice.setDeviceId(deviceId);
|
|
|
+ yiTongGPSDevice.setLat(lat);
|
|
|
+ yiTongGPSDevice.setLng(lng);
|
|
|
+ yiTongGPSDevice.setMileage(String.valueOf(mileage));
|
|
|
+ yiTongGPSDevice.setSpeed(speed);
|
|
|
+ dataStorage(YuGuangGPSDevice.buildDeviceStr(yiTongGPSDevice));
|
|
|
+
|
|
|
+ // 回复和链接管理
|
|
|
+ String deviceIdInMap = channelDeviceMap.get(channel);
|
|
|
+ if (!deviceId.equals(deviceIdInMap)) {
|
|
|
+ manageChannel(channel, deviceId);
|
|
|
+ }
|
|
|
+ reply(channel);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void manageChannel(Channel channel, String deviceId) {
|
|
|
+ String socketkey = UUID.randomUUID().toString();
|
|
|
+ socketyChannelMap.put(socketkey, channel);
|
|
|
+ channelDeviceMap.put(channel, deviceId);
|
|
|
+ String addressStr = ConnectMsg.ipToLong(address);
|
|
|
+ ConnectMsg cMsg = new ConnectMsg(address, socketkey);
|
|
|
+ Jedis jedis = jedisPool.getResource();
|
|
|
+ try {
|
|
|
+ jedis.select(REDIS_INDEX_LINK);
|
|
|
+ String insertKey = PREFIX_LINK + addressStr;
|
|
|
+ String selectKey = PREFIX_DEVICE + deviceId;
|
|
|
+ String insertBackupKey = PREFIX_LINK_BACK + addressStr;
|
|
|
+ jedis.sadd(insertKey, socketkey);
|
|
|
+ jedis.sadd(insertBackupKey, deviceId);
|
|
|
+ jedis.set(selectKey, StringUtil.convert2String(cMsg));
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ jedis.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void reply(Channel channel) {
|
|
|
+ logger.info("server reply to client ...");
|
|
|
+ ByteBuf buffer = Unpooled.buffer();
|
|
|
+ byte[] bytes = new byte[]{0x29, 0x29, 0x21, 0x00, 0x05, (byte)0x82, (byte)0xb1, 0x0C, 0x1b, 0x0d};
|
|
|
+ buffer.writeBytes(bytes);
|
|
|
+ channel.write(buffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ cause.printStackTrace();
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ if (!channel.isActive()) {
|
|
|
+ String deviceId = channelDeviceMap.get(channel);
|
|
|
+ if (deviceId != null) {
|
|
|
+ channelDeviceMap.remove(channel);
|
|
|
+ deleteLinkFromRedis(deviceId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ super.channelInactive(ctx);
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void deleteLinkFromRedis(String deviceId) {
|
|
|
+ String deleteKey = PREFIX_DEVICE + deviceId;
|
|
|
+ try(Jedis jedis = jedisPool.getResource()) {
|
|
|
+ jedis.select(REDIS_INDEX_LINK);
|
|
|
+ String connectMsg = jedis.get(deleteKey);
|
|
|
+ if (connectMsg != null) {
|
|
|
+ ConnectMsg cmsg = StringUtil.convert2Object(connectMsg, ConnectMsg.class);
|
|
|
+ String socketId = cmsg.getSocketId();
|
|
|
+ socketyChannelMap.remove(socketId);
|
|
|
+ String socketQueryKey = PREFIX_LINK + address;
|
|
|
+ jedis.srem(socketQueryKey, socketId);
|
|
|
+ jedis.del(deleteKey);
|
|
|
+ logger.info("delete link [{}] from redis and memory deviceId is [{}]", socketId, deviceId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public void startAcceptor() {
|
|
|
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
|
|
|
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
|
|
|
+ byte[] splitBytes = new byte[]{0x29, 0x29};
|
|
|
+ try {
|
|
|
+ ServerBootstrap b = new ServerBootstrap();
|
|
|
+ b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
|
|
|
+ .childHandler(new ChannelInitializer<SocketChannel>() {
|
|
|
+ @Override
|
|
|
+ protected void initChannel(SocketChannel ch) throws Exception {
|
|
|
+ ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, Unpooled.copiedBuffer(splitBytes)));
|
|
|
+ ch.pipeline().addLast(this);
|
|
|
+ }
|
|
|
+ }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
|
|
|
+ ChannelFuture f = b.bind(port).sync();
|
|
|
+ f.channel().closeFuture().sync();
|
|
|
+
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ cleanRedisLinkData();
|
|
|
+ workerGroup.shutdownGracefully();
|
|
|
+ bossGroup.shutdownGracefully();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void reply(ChannelHandlerContext ctx, String msg) throws Exception {
|
|
|
+ // TODO Auto-generated method stub
|
|
|
+
|
|
|
+ }
|
|
|
+}
|