package com.tidecloud.dataacceptance.service.impl; import com.alibaba.fastjson.JSON; import com.tidecloud.dataacceptance.common.DateUtil; import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by jhw on 2018/7/20. */ @Sharable @Scope("prototype") @Component(WatchJWServerHandler.name) public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter { public static final String name = "WatchJWServerHandler"; private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class); private static ExecutorService executorService = Executors.newSingleThreadExecutor(); private static final Long INTERVAL_TIME = 300000L; // 开关时间 private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S /** * 省电模式开关 */ private static Map switchMap = new ConcurrentHashMap<>(); /** * 设置模式 */ private static Map modelMap = new ConcurrentHashMap<>(); @Override protected void handle(ByteBuf in, Channel channel) throws Exception { // String msg = byteBufferToString(in.nioBuffer());TODO (注意) byte[] req = new byte[in.readableBytes()]; in.readBytes(req); String msg = new String(req, "UTF-8"); try { String deviceId = channelDeviceMap.get(channel); logger.info("传入数据为:" + msg); String factory = msg.substring(0, 2);// 工厂 String type = msg.substring(2, 6);// 标记 if (deviceId != null) { MDC.put(MDC_DEVICEID, deviceId); } else { logger.info("该手表没有登录:传入数据为" + msg); if (!"AP00".equals(type)) { logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg); return; } } Long time = System.currentTimeMillis(); WorkModel workModel = null; SwitchWorkModel swm = null; if (deviceId != null) { workModel = modelMap.get(deviceId);//已登录 swm = switchMap.get(deviceId);// 开关 if (workModel != null && workModel.getUrgentType() == 2 && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) { workModel.setUrgentTime(time); workModel.setUrgentType(2); modelMap.put(deviceId, workModel); normalReplyModel(factory, deviceId, channel, 3); logger.warn("超过指定时间没有收到回复》》》 重新设置"); // 更新开关切换时间 swm.setSwitchTime(time); switchMap.put(deviceId, swm); } } switch (type) { case "AP00": // 初始化登录 resolveLoginMSG(msg, channel); break; case "AP03": // 连接(心跳) BP03# normalReply(factory, channel, "BP03"); checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文 break; case "AP01": // 位置信息 String gpsState = msg.substring(12, 13); setSwitchMap(deviceId, gpsState);// 采集数据 sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel); normalReply(factory, channel, "BP01"); break; case "AP33": // 设置模式回复 //IWAP33,080835,1# if (deviceId != null) { Integer moderType = getInteger(msg.substring(14, 16));// 收到回复状态 if (swm.getWorkType() == 3) {// 当前设置的模式 if (workModel != null && moderType == 3) { workModel.setUrgentType(3); logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg); } else { logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>"); Thread.sleep(1000); normalReplyModel(factory, deviceId, channel, 3); workModel.setUrgentType(2); // 更新开关时间 swm.setSwitchTime(time); switchMap.put(deviceId, swm); } modelMap.put(deviceId, workModel); } } break; default: // 其他 logger.info("client send data without handle type ..."); break; } } finally { MDC.clear(); } } /** * 模式设置 * * @param factory * @param deviceId * @param channel */ protected void checkSwitchMap(String factory, String deviceId, Channel channel) { Long nowTime = System.currentTimeMillis();// 当前时间戳 if (deviceId == null || !switchMap.containsKey(deviceId)) { return; } SwitchWorkModel swm = switchMap.get(deviceId); logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime()))); if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) { Integer workType = (3 == swm.getWorkType()) ? 2 : 3; if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) { workType = 3; // APO1上传间隔大于5分种再次设置为紧急模式 } // 如果当前状态为A 紧急模式 swm.setSwitchTime(nowTime); swm.setWorkType(workType); switchMap.put(deviceId, swm); logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm)); normalReplyModel(factory, deviceId, channel, workType); if (workType == 3) {// 设置紧急模式 则更新紧急模式设置状态 WorkModel wm = new WorkModel(); wm.setUrgentType(2);// wm.setUrgentTime(nowTime);// 设置紧急模式时间 modelMap.put(deviceId, wm); } } else { logger.warn("心跳检测是下发模式:工作不更改状态"); } } /** * 初始化 数据记录 注册记录 * * @param deviceId */ protected void initSwitchMap(String deviceId) { if (deviceId == null) { return; } Long time = System.currentTimeMillis(); SwitchWorkModel swm = new SwitchWorkModel(); swm.setActiveTime(time); swm.setSwitchTime(time); swm.setWorkType(3);//1:正常模式,2:省电模式,3:紧急模式 swm.setGpsUpTime(0L);// GPS 上报时间接口 switchMap.put(deviceId, swm); logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm)); } /** * APOI 上报数据 更新活跃时间 * * @param deviceId */ protected void setSwitchMap(String deviceId, String gpsState) { if (deviceId == null) { return; } Long time = System.currentTimeMillis(); SwitchWorkModel swm = switchMap.get(deviceId); swm.setGpsUpTime(time); if ("A".equals(gpsState)) { swm.setActiveTime(time); logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm)); } else { logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>"); } switchMap.put(deviceId, swm); logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm)); } protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) { ByteBuf dataByteBufCopy = dataByteBuf.copy(); byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()]; dataByteBufCopy.readBytes(dataByteArray); dataByteBufCopy.release(); } /** * 登录管理 * * @param channel */ private void resolveLoginMSG(String msg, Channel channel) { /*IWAP00353456789012345# */ String message = String.valueOf(msg); String factory = message.substring(0, 2); String deviceId = message.substring(6, 21); String deviceIdInMap = channelDeviceMap.get(channel); MDC.put(MDC_DEVICEID, deviceId); if (!deviceId.equals(deviceIdInMap)) { manageChannel(channel, deviceId); } String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss"); normalReply(factory, channel, "BP00," + date + ",8"); Date loginTime = new Date(); initSwitchMap(deviceId); executorService.execute(new Runnable() { @Override public void run() { Date currentTime = new Date(); try { Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000; if (secondsSinceLogin < 5L) { TimeUnit.SECONDS.sleep(5 - secondsSinceLogin); } normalReplyModel(factory, deviceId, channel, 3); WorkModel wm = new WorkModel(); wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间 wm.setUrgentType(2);// 默认指令模式为正常模式 modelMap.put(deviceId, wm); } catch (InterruptedException e) { logger.error(e.getMessage()); } } }); } // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)# private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) { StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory).append("BP33").append(",") .append(deviceId).append(",") .append("080835").append(",") .append(workType).append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr)); } /** * 回复 * * @param channel * @content 回复内容 */ private void normalReply(String factory, Channel channel, String content) { // gps ==== >IW BP01# // 登录 ==== >IW BP00,20150101125223,8# // 心跳 ==== >IW BP03# StringBuilder replyCommand = new StringBuilder(); replyCommand.append(factory); replyCommand.append(content); replyCommand.append("#"); String replyCommandStr = replyCommand.toString(); ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length); buffer.writeBytes(replyCommandStr.getBytes()); ChannelFuture channelFuture = channel.writeAndFlush(buffer); channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr)); } @Override public void startAcceptor() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter)); ch.pipeline().addLast(WatchJWServerHandler.this); } }); ChannelFuture f = b.bind(port).sync(); logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(), this.getPort()); f.channel().closeFuture().sync(); } catch (Exception ex) { logger.warn(ex.getMessage(), ex); } finally { cleanRedisLinkData(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 移除 失效的 deviceId * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); if (!channel.isActive()) { String deviceId = channelDeviceMap.get(channel); if (deviceId != null) { switchMap.remove(deviceId); modelMap.remove(deviceId); } } super.channelInactive(ctx); } class WorkModel { /** * 设置紧急模式时间 */ private Long urgentTime; /** * 设置紧急模式状态 */ private Integer urgentType; public Long getUrgentTime() { return urgentTime; } public void setUrgentTime(Long urgentTime) { this.urgentTime = urgentTime; } public Integer getUrgentType() { return urgentType; } public void setUrgentType(Integer urgentType) { this.urgentType = urgentType; } } /** * 开关工作模式 */ class SwitchWorkModel { /** * 有效数据时间 毫秒 */ private Long activeTime; /** * 开关 A 有效,V 无效 */ private Integer workType; /** * 开关切换时间 毫秒 */ private Long switchTime; /** * GPS APO1 上傳时间 */ private Long gpsUpTime; public Long getActiveTime() { return activeTime; } public void setActiveTime(Long activeTime) { this.activeTime = activeTime; } public Integer getWorkType() { return workType; } public void setWorkType(Integer workType) { this.workType = workType; } public Long getSwitchTime() { return switchTime; } public void setSwitchTime(Long switchTime) { this.switchTime = switchTime; } public Long getGpsUpTime() { return gpsUpTime; } public void setGpsUpTime(Long gpsUpTime) { this.gpsUpTime = gpsUpTime; } } }