package com.tidecloud.dataacceptance.service.impl; import com.tidecloud.dataacceptance.DataAcceptanceApplication; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.entity.Advice; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.boot.SpringApplication; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import javax.xml.bind.DatatypeConverter; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by jhw on 2018/7/20. */ @Sharable @Scope("prototype") @Component(WatchJWServerHandler.name) public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter { public static final String name = "WatchJWServerHandler"; private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class); @Override protected void handle(ByteBuf in, Channel channel) throws Exception { String msg = byteBufferToString(in.nioBuffer()); String deviceId = channelDeviceMap.get(channel); if (deviceId != null) { MDC.put(MDC_DEVICEID, deviceId); } logger.info("传入数据为:" + msg); String factory = msg.substring(0, 2);// 工厂 String type = msg.substring(2, 6);// 标记 switch (type) { case "AP00": // 初始化登录 resolveLoginMSG(msg, channel); break; case "AP03": // 连接(心跳) BP03# normalReply(factory, channel, "BP03"); break; case "AP01": // 位置信息 sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel); normalReply(factory, channel, "BP01"); break; default: // 其他 logger.info("client send data without handle type ..."); break; } } protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); dataByteBufCopy.release(); } /** * 登录管理 * * @param channel */ private void resolveLoginMSG(String msg, Channel channel) { /*IWAP00353456789012345# */ String message = String.valueOf(msg); String factory = message.substring(0, 2); String deviceId = message.substring(6, 21); String deviceIdInMap = channelDeviceMap.get(channel); MDC.put(MDC_DEVICEID, deviceId); if (!deviceId.equals(deviceIdInMap)) { manageChannel(channel, deviceId); } String date =DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss"); normalReply(factory, channel, "BP00," + date + ",8"); // normalReplyModel(factory,deviceId,channel); TODO (无效) } // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)# private void normalReplyModel(String factory,String deviceId, Channel channel){ StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory).append("BP33").append(",") .append(deviceId).append(",") .append("080835").append(",") .append("3").append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr)); } /** * 回复 * * @param channel * @content 回复内容 */ private void normalReply(String factory, Channel channel, String content) { // gps ==== >IW BP01# // 登录 ==== >IW BP00,20150101125223,8# // 心跳 ==== >IW BP03# StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory); replyCommand.append(content); replyCommand.append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } }