|
@@ -1,5 +1,6 @@
|
|
|
package com.tidecloud.dataacceptance.service.impl;
|
|
|
|
|
|
+import com.accept.client.DeviceMsgClient;
|
|
|
import com.tidecloud.dataacceptance.common.DateUtil;
|
|
|
import com.tidecloud.dataacceptance.entity.Advice;
|
|
|
import com.tidecloud.dataacceptance.service.HexBinaryAcceptanceHandlerAdapter;
|
|
@@ -16,6 +17,7 @@ import io.netty.handler.logging.LogLevel;
|
|
|
import io.netty.handler.logging.LoggingHandler;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.annotation.Scope;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
@@ -35,208 +37,217 @@ import java.util.concurrent.TimeUnit;
|
|
|
@Component(WatchHenShengServerHandler.name)
|
|
|
public class WatchHenShengServerHandler extends HexBinaryAcceptanceHandlerAdapter {
|
|
|
|
|
|
- public static final String name = "WatchHenShengServerHandler";
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(WatchHenShengServerHandler.class);
|
|
|
- private static ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void handle(ByteBuf in, Channel channel) throws Exception {
|
|
|
- byte[] req = new byte[in.readableBytes()];
|
|
|
- in.readBytes(req);
|
|
|
- String msg = new String(req, "UTF-8");
|
|
|
- logger.info("传入数据:》》》》》》》》》" + msg);
|
|
|
- Advice advice = setAdvice(msg);
|
|
|
- if (advice == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- switch (advice.getAdviceType()) {
|
|
|
- case "INIT": // 初始化
|
|
|
- String init = "INIT,1";
|
|
|
- normalReply(advice, channel, init);
|
|
|
- //setUpload(advice, channel);
|
|
|
- break;
|
|
|
- case "LK": // 连接
|
|
|
- sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel);
|
|
|
- normalReply(advice, channel, getLinkTime());
|
|
|
- break;
|
|
|
- case "UD": // 位置信息
|
|
|
- sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
|
|
|
- logger.info("位置数据上报[UD]:" + advice.toString());
|
|
|
- normalReply(advice, channel, "UD");
|
|
|
- break;
|
|
|
- case "UD2": // 位置信息
|
|
|
- logger.info("盲点补传数据:" + advice.toString());
|
|
|
- sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
|
|
|
- //normalReply(advice, channel, "UD2");
|
|
|
- break;
|
|
|
- case "UPLOAD": // 位置信息
|
|
|
- logger.info("修改上报频率成功!");
|
|
|
- break;
|
|
|
- case "LGZONE": // 终端获取时间
|
|
|
- logger.info("终端获取时间[LGZONE]:" + advice.toString());
|
|
|
- normalReply(advice, channel, getlgZoneTime());
|
|
|
- break;
|
|
|
- default: // 其他
|
|
|
- logger.info("client send data without handle type ...");
|
|
|
- //normalReply(advice, channel, advice.getAdviceType());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected String getLinkTime() {
|
|
|
- DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss");
|
|
|
- Date date = new Date();
|
|
|
- TimeZone pst = TimeZone.getTimeZone("Etc/GMT+0");// 0时区
|
|
|
- dateFormatter.setTimeZone(pst);
|
|
|
- String lk_time = dateFormatter.format(date);
|
|
|
- StringBuilder sb = new StringBuilder("LK,").append(lk_time);
|
|
|
- return sb.toString();
|
|
|
- }
|
|
|
+ public static final String name = "WatchHenShengServerHandler";
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(WatchHenShengServerHandler.class);
|
|
|
+ private static ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceMsgClient deviceMsgClient;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void handle(ByteBuf in, Channel channel) throws Exception {
|
|
|
+ byte[] req = new byte[in.readableBytes()];
|
|
|
+ in.readBytes(req);
|
|
|
+ String msg = new String(req, "UTF-8");
|
|
|
+ logger.info("传入数据:》》》》》》》》》" + msg);
|
|
|
+ Advice advice = setAdvice(msg);
|
|
|
+ if (advice == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ switch (advice.getAdviceType()) {
|
|
|
+ case "INIT": // 初始化
|
|
|
+ String init = "INIT,1";
|
|
|
+ normalReply(advice, channel, init);
|
|
|
+ //setUpload(advice, channel);
|
|
|
+ break;
|
|
|
+ case "LK": // 连接
|
|
|
+ sendMsg2Kafka((msg + DateUtil.formatDate2String(new Date())).getBytes(), advice.getDeviceId(), channel);
|
|
|
+ normalReply(advice, channel, getLinkTime());
|
|
|
+ deviceMsgClient.acceptDeviceMsgParam("hengsheng",
|
|
|
+ advice.getDeviceId(),
|
|
|
+ 1, msg, System.currentTimeMillis());
|
|
|
+ break;
|
|
|
+ case "UD": // 位置信息
|
|
|
+ sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
|
|
|
+ logger.info("位置数据上报[UD]:" + advice.toString());
|
|
|
+ normalReply(advice, channel, "UD");
|
|
|
+ deviceMsgClient.acceptDeviceMsgParam("hengsheng",
|
|
|
+ advice.getDeviceId(),
|
|
|
+ 2, msg, System.currentTimeMillis());
|
|
|
+ break;
|
|
|
+ case "UD2": // 位置信息
|
|
|
+ logger.info("盲点补传数据:" + advice.toString());
|
|
|
+ sendMsg2Kafka(msg.getBytes(), advice.getDeviceId(), channel);
|
|
|
+ //normalReply(advice, channel, "UD2");
|
|
|
+ break;
|
|
|
+ case "UPLOAD": // 位置信息
|
|
|
+ logger.info("修改上报频率成功!");
|
|
|
+ break;
|
|
|
+ case "LGZONE": // 终端获取时间
|
|
|
+ logger.info("终端获取时间[LGZONE]:" + advice.toString());
|
|
|
+ normalReply(advice, channel, getlgZoneTime());
|
|
|
+ break;
|
|
|
+ default: // 其他
|
|
|
+ logger.info("client send data without handle type ...");
|
|
|
+ //normalReply(advice, channel, advice.getAdviceType());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String getLinkTime() {
|
|
|
+ DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss");
|
|
|
+ Date date = new Date();
|
|
|
+ TimeZone pst = TimeZone.getTimeZone("Etc/GMT+0");// 0时区
|
|
|
+ dateFormatter.setTimeZone(pst);
|
|
|
+ String lk_time = dateFormatter.format(date);
|
|
|
+ StringBuilder sb = new StringBuilder("LK,").append(lk_time);
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
|
|
|
// public static void main(String[] args) {
|
|
|
// getlgZoneTime();
|
|
|
// }
|
|
|
|
|
|
- protected static String getlgZoneTime() {
|
|
|
- DateFormat dateFormatter = new SimpleDateFormat("HH:mm:ss,yyyy-MM-dd");
|
|
|
- Date date = new Date();
|
|
|
- String lk_time = dateFormatter.format(date);
|
|
|
- StringBuilder sb = new StringBuilder("LGZONE,+8,").append(lk_time);
|
|
|
- return sb.toString();
|
|
|
- }
|
|
|
-
|
|
|
- protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
|
|
|
- ByteBuf dataByteBufCopy = dataByteBuf.copy();
|
|
|
- byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
|
|
|
- dataByteBufCopy.readBytes(dataByteArray);
|
|
|
- dataByteBufCopy.release();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 设置终端 上报频率
|
|
|
- *
|
|
|
- * @param advice
|
|
|
- * @param channel
|
|
|
- */
|
|
|
- protected void setUpload(Advice advice, Channel channel) {
|
|
|
- String content = "UPLOAD,1";// 一分钟一次
|
|
|
- Date loginTime = new Date();
|
|
|
- executorService.execute(new Runnable() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- Date currentTime = new Date();
|
|
|
- try {
|
|
|
- Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
|
|
|
- if (secondsSinceLogin < 5L) {
|
|
|
- TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
|
|
|
- }
|
|
|
- normalReply(advice, channel, content);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- logger.error(e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 回复
|
|
|
- * [ZJ*YYYYYYYYYY*NNNN*LEN*INIT,接收结果,]
|
|
|
- *
|
|
|
- * @param advice
|
|
|
- * @param channel
|
|
|
- * @content 回复内容
|
|
|
- */
|
|
|
- private void normalReply(Advice advice, Channel channel, String content) {
|
|
|
- String factory = advice.getFacotry();
|
|
|
- String deviceId = advice.getDeviceId();
|
|
|
- StringBuilder replyCommand = new StringBuilder();
|
|
|
- replyCommand.append("[");
|
|
|
- replyCommand.append(factory).append("*");
|
|
|
- replyCommand.append(deviceId).append("*");
|
|
|
- replyCommand.append("0001").append("*");// 指令流水
|
|
|
- replyCommand.append(numToHex16(content.length())).append("*");// 内容长度
|
|
|
- replyCommand.append(content);// 指令内容标记
|
|
|
- replyCommand.append("]");
|
|
|
- String replyCommandStr = replyCommand.toString();
|
|
|
- ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
- buffer.writeBytes(replyCommandStr.getBytes());
|
|
|
- ChannelFuture channelFuture = channel.writeAndFlush(buffer);
|
|
|
- channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
|
|
|
- }
|
|
|
-
|
|
|
- //使用1字节就可以表示b
|
|
|
- public static String numToHex8(int b) {
|
|
|
- return String.format("%02x", b);//2表示需要两个16进行数
|
|
|
- }
|
|
|
-
|
|
|
- //需要使用2字节表示b
|
|
|
- public static String numToHex16(int b) {
|
|
|
- return String.format("%04x", b);
|
|
|
- }
|
|
|
-
|
|
|
- public static String numToHex32(int b) {
|
|
|
- return String.format("%08x", b);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 解析 Advice 数据
|
|
|
- *
|
|
|
- * @param msg
|
|
|
- * @return
|
|
|
- */
|
|
|
- private Advice setAdvice(String msg) {
|
|
|
-
|
|
|
- Advice advice = new Advice();
|
|
|
- if (msg != null) {
|
|
|
- try {
|
|
|
- //厂商标识 设备ID 指令流水号 内容长度 指令内容
|
|
|
- String message = String.valueOf(msg);
|
|
|
- int startIndex = message.indexOf("[");
|
|
|
- int endIndex = message.indexOf("]");
|
|
|
- String data = message.substring(startIndex + 1, endIndex);
|
|
|
- String[] bodys = data.split("\\*");
|
|
|
- advice.setFacotry(bodys[0]);// 厂商
|
|
|
- advice.setDeviceId(bodys[1]);// 设备Id
|
|
|
+ protected static String getlgZoneTime() {
|
|
|
+ DateFormat dateFormatter = new SimpleDateFormat("HH:mm:ss,yyyy-MM-dd");
|
|
|
+ Date date = new Date();
|
|
|
+ String lk_time = dateFormatter.format(date);
|
|
|
+ StringBuilder sb = new StringBuilder("LGZONE,+8,").append(lk_time);
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void printAcceptanceData(ByteBuf dataByteBuf, ChannelHandlerContext ctx) {
|
|
|
+ ByteBuf dataByteBufCopy = dataByteBuf.copy();
|
|
|
+ byte[] dataByteArray = new byte[dataByteBufCopy.readableBytes()];
|
|
|
+ dataByteBufCopy.readBytes(dataByteArray);
|
|
|
+ dataByteBufCopy.release();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 设置终端 上报频率
|
|
|
+ *
|
|
|
+ * @param advice
|
|
|
+ * @param channel
|
|
|
+ */
|
|
|
+ protected void setUpload(Advice advice, Channel channel) {
|
|
|
+ String content = "UPLOAD,1";// 一分钟一次
|
|
|
+ Date loginTime = new Date();
|
|
|
+ executorService.execute(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ Date currentTime = new Date();
|
|
|
+ try {
|
|
|
+ Long secondsSinceLogin = (currentTime.getTime() - loginTime.getTime()) / 1000;
|
|
|
+ if (secondsSinceLogin < 5L) {
|
|
|
+ TimeUnit.SECONDS.sleep(5 - secondsSinceLogin);
|
|
|
+ }
|
|
|
+ normalReply(advice, channel, content);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 回复
|
|
|
+ * [ZJ*YYYYYYYYYY*NNNN*LEN*INIT,接收结果,]
|
|
|
+ *
|
|
|
+ * @param advice
|
|
|
+ * @param channel
|
|
|
+ * @content 回复内容
|
|
|
+ */
|
|
|
+ private void normalReply(Advice advice, Channel channel, String content) {
|
|
|
+ String factory = advice.getFacotry();
|
|
|
+ String deviceId = advice.getDeviceId();
|
|
|
+ StringBuilder replyCommand = new StringBuilder();
|
|
|
+ replyCommand.append("[");
|
|
|
+ replyCommand.append(factory).append("*");
|
|
|
+ replyCommand.append(deviceId).append("*");
|
|
|
+ replyCommand.append("0001").append("*");// 指令流水
|
|
|
+ replyCommand.append(numToHex16(content.length())).append("*");// 内容长度
|
|
|
+ replyCommand.append(content);// 指令内容标记
|
|
|
+ replyCommand.append("]");
|
|
|
+ String replyCommandStr = replyCommand.toString();
|
|
|
+ ByteBuf buffer = Unpooled.buffer(replyCommandStr.getBytes().length);
|
|
|
+ buffer.writeBytes(replyCommandStr.getBytes());
|
|
|
+ ChannelFuture channelFuture = channel.writeAndFlush(buffer);
|
|
|
+ channelFuture.addListener(future -> logger.info("Normal reply :" + replyCommandStr));
|
|
|
+ }
|
|
|
+
|
|
|
+ //使用1字节就可以表示b
|
|
|
+ public static String numToHex8(int b) {
|
|
|
+ return String.format("%02x", b);//2表示需要两个16进行数
|
|
|
+ }
|
|
|
+
|
|
|
+ //需要使用2字节表示b
|
|
|
+ public static String numToHex16(int b) {
|
|
|
+ return String.format("%04x", b);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String numToHex32(int b) {
|
|
|
+ return String.format("%08x", b);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 解析 Advice 数据
|
|
|
+ *
|
|
|
+ * @param msg
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Advice setAdvice(String msg) {
|
|
|
+
|
|
|
+ Advice advice = new Advice();
|
|
|
+ if (msg != null) {
|
|
|
+ try {
|
|
|
+ //厂商标识 设备ID 指令流水号 内容长度 指令内容
|
|
|
+ String message = String.valueOf(msg);
|
|
|
+ int startIndex = message.indexOf("[");
|
|
|
+ int endIndex = message.indexOf("]");
|
|
|
+ String data = message.substring(startIndex + 1, endIndex);
|
|
|
+ String[] bodys = data.split("\\*");
|
|
|
+ advice.setFacotry(bodys[0]);// 厂商
|
|
|
+ advice.setDeviceId(bodys[1]);// 设备Id
|
|
|
// advice.setAdviceSerial(bodys[2]);//指令流水
|
|
|
- advice.setAdvicelength(bodys[3]);//指令长度
|
|
|
- String[] contents = bodys[4].split(",");//获取内容
|
|
|
- advice.setAdviceType(contents[0]);// 标记
|
|
|
- return advice;
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void startAcceptor() {
|
|
|
- EventLoopGroup bossGroup = new NioEventLoopGroup();
|
|
|
- EventLoopGroup workerGroup = new NioEventLoopGroup();
|
|
|
- try {
|
|
|
- ServerBootstrap b = new ServerBootstrap();
|
|
|
- b.group(bossGroup, workerGroup)
|
|
|
- .channel(NioServerSocketChannel.class)
|
|
|
- .option(ChannelOption.SO_BACKLOG, 1024)
|
|
|
- .handler(new LoggingHandler(LogLevel.INFO))
|
|
|
- .childHandler(new ChannelInitializer<SocketChannel>() {
|
|
|
- @Override
|
|
|
- protected void initChannel(SocketChannel ch) {
|
|
|
- ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
|
|
|
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
|
|
|
- ch.pipeline().addLast(WatchHenShengServerHandler.this);
|
|
|
- }
|
|
|
- });
|
|
|
- ChannelFuture f = b.bind(port).sync();
|
|
|
- logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
|
|
|
- this.getPort());
|
|
|
- f.channel().closeFuture().sync();
|
|
|
- } catch (Exception ex) {
|
|
|
- logger.warn(ex.getMessage(), ex);
|
|
|
- } finally {
|
|
|
- cleanRedisLinkData();
|
|
|
- workerGroup.shutdownGracefully();
|
|
|
- bossGroup.shutdownGracefully();
|
|
|
- }
|
|
|
- }
|
|
|
+ advice.setAdvicelength(bodys[3]);//指令长度
|
|
|
+ String[] contents = bodys[4].split(",");//获取内容
|
|
|
+ advice.setAdviceType(contents[0]);// 标记
|
|
|
+ return advice;
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void startAcceptor() {
|
|
|
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
|
|
|
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
|
|
|
+ try {
|
|
|
+ ServerBootstrap b = new ServerBootstrap();
|
|
|
+ b.group(bossGroup, workerGroup)
|
|
|
+ .channel(NioServerSocketChannel.class)
|
|
|
+ .option(ChannelOption.SO_BACKLOG, 1024)
|
|
|
+ .handler(new LoggingHandler(LogLevel.INFO))
|
|
|
+ .childHandler(new ChannelInitializer<SocketChannel>() {
|
|
|
+ @Override
|
|
|
+ protected void initChannel(SocketChannel ch) {
|
|
|
+ ByteBuf delimiter = Unpooled.copiedBuffer("]".getBytes());
|
|
|
+ ch.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, false, delimiter));
|
|
|
+ ch.pipeline().addLast(WatchHenShengServerHandler.this);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ ChannelFuture f = b.bind(port).sync();
|
|
|
+ logger.info("start accept service for {}, bind address {}:{}", this.getPrefixName(), this.getIp(),
|
|
|
+ this.getPort());
|
|
|
+ f.channel().closeFuture().sync();
|
|
|
+ } catch (Exception ex) {
|
|
|
+ logger.warn(ex.getMessage(), ex);
|
|
|
+ } finally {
|
|
|
+ cleanRedisLinkData();
|
|
|
+ workerGroup.shutdownGracefully();
|
|
|
+ bossGroup.shutdownGracefully();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|