|
@@ -1,5 +1,6 @@
|
|
|
package com.tidecloud.dataacceptance.service.impl;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
import com.tidecloud.dataacceptance.common.DateUtil;
|
|
|
import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
|
|
|
import io.netty.bootstrap.ServerBootstrap;
|
|
@@ -20,6 +21,8 @@ import org.springframework.context.annotation.Scope;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.Date;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -38,13 +41,18 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
|
|
|
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
|
|
|
|
+ private static final Long INTERVAL_TIME = 300000L; // 开关时间
|
|
|
+
|
|
|
+ private static Map<String, SwitchWorkModel> switchMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+
|
|
|
@Override
|
|
|
protected void handle(ByteBuf in, Channel channel) throws Exception {
|
|
|
|
|
|
// String msg = byteBufferToString(in.nioBuffer());TODO (注意)
|
|
|
byte[] req = new byte[in.readableBytes()];
|
|
|
in.readBytes(req);
|
|
|
- String msg = new String(req,"UTF-8");
|
|
|
+ String msg = new String(req, "UTF-8");
|
|
|
String deviceId = channelDeviceMap.get(channel);
|
|
|
if (deviceId != null) {
|
|
|
MDC.put(MDC_DEVICEID, deviceId);
|
|
@@ -59,10 +67,13 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
break;
|
|
|
case "AP03": // 连接(心跳) BP03#
|
|
|
normalReply(factory, channel, "BP03");
|
|
|
+ checkSwitchMap(factory, deviceId, channel);// 心跳校验 下发报文
|
|
|
break;
|
|
|
case "AP01": // 位置信息
|
|
|
+ String gpsState = msg.substring(12, 13);
|
|
|
sendMsg2Kafka((msg + deviceId).getBytes(), deviceId, channel);
|
|
|
normalReply(factory, channel, "BP01");
|
|
|
+ setSwitchMap(deviceId, gpsState);// 采集数据
|
|
|
break;
|
|
|
default: // 其他
|
|
|
logger.info("client send data without handle type ...");
|
|
@@ -74,6 +85,65 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
}
|
|
|
|
|
|
|
|
|
+ // 心跳包check map 数据
|
|
|
+ protected void checkSwitchMap(String factory, String deviceId, Channel channel) {
|
|
|
+ Long nowTime = System.currentTimeMillis();// 当前时间戳
|
|
|
+ if (deviceId == null || !switchMap.containsKey(deviceId)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SwitchWorkModel swm = switchMap.get(deviceId);
|
|
|
+ logger.warn("心跳检测是否更改终端模式:" + (nowTime - swm.getActiveTime() + ";" + (nowTime - swm.getSwitchTime())));
|
|
|
+ if (nowTime - swm.getActiveTime() > INTERVAL_TIME
|
|
|
+ && nowTime - swm.getSwitchTime() > INTERVAL_TIME) {
|
|
|
+ // 如果当前状态为A 紧急模式
|
|
|
+ Integer workType = (3 == swm.getWorkType()) ? 1 : 3;
|
|
|
+ swm.setSwitchTime(nowTime);
|
|
|
+ swm.setWorkType(workType);
|
|
|
+ switchMap.put(deviceId, swm);
|
|
|
+ logger.warn("心跳检测是下发模式:工作状态" + JSON.toJSONString(swm));
|
|
|
+ normalReplyModel(factory, deviceId, channel, workType);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化 数据记录 注册记录
|
|
|
+ *
|
|
|
+ * @param deviceId
|
|
|
+ */
|
|
|
+ protected void initSwitchMap(String deviceId) {
|
|
|
+ if (deviceId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Long time = System.currentTimeMillis();
|
|
|
+ SwitchWorkModel swm = new SwitchWorkModel();
|
|
|
+ swm.setActiveTime(time);
|
|
|
+ swm.setSwitchTime(time);
|
|
|
+ swm.setWorkType(3);
|
|
|
+ switchMap.put(deviceId, swm);
|
|
|
+ logger.warn("初始化数据》》》》》》》》》》" + JSON.toJSONString(swm));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * APOI 上报数据 更新活跃时间
|
|
|
+ *
|
|
|
+ * @param deviceId
|
|
|
+ */
|
|
|
+ protected void setSwitchMap(String deviceId, String gpsState) {
|
|
|
+ if (deviceId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if ("A".equals(gpsState)) {
|
|
|
+ SwitchWorkModel swm = switchMap.get(deviceId);
|
|
|
+ Long time = System.currentTimeMillis();
|
|
|
+ swm.setActiveTime(time);
|
|
|
+ switchMap.put(deviceId, swm);
|
|
|
+ logger.warn("更新A 活跃时间》》》》》》》》》》" + JSON.toJSONString(swm));
|
|
|
+ } else {
|
|
|
+ logger.warn("当前上报数据为V 时间不更新!!>>>>>>>>>>>>>>>");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
|
|
|
ByteBuf dataByteBufCopy = dataByteBuf.copy();
|
|
|
byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
|
|
@@ -99,6 +169,7 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
String date = DateUtil.getForamtDateUtc0();// DateUtil.formatDate2String(new Date(), "yyyyMMddHHmmss");
|
|
|
normalReply(factory, channel, "BP00," + date + ",8");
|
|
|
Date loginTime = new Date();
|
|
|
+ initSwitchMap(deviceId);
|
|
|
executorService.execute(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -108,7 +179,7 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
if (secondsSinceLogin < 5L) {
|
|
|
TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
|
|
|
}
|
|
|
- normalReplyModel(factory, deviceId, channel);
|
|
|
+ normalReplyModel(factory, deviceId, channel, 3);
|
|
|
} catch (InterruptedException e) {
|
|
|
logger.error(e.getMessage());
|
|
|
}
|
|
@@ -119,12 +190,12 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
|
|
|
|
|
|
// IWBP33,353456789012345,080835,3(设备工作模式,1:正常模式,2:省电模式,3:紧急模式)#
|
|
|
- private void normalReplyModel(String factory, String deviceId, Channel channel) {
|
|
|
+ private void normalReplyModel(String factory, String deviceId, Channel channel, Integer workType) {
|
|
|
StringBuilder replyCommand = new StringBuilder();
|
|
|
replyCommand.append(factory).append("BP33").append(",")
|
|
|
.append(deviceId).append(",")
|
|
|
.append("080835").append(",")
|
|
|
- .append("3").append("#");
|
|
|
+ .append(workType).append("#");
|
|
|
String replyCommandStr = replyCommand.toString();
|
|
|
ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
buffer.writeBytes(replyCommandStr.getBytes());
|
|
@@ -185,4 +256,68 @@ public class WatchJWServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
}
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
+ * 移除 失效的 deviceId
|
|
|
+ *
|
|
|
+ * @param ctx
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ if (!channel.isActive()) {
|
|
|
+ String deviceId = channelDeviceMap.get(channel);
|
|
|
+ if (deviceId != null) {
|
|
|
+ switchMap.remove(deviceId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ super.channelInactive(ctx);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 开关工作模式
|
|
|
+ */
|
|
|
+ class SwitchWorkModel {
|
|
|
+ /**
|
|
|
+ * 有效数据时间 毫秒
|
|
|
+ */
|
|
|
+ private Long activeTime;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 开关 A 有效,V 无效
|
|
|
+ */
|
|
|
+ private Integer workType;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 开关切换时间 毫秒
|
|
|
+ */
|
|
|
+ private Long switchTime;
|
|
|
+
|
|
|
+ public Long getActiveTime() {
|
|
|
+ return activeTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setActiveTime(Long activeTime) {
|
|
|
+ this.activeTime = activeTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Integer getWorkType() {
|
|
|
+ return workType;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setWorkType(Integer workType) {
|
|
|
+ this.workType = workType;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long getSwitchTime() {
|
|
|
+ return switchTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setSwitchTime(Long switchTime) {
|
|
|
+ this.switchTime = switchTime;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|