|
@@ -0,0 +1,129 @@
|
|
|
+package com.tidecloud.dataacceptance.service.impl;
|
|
|
+
|
|
|
+import com.tidecloud.dataacceptance.common.DateUtil;
|
|
|
+import com.tidecloud.dataacceptance.entity.Advice;
|
|
|
+import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelHandler.Sharable;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.util.concurrent.Future;
|
|
|
+import io.netty.util.concurrent.GenericFutureListener;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.context.annotation.Scope;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.Date;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Created by jhw on 2018/7/20.
|
|
|
+ */
|
|
|
+@Sharable
|
|
|
+@Scope("prototype")
|
|
|
+@Component(WatchDYJServerHandler.name)
|
|
|
+public class WatchDYJServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
+
|
|
|
+ public static final String name = "WatchDYJServerHandler";
|
|
|
+
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(WatchDYJServerHandler.class);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void handle(ByteBuf in, Channel channel) throws Exception {
|
|
|
+ String msg = byteBufferToString(in.nioBuffer());
|
|
|
+ Advice advice = setAdvice(msg);
|
|
|
+ switch (advice.getAdviceType()) {
|
|
|
+ case "INIT": // 初始化
|
|
|
+ String init = "INIT,1";
|
|
|
+ normalReply(advice, channel, init);
|
|
|
+ break;
|
|
|
+ case "LK": // 连接
|
|
|
+ Date date = new Date();
|
|
|
+ String ymd = DateUtil.formatDate2StringYMD(date);
|
|
|
+ String hms = DateUtil.formatDate2StringHms(date);
|
|
|
+ StringBuilder sb = new StringBuilder("LK,").append(ymd).append(",").append(hms);
|
|
|
+ normalReply(advice, channel, sb.toString());
|
|
|
+ break;
|
|
|
+ case "UD": // 位置信息
|
|
|
+ byte[] dataByteArray = new byte[in.readableBytes()];
|
|
|
+ in.readBytes(dataByteArray);
|
|
|
+ sendMsg2Kafka(dataByteArray, advice.getDeviceId(), channel);
|
|
|
+ // logger.info("正常存储设备信息:" + getDevice(msg).toString());
|
|
|
+ normalReply(advice, channel, "UD");
|
|
|
+ break;
|
|
|
+ default: // 其他
|
|
|
+ normalReply(advice, channel, advice.getAdviceType());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
|
|
|
+ ByteBuf dataByteBufCopy = dataByteBuf.copy();
|
|
|
+ byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
|
|
|
+ dataByteBufCopy.readBytes(dataByteArray);
|
|
|
+ dataByteBufCopy.release();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 回复
|
|
|
+ *
|
|
|
+ * @param advice
|
|
|
+ * @param channel
|
|
|
+ * @content 回复内容
|
|
|
+ */
|
|
|
+ private void normalReply(Advice advice, Channel channel, String content) {
|
|
|
+ String facotry = advice.getFacotry();
|
|
|
+ String adviceType = advice.getAdviceType();
|
|
|
+ String deviceId = advice.getDeviceId();
|
|
|
+ StringBuilder replyCommand = new StringBuilder();
|
|
|
+ replyCommand.append("[");
|
|
|
+ replyCommand.append(facotry).append("*");
|
|
|
+ replyCommand.append(deviceId).append("*");
|
|
|
+ replyCommand.append("0002").append("*");// 指令流水
|
|
|
+ replyCommand.append( Integer.toHexString(content.length())).append("*");// 内容长度
|
|
|
+ replyCommand.append(content);// 指令内容标记
|
|
|
+ replyCommand.append("]");
|
|
|
+
|
|
|
+ String replyCommandStr = replyCommand.toString();
|
|
|
+ ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
+ buffer.writeBytes(replyCommandStr.getBytes());
|
|
|
+ ChannelFuture channelFuture = channel.writeAndFlush(buffer);
|
|
|
+ channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
|
|
|
+ @Override
|
|
|
+ public void operationComplete(Future<? super Void> future) throws Exception {
|
|
|
+ logger.info("Normal reply :" + replyCommandStr);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析 Advice 数据
|
|
|
+ *
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Advice setAdvice(String msg) {
|
|
|
+ Advice advice = new Advice();
|
|
|
+ try {
|
|
|
+ String message = String.valueOf(msg);
|
|
|
+ int startIndex = message.indexOf("[");
|
|
|
+ int endIndex = message.indexOf("]");
|
|
|
+ String data = message.substring(startIndex + 1, endIndex);
|
|
|
+ String[] bodys = data.split("\\*");
|
|
|
+ advice.setFacotry(bodys[0]);// 厂商
|
|
|
+ advice.setDeviceId(bodys[1]);// 设备Id
|
|
|
+ advice.setAdviceSerial(bodys[2]);//指令流水
|
|
|
+ advice.setAdvicelength(bodys[3]);//指令长度
|
|
|
+ String[] contents = bodys[4].split(",");//获取内容
|
|
|
+ advice.setAdviceType(contents[0]);// 标记
|
|
|
+ return advice;
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+}
|