123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package com.tidecloud.dataacceptance.service.impl;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.entity.Advice;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.concurrent.Future;
- import io.netty.util.concurrent.GenericFutureListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import java.util.Date;
- /**
- * Created by jhw on 2018/7/20.
- */
- @Sharable
- @Scope("prototype")
- @Component(WatchDYJServerHandler.name)
- public class WatchDYJServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- public static final String name = "WatchDYJServerHandler";
- private static final Logger logger = LoggerFactory.getLogger(WatchDYJServerHandler.class);
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- String msg = byteBufferToString(in.nioBuffer());
- Advice advice = setAdvice(msg);
- switch (advice.getAdviceType()) {
- case "INIT": // 初始化
- String init = "INIT,1";
- normalReply(advice, channel, init);
- break;
- case "LK": // 连接
- Date date = new Date();
- String ymd = DateUtil.formatDate2StringYMD(date);
- String hms = DateUtil.formatDate2StringHms(date);
- StringBuilder sb = new StringBuilder("LK,").append(ymd).append(",").append(hms);
- normalReply(advice, channel, sb.toString());
- break;
- case "UD": // 位置信息
- byte[] dataByteArray = new byte[in.readableBytes()];
- in.readBytes(dataByteArray);
- sendMsg2Kafka(dataByteArray, advice.getDeviceId(), channel);
- logger.info("正常存储设备信息:" +advice.toString());
- normalReply(advice, channel, "UD");
- break;
- default: // 其他
- normalReply(advice, channel, advice.getAdviceType());
- break;
- }
- }
- protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
- dataByteBufCopy.readBytes(dataByteArray);
- dataByteBufCopy.release();
- }
- /**
- * 回复
- *
- * @param advice
- * @param channel
- * @content 回复内容
- */
- private void normalReply(Advice advice, Channel channel, String content) {
- String facotry = advice.getFacotry();
- String adviceType = advice.getAdviceType();
- String deviceId = advice.getDeviceId();
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("[");
- replyCommand.append(facotry).append("*");
- replyCommand.append(deviceId).append("*");
- replyCommand.append("0002").append("*");// 指令流水
- replyCommand.append( Integer.toHexString(content.length())).append("*");// 内容长度
- replyCommand.append(content);// 指令内容标记
- replyCommand.append("]");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- /**
- * 解析 Advice 数据
- *
- * @param msg
- * @return
- */
- private Advice setAdvice(String msg) {
- Advice advice = new Advice();
- try {
- String message = String.valueOf(msg);
- int startIndex = message.indexOf("[");
- int endIndex = message.indexOf("]");
- String data = message.substring(startIndex + 1, endIndex);
- String[] bodys = data.split("\\*");
- advice.setFacotry(bodys[0]);// 厂商
- advice.setDeviceId(bodys[1]);// 设备Id
- advice.setAdviceSerial(bodys[2]);//指令流水
- advice.setAdvicelength(bodys[3]);//指令长度
- String[] contents = bodys[4].split(",");//获取内容
- advice.setAdviceType(contents[0]);// 标记
- return advice;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- }
|