123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736 |
- package com.tidecloud.dataacceptance.service.impl;
- import com.accept.client.DeviceCronClient;
- import com.accept.client.VoiceMsgClient;
- import com.accept.model.VoiceMsgVo;
- import com.alibaba.fastjson.JSON;
- import com.tidecloud.dataacceptance.common.DateUtil;
- import com.tidecloud.dataacceptance.service.DelimiterJingWeiFrameDecoder;
- import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.util.ByteProcessor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- /**
- * Created by jhw on 2018/7/20.
- */
- @Sharable
- @Scope("prototype")
- @Component(WatchJWServerHandler.name)
- public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
- public static final String name = "WatchJWServerHandler";
- private static final Logger logger = LoggerFactory.getLogger(WatchJWServerHandler.class);
- private static ExecutorService executorService = Executors.newSingleThreadExecutor();
- private static final Long INTERVAL_TIME = 300000L; // 开关时间
- private static final Long URGENT_OUT_TIME = 120000L; // 超时时间120S
- /**
- * 紧急模式
- */
- private static final Integer URGENCY = 3;
- /**
- * 其他模式
- */
- private static final Integer OTHER = 2;
- /**
- * 省电模式开关
- */
- private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
- /**
- * 设置模式
- */
- private static Map<String, WorkModel> modelMap = new ConcurrentHashMap<>();
- /**
- * 心率
- */
- private static Map<String, Integer> beatMap = new ConcurrentHashMap<>();
- private static final ByteProcessor FIND_COMMA = new ByteProcessor.IndexOfProcessor((byte) ',');
- @Autowired
- private VoiceMsgClient voiceMsgClient;
- @Autowired
- private DeviceCronClient deviceCronClient;
- @Override
- protected void handle(ByteBuf in, Channel channel) throws Exception {
- //String msg = byteBufferToString(in.nioBuffer());TODO (注意)
- byte[] req = new byte[in.readableBytes()];
- in.readBytes(req);
- String msg = new String(req, "UTF-8");
- try {
- String deviceId = channelDeviceMap.get(channel);
- String factory = msg.substring(0, 2);// 工厂
- String type = msg.substring(2, 6);// 标记
- if (deviceId != null) {
- MDC.put(MDC_DEVICEID, deviceId);
- } else {
- logger.info("该手表没有登录:传入数据为" + msg);
- if (!"AP00".equals(type)) {
- logger.info("该手表没有登录:且当前报文不是登录报文,不处理!,return" + msg);
- channel.close();
- return;
- }
- }
- logger.info("传入数据为:" + msg);
- Long time = System.currentTimeMillis();
- WorkModel workModel = null;
- SwitchWorkModel swm = null;
- if (deviceId != null) {
- workModel = modelMap.get(deviceId);//已登录
- swm = switchMap.get(deviceId);// 开关
- if (workModel != null && workModel.getUrgentType() == OTHER
- && (time - workModel.getUrgentTime() > URGENT_OUT_TIME)) {
- workModel.setUrgentTime(time);
- workModel.setUrgentType(OTHER);
- modelMap.put(deviceId, workModel);
- normalReplyModel(factory, deviceId, channel, 3);
- logger.warn("超过指定时间没有收到回复》》》 重新设置");
- // 更新开关切换时间
- swm.setSwitchTime(time);
- switchMap.put(deviceId, swm);
- }
- voiceReplyToClient(deviceId, channel);//语音下行发送
- }
- switch (type) {
- case "AP00": // 初始化登录
- resolveLoginMSG(msg, channel);
- break;
- case "AP03": // 连接(心跳) BP03#
- normalReply(factory, channel, "BP03");
- checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
- sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心跳报文下发
- logger.warn("心跳报文下发 msg+" + msg);
- sendApthReplyToClient(deviceId, channel);
- break;
- case "AP01": // 位置信息
- String gpsState = msg.substring(12, 13);
- // 如果当前GPS 上报时间和当前服务器时间超过1小时直接断开连接重新连接
- setSwitchMap(deviceId, gpsState, msg, channel);// 采集数据
- normalReply(factory, channel, "BP01");
- break;
- case "AP33": // 设置模式回复
- //IWAP33,080835,1# IWAP33,080835,03#
- if (deviceId != null) {
- Integer moderType = getInteger(msg.substring(14, msg.indexOf("#")));// 收到回复状态
- if (swm.getWorkType() == URGENCY) {// 当前设置的模式
- if (workModel != null && moderType == URGENCY) {
- workModel.setUrgentType(URGENCY);
- logger.warn("紧急模式设置成功!>>>>>>>>>>>>>>" + msg);
- } else {
- logger.warn("紧急模式重新设置!>>>>>>>>>>>>>>");
- Thread.sleep(1000);
- normalReplyModel(factory, deviceId, channel, URGENCY);
- workModel.setUrgentType(OTHER);
- // 更新开关时间
- swm.setSwitchTime(time);
- switchMap.put(deviceId, swm);
- }
- modelMap.put(deviceId, workModel);
- }
- }
- break;
- case "APHT": // 心率测量
- sendMsg2Kafka((msg + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
- normalReply(factory, channel, "BPHT");
- if (beatMap.containsKey(deviceId)) {
- beatMap.put(deviceId, 2);
- logger.warn("标记APHT=2参数状态变更deviceId=" + deviceId);
- }
- break;
- case "AP49": // 心率上传 IWAP49,68#
- String heatBeat = msg.substring(7, 9);// 工厂
- StringBuffer sb = new StringBuffer("IWAPHT,");// 伪装 APHT 一样的数据格式
- sb.append(heatBeat).append(",").append("0,0,#");
- sendMsg2Kafka((sb.toString() + deviceId + "," + DateUtil.formatDate2String(new Date())).getBytes(), deviceId, channel);// 心率报文下发
- normalReply(factory, channel, "BP49");
- if (beatMap.containsKey(deviceId)) {
- beatMap.put(deviceId, 2);
- logger.warn("标记AP49=2参数状态变更deviceId=" + deviceId);
- }
- break;
- case "APXL": // 心率主动下发,终端回复
- beatMap.put(deviceId, 1);// 标记下发成功
- logger.warn("标记APXL = 1参数状态变更deviceId=" + deviceId);
- break;
- case "AP07": // 语音上传
- // TODO 设置语音上传 IWAP07,20140818064408,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
- VoiceMsgVo ap07Msg = splitAP07VoiceMsg(msg, deviceId, in);
- // 调用接口 写库
- VoiceMsgVo vo = voiceMsgClient.insVoiceMsg(ap07Msg);
- normalVoiceReply(channel, vo);
- break;
- case "AP28": // 语音下行终端回复
- // 调用接口 查询回复内容
- VoiceMsgVo ap28Msg = splitAP28VoiceMsg(msg, deviceId);
- VoiceMsgVo voiceMsg = null;
- if (ap28Msg.getLag() == 1) {
- if (ap28Msg.getNu() < ap28Msg.getTotal()) {
- // 回复下个语音包
- voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu() + 1);
- } else {
- // 更新数据状态为发送完毕
- voiceMsgClient.updateVoiceMsgSendFinish(deviceId, ap28Msg.getMsgId(), 4);
- }
- } else {
- // 回复上个语音包
- voiceMsg = voiceMsgClient.queryVoiceMsg(deviceId, ap28Msg.getNu());
- }
- if (voiceMsg != null && voiceMsg.getLag() == 1) {
- normalBP28Reply(channel, voiceMsg);
- }
- break;
- default: // 其他
- logger.info("client send data without handle type ...");
- break;
- }
- } finally {
- MDC.clear();
- }
- }
- /**
- * 语音下行发送
- *
- * @param deviceId
- * @param channel
- */
- protected void voiceReplyToClient(String deviceId, Channel channel) {
- try {
- VoiceMsgVo voiceF = voiceMsgClient.queryVoiceMsg(deviceId, 1);
- if (voiceF.getLag() == 1) {
- normalBP28Reply(channel, voiceF);
- // 更新数据状态为发送完毕
- voiceMsgClient.updateVoiceMsgSendFinish(deviceId, voiceF.getMsgId(), 3);
- }
- } catch (Exception e) {
- logger.error("语音下行发送异常!!!!! deviceId=" + deviceId);
- }
- }
- /**
- * 心率下行发送
- * beatMap (1 下发成功,2 下发完成)
- *
- * @param deviceId
- * @param channel
- */
- protected void sendApthReplyToClient(String deviceId, Channel channel) {
- try {
- // 时间校验 1: 是否需要下发心率获取
- Boolean lag = deviceCronClient.getSendApThByDeviceId(deviceId);
- if (lag) {
- if (beatMap.isEmpty()
- || !beatMap.containsKey(deviceId)
- || 1 == beatMap.get(deviceId)) {
- setBPXLToClient(deviceId, channel);
- }
- } else {
- beatMap.remove(deviceId);
- logger.warn("标记参数移除deviceId=" + deviceId);
- }
- } catch (Exception e) {
- logger.error("心率下行发送异常!!!!! deviceId=" + deviceId);
- }
- }
- /**
- * 获取心率数据 下行指令
- *
- * @param deviceId
- * @param channel
- */
- protected void setBPXLToClient(String deviceId, Channel channel) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("IWBPXL").append(",");
- replyCommand.append(deviceId).append(",");
- replyCommand.append("080835");//指令流水
- replyCommand.append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- /**
- * 语音上行回复
- *
- * @param channel
- * @content 回复内容 IWBP07,20140818064408,6,1,1#
- */
- private void normalVoiceReply(Channel channel, VoiceMsgVo voiceMsg) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("IWBP07").append(",");
- replyCommand.append(voiceMsg.getVoiceTime()).append(",");
- replyCommand.append(voiceMsg.getTotal()).append(",");
- replyCommand.append(voiceMsg.getNu()).append(",");
- replyCommand.append(voiceMsg.getLag());// 接受成功1:成功,0 失败
- replyCommand.append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- /**
- * 语音下行回复
- *
- * @param channel
- * @content 回复内容 获取下一条代发送终端语音 IWBP28, D3590D54,XXXX,6,1,1024,XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX#
- */
- private void normalBP28Reply(Channel channel, VoiceMsgVo voiceMsg) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append("IWBP28").append(",");
- replyCommand.append("D3590D54").append(",");
- replyCommand.append(voiceMsg.getMsgId()).append(",");
- replyCommand.append(voiceMsg.getTotal()).append(",");
- replyCommand.append(voiceMsg.getNu()).append(",");
- replyCommand.append(voiceMsg.getLength()).append(",");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- buffer.writeBytes(voiceMsg.getMsg());
- buffer.readerIndex(0);
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr + JSON.toJSON(voiceMsg.getMsg())));
- }
- /**
- * 上行语音包才解析
- *
- * @param msg IWAP07,20140818064408,6,1,1024,XXXXXXXX#
- * @param deviceId
- * @return
- */
- protected VoiceMsgVo splitAP07VoiceMsg(String msg, String deviceId, ByteBuf byteBufIn) {
- VoiceMsgVo voiceMsg = new VoiceMsgVo();
- String[] msgArr = msg.split(",");
- voiceMsg.setDeviceId(deviceId);
- voiceMsg.setVoiceTime(msgArr[1]);
- voiceMsg.setTotal(Integer.valueOf(msgArr[2]));
- voiceMsg.setNu(Integer.valueOf(msgArr[3]));
- voiceMsg.setLength(Integer.valueOf(msgArr[4]));
- try {
- byteBufIn.readerIndex(0);
- // 按照逗号切割
- int cutLength = 0;// 存储最后一次查询条件
- for (int i = 0; i < 5; i++) {
- int commaLength = byteBufIn.forEachByte(byteBufIn.readerIndex(), 30, FIND_COMMA);
- cutLength = commaLength;
- byteBufIn.readerIndex(commaLength + 1);
- }
- byteBufIn.readerIndex(cutLength + 1);
- byte[] bytes = new byte[voiceMsg.getLength() + 1];
- byteBufIn.readBytes(bytes);
- String msg1 = new String(bytes, "UTF-8");
- voiceMsg.setMsg(bytes);
- } catch (Exception e) {
- logger.warn("语音解析错无" + JSON.toJSONString(voiceMsg) + ":" + e.getStackTrace());
- }
- return voiceMsg;
- }
- /**
- * 下行语音包才解析
- *
- * @param msg IWAP28,D3590D54,XXXX,6,1,1#
- * @param deviceId
- * @return
- */
- protected VoiceMsgVo splitAP28VoiceMsg(String msg, String deviceId) {
- VoiceMsgVo voiceMsg = new VoiceMsgVo();
- String[] msgArr = msg.split(",");
- voiceMsg.setDeviceId(deviceId);
- voiceMsg.setTotal(Integer.valueOf(msgArr[3]));
- voiceMsg.setNu(Integer.valueOf(msgArr[4]));
- voiceMsg.setLag(Integer.valueOf(msgArr[5].substring(0, 1)));
- voiceMsg.setMsgId(Integer.valueOf(msgArr[2]));
- return voiceMsg;
- }
- /**
- * 模式设置
- *
- * @param factory
- * @param deviceId
- * @param channel
- */
- protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
- Long nowTime = System.currentTimeMillis();// 当前时间戳
- if (deviceId == null || !switchMap.containsKey(deviceId)) {
- return;
- }
- SwitchWorkModel swm = switchMap.get(deviceId);
- logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
- if (nowTime - swm.getActiveTime() > INTERVAL_TIME && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
- Integer workType = (URGENCY == swm.getWorkType()) ? OTHER : URGENCY;
- if (nowTime - swm.getGpsUpTime() > INTERVAL_TIME) {
- workType = URGENCY; // APO1上传间隔大于5分种再次设置为紧急模式
- }
- if (URGENCY == workType) {
- //如果当前设置模式为3 紧急模式,则需要移除第一条GPS 数据
- swm.setFirstRemove(Boolean.FALSE);
- }
- // 如果当前状态为A 紧急模式
- swm.setSwitchTime(nowTime);
- swm.setWorkType(workType);
- switchMap.put(deviceId, swm);
- logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
- normalReplyModel(factory, deviceId, channel, workType);
- if (workType == URGENCY) {// 设置紧急模式 则更新紧急模式设置状态
- WorkModel wm = new WorkModel();
- wm.setUrgentType(OTHER);//
- wm.setUrgentTime(nowTime);// 设置紧急模式时间
- modelMap.put(deviceId, wm);
- }
- } else {
- logger.warn("心跳检测是下发模式:工作不更改状态");
- }
- }
- /**
- * 初始化 数据记录 注册记录
- *
- * @param deviceId
- */
- protected void initSwitchMap(String deviceId) {
- if (deviceId == null) {
- return;
- }
- Long time = System.currentTimeMillis();
- SwitchWorkModel swm = new SwitchWorkModel();
- swm.setActiveTime(time);
- swm.setSwitchTime(time);
- swm.setWorkType(URGENCY);//1:正常模式,2:省电模式,3:紧急模式
- swm.setGpsUpTime(0L);// GPS 上报时间接口
- swm.setFirstRemove(Boolean.FALSE);//默认第一条数据丢弃
- switchMap.put(deviceId, swm);
- logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
- }
- /**
- * 查看GPS 上报时间和当前服务器时间超过2小时 则关闭当前连接
- * <p>
- *
- * @return 返回值为ture 则需要断开连接,如果返回值为false 则不需要断开连接
- */
- protected Boolean checkGpsTime(String gTime, Long time) {
- try {
- DateFormat fmt = new SimpleDateFormat("yyMMddHHmmss");
- Date date = fmt.parse(gTime);
- if ((time - date.getTime()) > 3 * 3600 * 1000) {
- return Boolean.TRUE;
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- return Boolean.FALSE;
- }
- /**
- * APOI 上报数据 更新活跃时间
- *
- * @param deviceId
- */
- protected SwitchWorkModel setSwitchMap(String deviceId, String gpsState, String msg, Channel channel) {
- if (deviceId == null) {
- return null;
- }
- Long time = System.currentTimeMillis();
- SwitchWorkModel swm = switchMap.getOrDefault(deviceId, new SwitchWorkModel());
- swm.setGpsUpTime(time);
- if ("A".equals(gpsState)) {
- swm.setActiveTime(time);
- logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
- if (swm.getFirstRemove()) {
- sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
- } else {
- swm.setFirstRemove(Boolean.TRUE);
- logger.info("第一条数据移除>>>>>>>>>>>>>>>" + deviceId);
- }
- } else {
- logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
- }
- switchMap.put(deviceId, swm);
- logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
- return swm;
- }
- protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
- dataByteBufCopy.readBytes(dataByteArray);
- dataByteBufCopy.release();
- }
- /**
- * 登录管理
- *
- * @param channel
- */
- private void resolveLoginMSG(String msg, Channel channel) {
- /*IWAP00353456789012345# */
- String message = String.valueOf(msg);
- String factory = message.substring(0, 2);
- String deviceId = message.substring(6, 21);
- String deviceIdInMap = channelDeviceMap.get(channel);
- MDC.put(MDC_DEVICEID, deviceId);
- if (!deviceId.equals(deviceIdInMap)) {
- manageChannel(channel, deviceId);
- }
- String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
- normalReply(factory, channel, "BP00," + date + ",8");
- Date loginTime = new Date();
- initSwitchMap(deviceId);
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- Date currentTime = new Date();
- try {
- Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
- if (secondsSinceLogin < 5L) {
- TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
- }
- normalReplyModel(factory, deviceId, channel, URGENCY);
- WorkModel wm = new WorkModel();
- wm.setUrgentTime(currentTime.getTime());// 设置紧急模式时间
- wm.setUrgentType(OTHER);// 默认指令模式为正常模式
- modelMap.put(deviceId, wm);
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
- }
- });
- }
- // IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
- private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append(factory).append("BP33").append(",")
- .append(deviceId).append(",")
- .append("080835").append(",")
- .append(workType).append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("设置工作模式:" + replyCommandStr));
- }
- /**
- * 回复
- *
- * @param channel
- * @content 回复内容
- */
- private void normalReply(String factory, Channel channel, String content) {
- // gps ==== >IW BP01#
- // 登录 ==== >IW BP00,20150101125223,8#
- // 心跳 ==== >IW BP03#
- StringBuilder replyCommand = new StringBuilder();
- replyCommand.append(factory);
- replyCommand.append(content);
- replyCommand.append("#");
- String replyCommandStr = replyCommand.toString();
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
- buffer.writeBytes(replyCommandStr.getBytes());
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
- }
- @Override
- public void startAcceptor() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
- // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
- ch.pipeline().addLast(new DelimiterJingWeiFrameDecoder(65535, false, delimiter));
- ch.pipeline().addLast(WatchJWServerHandler.this);
- }
- });
- ChannelFuture f = b.bind(port).sync();
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
- this.getPort());
- f.channel().closeFuture().sync();
- } catch (Exception ex) {
- logger.warn(ex.getMessage(), ex);
- } finally {
- cleanRedisLinkData();
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- /**
- * 移除 失效的 deviceId
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- if (!channel.isActive()) {
- String deviceId = channelDeviceMap.get(channel);
- if (deviceId != null) {
- switchMap.remove(deviceId);
- modelMap.remove(deviceId);
- }
- }
- super.channelInactive(ctx);
- }
- class WorkModel {
- /**
- * 设置紧急模式时间
- */
- private Long urgentTime;
- /**
- * 设置紧急模式状态
- */
- private Integer urgentType;
- public Long getUrgentTime() {
- return urgentTime;
- }
- public void setUrgentTime(Long urgentTime) {
- this.urgentTime = urgentTime;
- }
- public Integer getUrgentType() {
- return urgentType;
- }
- public void setUrgentType(Integer urgentType) {
- this.urgentType = urgentType;
- }
- }
- /**
- * 开关工作模式
- */
- class SwitchWorkModel {
- /**
- * 有效数据时间 毫秒
- */
- private Long activeTime;
- /**
- * 开关 A 有效,V 无效
- */
- private Integer workType;
- /**
- * 开关切换时间 毫秒
- */
- private Long switchTime;
- /**
- * GPS APO1 上傳时间
- */
- private Long gpsUpTime;
- /**
- * 模式切换第一条是否移除 true 不需要移除, false 需要移除
- */
- private Boolean firstRemove = Boolean.TRUE;
- public Long getActiveTime() {
- return activeTime;
- }
- public void setActiveTime(Long activeTime) {
- this.activeTime = activeTime;
- }
- public Integer getWorkType() {
- return workType;
- }
- public void setWorkType(Integer workType) {
- this.workType = workType;
- }
- public Long getSwitchTime() {
- return switchTime;
- }
- public void setSwitchTime(Long switchTime) {
- this.switchTime = switchTime;
- }
- public Long getGpsUpTime() {
- return gpsUpTime;
- }
- public void setGpsUpTime(Long gpsUpTime) {
- this.gpsUpTime = gpsUpTime;
- }
- public Boolean getFirstRemove() {
- return firstRemove;
- }
- public void setFirstRemove(Boolean firstRemove) {
- this.firstRemove = firstRemove;
- }
- }
- }
|