package com.tidecloud.dataacceptance.service.impl; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.entity.Advice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Date; /** * Created by jhw on 2018/7/20. */ @Sharable @Scope("prototype") @Component(WatchDYJServerHandler.name) public class WatchDYJServerHandler extends HexBinaryAcceptanceHandlerAdapter { public static final String name = "WatchDYJServerHandler"; private static final Logger logger = LoggerFactory.getLogger(WatchDYJServerHandler.class); @Override protected void handle(ByteBuf in, Channel channel) throws Exception { String msg = byteBufferToString(in.nioBuffer()); Advice advice = setAdvice(msg); switch (advice.getAdviceType()) { case "INIT": // 初始化 String init = "INIT,1"; normalReply(advice, channel, init); break; case "LK": // 连接 Date date = new Date(); String ymd = DateUtil.formatDate2StringYMD(date); String hms = DateUtil.formatDate2StringHms(date); StringBuilder sb = new StringBuilder("LK,").append(ymd).append(",").append(hms); normalReply(advice, channel, sb.toString()); break; case "UD": // 位置信息 byte[] dataByteArray = new byte[in.readableBytes()]; in.readBytes(dataByteArray); sendMsg2Kafka(dataByteArray, advice.getDeviceId(), channel); logger.info("正常存储设备信息:" +advice.toString()); normalReply(advice, channel, "UD"); break; default: // 其他 normalReply(advice, channel, advice.getAdviceType()); break; } } protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); dataByteBufCopy.release(); } /** * 回复 * * @param advice * @param channel * @content 回复内容 */ private void normalReply(Advice advice, Channel channel, String content) { String facotry = advice.getFacotry(); String adviceType = advice.getAdviceType(); String deviceId = advice.getDeviceId(); StringBuilder replyCommand = new StringBuilder(); replyCommand.append("["); replyCommand.append(facotry).append("*"); replyCommand.append(deviceId).append("*"); replyCommand.append("0002").append("*");// 指令流水 replyCommand.append( Integer.toHexString(content.length())).append("*");// 内容长度 replyCommand.append(content);// 指令内容标记 replyCommand.append("]"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } /** * 解析 Advice 数据 * * @param msg * @return */ private Advice setAdvice(String msg) { Advice advice = new Advice(); try { String message = String.valueOf(msg); int startIndex = message.indexOf("["); int endIndex = message.indexOf("]"); String data = message.substring(startIndex + 1, endIndex); String[] bodys = data.split("\\*"); advice.setFacotry(bodys[0]);// 厂商 advice.setDeviceId(bodys[1]);// 设备Id advice.setAdviceSerial(bodys[2]);//指令流水 advice.setAdvicelength(bodys[3]);//指令长度 String[] contents = bodys[4].split(",");//获取内容 advice.setAdviceType(contents[0]);// 标记 return advice; } catch (Exception e) { e.printStackTrace(); } return null; } }