|
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
|
|
import org.apache.commons.lang.ArrayUtils;
|
|
import org.apache.commons.lang.ArrayUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
+import org.slf4j.MDC;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
import org.springframework.kafka.support.SendResult;
|
|
import org.springframework.kafka.support.SendResult;
|
|
@@ -94,9 +95,13 @@ public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
|
|
|
|
|
|
private String topic;
|
|
private String topic;
|
|
|
|
|
|
-
|
|
|
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
|
|
|
|
+
|
|
|
|
+ MDC.put("strategyName", this.getPrefixName());
|
|
|
|
+ }
|
|
|
|
|
|
protected void receiveMsg(String msg, String deviceId, Channel channel) {
|
|
protected void receiveMsg(String msg, String deviceId, Channel channel) {
|
|
|
|
+
|
|
manageChannel(channel, deviceId);
|
|
manageChannel(channel, deviceId);
|
|
kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
|
|
kafkaSendThreadPool.execute(() -> sendKakfaMsg(msg, deviceId));
|
|
//singleThreadPool.execute(() -> dataStorage(msg));
|
|
//singleThreadPool.execute(() -> dataStorage(msg));
|
|
@@ -130,12 +135,14 @@ public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
|
|
|
|
|
|
protected void sendKakfaMsg(String msg, String deviceId) {
|
|
protected void sendKakfaMsg(String msg, String deviceId) {
|
|
Integer key = new Integer(deviceId.hashCode());
|
|
Integer key = new Integer(deviceId.hashCode());
|
|
|
|
+ MDC.put("strategyName", this.getPrefixName());
|
|
ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
|
|
ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send(topic, key, msg);
|
|
|
|
|
|
// 发送成功回调
|
|
// 发送成功回调
|
|
SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
|
|
SuccessCallback<SendResult<Integer, String>> successCallback = new SuccessCallback<SendResult<Integer, String>>() {
|
|
@Override
|
|
@Override
|
|
public void onSuccess(SendResult<Integer, String> result) {
|
|
public void onSuccess(SendResult<Integer, String> result) {
|
|
|
|
+ MDC.put("strategyName", AcceptanceInboundHandlerAdapter.this.getPrefixName());
|
|
// 成功业务逻辑
|
|
// 成功业务逻辑
|
|
logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);
|
|
logger.info("发送kafka成功.deviceId:{}, msg:{}", key, msg);
|
|
}
|
|
}
|
|
@@ -144,6 +151,7 @@ public class AcceptanceInboundHandlerAdapter extends ChannelInboundHandlerAdapt
|
|
FailureCallback failureCallback = new FailureCallback() {
|
|
FailureCallback failureCallback = new FailureCallback() {
|
|
@Override
|
|
@Override
|
|
public void onFailure(Throwable ex) {
|
|
public void onFailure(Throwable ex) {
|
|
|
|
+ MDC.put("strategyName", AcceptanceInboundHandlerAdapter.this.getPrefixName());
|
|
// 失败业务逻辑
|
|
// 失败业务逻辑
|
|
logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
|
|
logger.error("发送kafka失败.deviceId:{}, msg:{}", key, msg);
|
|
singleThreadPool.execute(() -> dataStorage(msg));
|
|
singleThreadPool.execute(() -> dataStorage(msg));
|