123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- package com.tidecloud.dataacceptance.service.impl;
- import com.alibaba.fastjson.JSON;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.DelimiterBasedFrameDecoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- * Created by jhw on 2018/7/20.
- */
- @Sharable
- @Scope("prototype")
- @Component(WatchJWServerHandler.name)
- public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- public static final String name = "WatchJWServerHandler";
- private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
- private static ExecutorService executorService = Executors.newSingleThreadExecutor();
- private static final Long INTERVAL_TIME = 300000L; // 开关时间
- private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
- /**
- * 紧急模式
- */
- private static final Integer URGENCY = 3;
- /**
- * 其他模式
- */
- private static final Integer OTHER = 2;
- /**
- * 省电模式开关
- */
- private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
- /**
- * 设置模式
- */
- private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
- byte[] req = new byte[in.readableBytes()];
- in.readBytes(req);
- String msg = new String(req, "UTF-8");
- try {
- String deviceId = channelDeviceMap.get(channel);
- String factory = msg.substring(0, 2);// 工厂
- String type = msg.substring(2, 6);// 标记
- if (deviceId != null) {
- MDC.put(MDC_DEVICEID, deviceId);
- } else {
- logger.info("该手表没有登录:传入数据为" + msg);
- if (!"AP00".equals(type)) {
- logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
- channel.close();
- return;
- }
- }
- logger.info("传入数据为:" + msg);
- Long time = System.currentTimeMillis();
- WorkModel workModel = null;
- SwitchWorkModel swm = null;
- if (deviceId != null) {
- workModel = modelMap.get(deviceId);//已登录
- swm = switchMap.get(deviceId);// 开关
- if (workModel != null && workModel.getUrgentType() == OTHER
- && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
- workModel.setUrgentTime(time);
- workModel.setUrgentType(OTHER);
- modelMap.put(deviceId, workModel);
- normalReplyModel(factory, deviceId, channel, 3);
- logger.warn("超过指定时间没有收到回复》》》 重新设置");
- // 更新开关切换时间
- swm.setSwitchTime(time);
- switchMap.put(deviceId, swm);
- }
- }
- switch (type) {
- case "AP00": // 初始化登录
- resolveLoginMSG(msg, channel);
- break;
- case "AP03": // 连接(心跳) BP03#
- normalReply(factory, channel, "BP03");
- checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
- sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
- logger.warn("心跳报文下发 msg+" + msg);
- break;
- case "AP01": // 位置信息
- String gpsState = msg.substring(12, 13);
- // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
- String gDay = msg.substring(6, 12);
- String gTime = msg.substring(39, 45);
- // if (checkGpsTime(gDay + gTime, time)) {
- // channel.close();
- // }
- setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
- normalReply(factory, channel, "BP01");
- break;
- case "AP33": // 设置模式回复
- //IWAP33,080835,1#
- if (deviceId != null) {
- Integer moderType = getInteger(msg.substring(14, 16));// 收到回复状态
- if (swm.getWorkType() == URGENCY) {// 当前设置的模式
- if (workModel != null && moderType == URGENCY) {
- workModel.setUrgentType(URGENCY);
- logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
- } else {
- logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
- Thread.sleep(1000);
- normalReplyModel(factory, deviceId, channel, URGENCY);
- workModel.setUrgentType(OTHER);
- // 更新开关时间
- swm.setSwitchTime(time);
- switchMap.put(deviceId, swm);
- }
- modelMap.put(deviceId, workModel);
- }
- }
- break;
- default: // 其他
- logger.info("client send data without handle type ...");
- break;
- }
- } finally {
- MDC.clear();
- }
- }
- /**
- * 模式设置
- *
- * @param factory
- * @param deviceId
- * @param channel
- */
- protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
- Long nowTime = System.currentTimeMillis();// 当前时间戳
- if (deviceId == null || !switchMap.containsKey(deviceId)) {
- return;
- }
- SwitchWorkModel swm = switchMap.get(deviceId);
- logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
- if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
- Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
- if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
- workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
- }
- if (URGENCY == workType) {
- //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
- swm.setFirstRemove(Boolean.FALSE);
- }
- // 如果当前状态为A 紧急模式
- swm.setSwitchTime(nowTime);
- swm.setWorkType(workType);
- switchMap.put(deviceId, swm);
- logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
- normalReplyModel(factory, deviceId, channel, workType);
- if (workType == URGENCY) {// 设置紧急模式 则更新紧急模式设置状态
- WorkModel wm = new WorkModel();
- wm.setUrgentType(OTHER);//
- wm.setUrgentTime(nowTime);// 设置紧急模式时间
- modelMap.put(deviceId, wm);
- }
- } else {
- logger.warn("心跳检测是下发模式:工作不更改状态");
- }
- }
- /**
- * 初始化 数据记录 注册记录
- *
- * @param deviceId
- */
- protected void initSwitchMap(String deviceId) {
- if (deviceId == null) {
- return;
- }
- Long time = System.currentTimeMillis();
- SwitchWorkModel swm = new SwitchWorkModel();
- swm.setActiveTime(time);
- swm.setSwitchTime(time);
- swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
- swm.setGpsUpTime(0L);// GPS 上报时间接口
- swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
- switchMap.put(deviceId, swm);
- logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
- }
- /**
- * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
- * <p>
- *
- * @return 返回值为ture 则需要断开连接,如果返回值为false 则不需要断开连接
- */
- protected Boolean checkGpsTime(String gTime, Long time) {
- try {
- DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
- Date date = fmt.parse(gTime);
- if ((time - date.getTime()) > 3 * 3600 * 1000) {
- return Boolean.TRUE;
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- return Boolean.FALSE;
- }
- /**
- * APOI 上报数据 更新活跃时间
- *
- * @param deviceId
- */
- protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
- if (deviceId == null) {
- return null;
- }
- Long time = System.currentTimeMillis();
- SwitchWorkModel swm = switchMap.get(deviceId);
- swm.setGpsUpTime(time);
- if ("A".equals(gpsState)) {
- swm.setActiveTime(time);
- logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
- if (swm.getFirstRemove()) {
- sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
- } else {
- swm.setFirstRemove(Boolean.TRUE);
- logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
- }
- } else {
- logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
- }
- switchMap.put(deviceId, swm);
- logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
- return swm;
- }
- protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
- dataByteBufCopy.readBytes(dataByteArray);
- dataByteBufCopy.release();
- }
- /**
- * 登录管理
- *
- * @param channel
- */
- private void resolveLoginMSG(String msg, Channel channel) {
- /*IWAP00353456789012345# */
- String message = String.valueOf(msg);
- String factory = message.substring(0, 2);
- String deviceId = message.substring(6, 21);
- String deviceIdInMap = channelDeviceMap.get(channel);
- MDC.put(MDC_DEVICEID, deviceId);
- if (!deviceId.equals(deviceIdInMap)) {
- manageChannel(channel, deviceId);
- }
- String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
- normalReply(factory, channel, "BP00," + date + ",8");
- Date loginTime = new Date();
- initSwitchMap(deviceId);
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- Date currentTime = new Date();
- try {
- Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
- if (secondsSinceLogin < 5L) {
- TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
- }
- normalReplyModel(factory, deviceId, channel, URGENCY);
- WorkModel wm = new WorkModel();
- wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
- wm.setUrgentType(OTHER);// 默认指令模式为正常模式
- modelMap.put(deviceId, wm);
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- }
- });
- }
- // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
- private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append(factory).append("BP33").append(",")
- .append(deviceId).append(",")
- .append("080835").append(",")
- .append(workType).append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
- }
- /**
- * 回复
- *
- * @param channel
- * @content 回复内容
- */
- private void normalReply(String factory, Channel channel, String content) {
- // gps ==== >IW BP01#
- // 登录 ==== >IW BP00,20150101125223,8#
- // 心跳 ==== >IW BP03#
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append(factory);
- replyCommand.append(content);
- replyCommand.append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- @Override
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
- ch.pipeline().addLast(WatchJWServerHandler.this);
- }
- });
- ChannelFuture f = b.bind(port).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (Exception ex) {
- logger.warn(ex.getMessage(), ex);
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- /**
- * 移除 失效的 deviceId
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- if (!channel.isActive()) {
- String deviceId = channelDeviceMap.get(channel);
- if (deviceId != null) {
- switchMap.remove(deviceId);
- modelMap.remove(deviceId);
- }
- }
- super.channelInactive(ctx);
- }
- class WorkModel {
- /**
- * 设置紧急模式时间
- */
- private Long urgentTime;
- /**
- * 设置紧急模式状态
- */
- private Integer urgentType;
- public Long getUrgentTime() {
- return urgentTime;
- }
- public void setUrgentTime(Long urgentTime) {
- this.urgentTime = urgentTime;
- }
- public Integer getUrgentType() {
- return urgentType;
- }
- public void setUrgentType(Integer urgentType) {
- this.urgentType = urgentType;
- }
- }
- /**
- * 开关工作模式
- */
- class SwitchWorkModel {
- /**
- * 有效数据时间 毫秒
- */
- private Long activeTime;
- /**
- * 开关 A 有效,V 无效
- */
- private Integer workType;
- /**
- * 开关切换时间 毫秒
- */
- private Long switchTime;
- /**
- * GPS APO1 上傳时间
- */
- private Long gpsUpTime;
- /**
- * 模式切换第一条是否移除 true 不需要移除, false 需要移除
- */
- private Boolean firstRemove = Boolean.TRUE;
- public Long getActiveTime() {
- return activeTime;
- }
- public void setActiveTime(Long activeTime) {
- this.activeTime = activeTime;
- }
- public Integer getWorkType() {
- return workType;
- }
- public void setWorkType(Integer workType) {
- this.workType = workType;
- }
- public Long getSwitchTime() {
- return switchTime;
- }
- public void setSwitchTime(Long switchTime) {
- this.switchTime = switchTime;
- }
- public Long getGpsUpTime() {
- return gpsUpTime;
- }
- public void setGpsUpTime(Long gpsUpTime) {
- this.gpsUpTime = gpsUpTime;
- }
- public Boolean getFirstRemove() {
- return firstRemove;
- }
- public void setFirstRemove(Boolean firstRemove) {
- this.firstRemove = firstRemove;
- }
- }
- }
|