rainbow954 7 anni fa
parent
commit
d507bcfb2c

+ 171 - 0
src/main/java/com/tidecloud/dataacceptance/service/impl/PinShenData86ServerHandler.java

@@ -0,0 +1,171 @@
+package com.tidecloud.dataacceptance.service.impl;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import com.tidecloud.dataacceptance.common.BitOperator;
+import com.tidecloud.dataacceptance.common.CRCUtil;
+import com.tidecloud.dataacceptance.common.PinShenWaterUtils;
+import com.tidecloud.dataacceptance.entity.PinShenWaterDevice;
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * @author cdk
+ */
+@Component
+@ChannelHandler.Sharable
+@Scope("prototype")
+public class PinShenData86ServerHandler extends HexBinaryAcceptanceHandlerAdapter {
+
+	private static final Logger logger = LoggerFactory.getLogger(PinShenData86ServerHandler.class);
+
+
+	@Override
+	protected void handle(ByteBuf in, Channel channel) throws Exception {
+		PinShenWaterDevice device = PinShenWaterUtils.handle(in);
+		if (device != null)
+			this.handle(device, channel, in);
+	}
+
+	private void handle(PinShenWaterDevice device, Channel channel, ByteBuf in) throws Exception {
+
+		byte typeByte = device.getTypeByte();
+		byte[] packageArr = device.getPackageArr();
+		// GPRS方式数据帧
+		if (device.getTypeByte() == PinShenWaterUtils.data_gps_type) {
+			String deviceId = device.getDeviceId();
+			sendMsg2Kafka(getOriginalData(in), deviceId, channel);
+		}
+		// GPRS方式链路帧
+		else if (typeByte == PinShenWaterUtils.link_gps_type) {
+			if (packageArr.length == 1) {
+				int link = packageArr[0] & 0xff;
+				if (link == PinShenWaterUtils.link_req_code) {
+					byte[] rspBody = { PinShenWaterUtils.link_rsp_code };
+					reply(channel, device, rspBody);
+					sendReq(channel, device, createReqDataArr(), PinShenWaterUtils.data_gps_type);
+				}
+			}
+		}
+	}
+
+	private byte[] createReqDataArr() {
+		byte[] req = new byte[] { 0x02, 0x04, 0x75, 0x31, 0x00, 0x05 };
+		byte[] crcCode = CRCUtil.getCrc16(req);
+		byte[] reqBytes = ArrayUtils.addAll(req, crcCode);
+		return reqBytes;
+	}
+
+	private void reply(Channel channel, PinShenWaterDevice device, byte[] rspBody) {
+		ByteBuf buffer = Unpooled.buffer();
+		byte[] rsp = new byte[] {};
+		rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
+		rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
+		rsp = ArrayUtils.add(rsp, device.getPackageNo());
+		rsp = ArrayUtils.add(rsp, device.getTypeByte());
+		rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
+		rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
+		rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
+		rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
+		rsp = ArrayUtils.addAll(rsp, rspBody);
+
+		byte code = PinShenWaterUtils.getXor(rsp);
+		rsp = ArrayUtils.add(rsp, code);
+		buffer.writeBytes(rsp);
+		String result = DatatypeConverter.printHexBinary(rsp);
+		ChannelFuture channelFuture = channel.write(buffer);
+		channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
+			@Override
+			public void operationComplete(Future<? super Void> future) throws Exception {
+				logger.info("server reply [{}] to client success", result);
+			}
+		});
+	}
+
+	private void sendReq(Channel channel, PinShenWaterDevice device, byte[] rspBody, Byte gpsType) {
+		ByteBuf buffer = Unpooled.buffer();
+		byte[] rsp = new byte[] {};
+
+		rsp = ArrayUtils.addAll(rsp, device.getIdentityArr());
+		rsp = ArrayUtils.addAll(rsp, device.getLengthArr());
+		rsp = ArrayUtils.add(rsp, device.getPackageNo());
+		rsp = ArrayUtils.add(rsp, gpsType);
+		rsp = ArrayUtils.add(rsp, device.getDstAddrLength());
+		rsp = ArrayUtils.addAll(rsp, device.getDstAddr());
+		rsp = ArrayUtils.add(rsp, device.getSrcAddrLength());
+		rsp = ArrayUtils.addAll(rsp, device.getSrcAddr());
+		rsp = ArrayUtils.addAll(rsp, rspBody);
+
+		byte code = PinShenWaterUtils.getXor(rsp);
+		rsp = ArrayUtils.add(rsp, code);
+		buffer.writeBytes(rsp);
+		String result = DatatypeConverter.printHexBinary(rsp);
+		ChannelFuture channelFuture = channel.write(buffer);
+		channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
+			@Override
+			public void operationComplete(Future<? super Void> future) throws Exception {
+				logger.info("server send request [{}] to client success", result);
+			}
+		});
+	}
+
+	private byte[] getOriginalData(ByteBuf in) {
+		in.resetReaderIndex();
+		int length = in.readableBytes();
+		byte[] dataByteArray = new byte[length];
+		in.readBytes(dataByteArray, 0, in.readableBytes());
+		return dataByteArray;
+	}
+
+	public void startAcceptor() {
+		EventLoopGroup bossGroup = new NioEventLoopGroup();
+		EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+		try {
+			ServerBootstrap b = new ServerBootstrap();
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+					.childHandler(new ChannelInitializer<SocketChannel>() {
+						@Override
+						protected void initChannel(SocketChannel ch) throws Exception {
+							// ch.pipeline().addLast(new HeaderTailDelimiterFrameDecoder(65535, false,
+							// Unpooled.copiedBuffer(tailSplitBytes),
+							// Unpooled.copiedBuffer(headerSplitBytes),
+							// Unpooled.copiedBuffer(headerSplitBytes1)));
+							ch.pipeline().addLast(PinShenData86ServerHandler.this);
+						}
+					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+			ChannelFuture f = b.bind(port).sync();
+			logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
+					this.getPort());
+			f.channel().closeFuture().sync();
+
+		} catch (InterruptedException e) {
+			logger.error(e.getMessage());
+		} finally {
+			cleanRedisLinkData();
+			workerGroup.shutdownGracefully();
+			bossGroup.shutdownGracefully();
+		}
+	}
+
+}

+ 9 - 0
src/main/resources/prod/application.yml

@@ -94,6 +94,15 @@ acceptance:
     dataFileDir: /home/service/collector_vorgea/rawdata/
     handlerClass: com.tidecloud.dataacceptance.service.impl.VorgeaUR0401ServerHandler
     enable: false         
+
+   -
+    name: pinshen
+    topic: device-pinshen-water
+    ip: 10.25.19.87
+    port: 6727
+    dataFileDir: /home/service/collector_6727/rawdata/
+    handlerClass: com.tidecloud.dataacceptance.service.impl.PinShenData86ServerHandler
+    enable: true    
       
 logging:
   config: