Skip to content

Commit 5e8e72f

Browse files
committed
提交
1 parent d915f50 commit 5e8e72f

File tree

6 files changed

+18
-137
lines changed

6 files changed

+18
-137
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MQTTAdapterHelper.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.arex.inst.mqtt;
22

3-
import io.arex.agent.bootstrap.model.Mocker;
3+
import io.arex.agent.bootstrap.internal.Pair;
44
import io.arex.agent.bootstrap.util.StringUtil;
55
import io.arex.inst.mqtt.adapter.MessageAdapter;
66
import io.arex.inst.runtime.config.Config;
@@ -9,7 +9,6 @@
99
import io.arex.inst.runtime.listener.EventSource;
1010
import io.arex.inst.runtime.model.ArexConstants;
1111
import io.arex.inst.runtime.util.IgnoreUtils;
12-
import io.arex.inst.runtime.util.MockUtils;
1312

1413
/**
1514
* MQTTAdapterHelper
@@ -19,25 +18,26 @@ public class MQTTAdapterHelper {
1918
public static final String PROCESSED_FLAG = "arex-processed-flag";
2019

2120

22-
public static <MC, Msg> void onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
21+
public static <MC, Msg> Pair<MC, Msg> onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
2322
Msg msg = adapter.warpMessage(message);
2423
if (msg == null) {
25-
return;
24+
return null;
2625
}
2726
if (adapter.markProcessed(msg, PROCESSED_FLAG)) {
28-
return;
27+
return null;
2928
}
3029
MC mc = adapter.warpMC(messageChannel);
3130
if (mc == null){
32-
return;
31+
return null;
3332
}
3433
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
3534
if (shouldSkip(adapter, mc, msg)) {
36-
return ;
35+
return null;
3736
}
3837
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
3938
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
4039
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
40+
return Pair.of(mc,msg);
4141
}
4242

4343

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/GenericMessageImpl.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapterImpl.java

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@
22

33
import io.arex.inst.mqtt.warp.GenericMessageWarp;
44
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
5-
import io.arex.inst.runtime.util.LogUtil;
65
import org.springframework.messaging.Message;
76
import org.springframework.messaging.MessageChannel;
87
import org.springframework.messaging.MessageHeaders;
98
import org.springframework.messaging.support.GenericMessage;
109

11-
import java.lang.reflect.Field;
1210
import java.nio.charset.StandardCharsets;
1311

1412
/**
15-
* MessageImpl
13+
* MessageAdapterImpl
1614
*/
1715
public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> {
1816

@@ -36,10 +34,7 @@ public MessageChannel warpMC(Object messageChannel) {
3634
if (messageChannel == null){
3735
return null;
3836
}
39-
if (messageChannel instanceof MessageChannel){
40-
return (MessageChannel) messageChannel;
41-
}
42-
return null;
37+
return (MessageChannel) messageChannel;
4338
}
4439

4540
@Override
@@ -62,10 +57,6 @@ public Message warpMessage(Object message) {
6257

6358
@Override
6459
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
65-
if (msg instanceof GenericMessageWarp){
66-
GenericMessageWarp messageTemp = (GenericMessageWarp) msg;
67-
return messageTemp.getMessageHeaderWarp();
68-
}
6960
return msg.getHeaders();
7061
}
7162

@@ -83,12 +74,6 @@ public boolean markProcessed(Message message, String flagKey) {
8374

8475
@Override
8576
public String getHeader(MessageChannel messageChannel, Message message, String key) {
86-
if (message instanceof GenericMessageWarp) {
87-
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
88-
Object object = genericMessageWarp.get(key);
89-
return object != null ? object.toString() : null;
90-
}
91-
9277
if(message instanceof GenericMessage){
9378
Object obj = message.getHeaders().get(key);
9479
return obj != null ? obj.toString() : null;
@@ -111,23 +96,4 @@ public void addHeader(MessageChannel messageChannel, Message message, String key
11196
genericMessageWarp.put(key,value);
11297
}
11398
}
114-
115-
@Override
116-
public Message resetMsg(Message message) {
117-
if (message instanceof GenericMessageWarp){
118-
try {
119-
GenericMessageWarp messageWarp = (GenericMessageWarp) message;
120-
Field headers = message.getClass().getSuperclass().getDeclaredField("headers");
121-
headers.setAccessible(true);
122-
headers.set(message, messageWarp.getMessageHeaderWarp());
123-
} catch (NoSuchFieldException e) {
124-
LogUtil.warn("MessageAdapterImpl.resetMsg - NoSuchFieldException", e);
125-
} catch (IllegalAccessException e) {
126-
LogUtil.warn("MessageAdapterImpl.resetMsg - IllegalAccessException", e);
127-
}
128-
}
129-
return message;
130-
}
131-
132-
13399
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/inst/MQGenericInstrumentationV3.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import io.arex.inst.extension.MethodInstrumentation;
44
import io.arex.inst.extension.TypeInstrumentation;
55
import io.arex.inst.mqtt.MQTTAdapterHelper;
6-
import io.arex.inst.mqtt.adapter.GenericMessageImpl;
6+
import io.arex.inst.mqtt.adapter.MessageAdapterImpl;
77
import net.bytebuddy.asm.Advice;
88
import net.bytebuddy.description.method.MethodDescription;
99
import net.bytebuddy.description.type.TypeDescription;
@@ -41,13 +41,13 @@ public static class ArrivedAdvice {
4141
@Advice.OnMethodEnter(suppress = Throwable.class)
4242
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
4343
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
44-
MQTTAdapterHelper.onServiceEnter(GenericMessageImpl.getInstance(),messageChannel,message);
44+
MQTTAdapterHelper.onServiceEnter(MessageAdapterImpl.getInstance(), messageChannel, message);
4545
}
4646

4747
@Advice.OnMethodExit(suppress = Throwable.class)
4848
public static void onExit(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
4949
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
50-
MQTTAdapterHelper.onServiceExit(GenericMessageImpl.getInstance(), messageChannel, message);
50+
MQTTAdapterHelper.onServiceExit(MessageAdapterImpl.getInstance(), messageChannel, message);
5151
}
5252

5353
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/GenericMessageWarp.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,23 @@
1212
public class GenericMessageWarp extends GenericMessage {
1313

1414
private MessageHeaderWarp messageHeaderWarp;
15+
private Object payload;
1516

1617
public GenericMessageWarp(Object payload) {
1718
super(payload);
19+
this.payload = payload;
20+
this.messageHeaderWarp = null;
1821
}
1922

2023
public GenericMessageWarp(Object payload, Map headers) {
2124
super(payload, headers);
25+
this.payload = payload;
2226
this.messageHeaderWarp = new MessageHeaderWarp(headers);
2327
}
2428

2529
public GenericMessageWarp(Object payload, MessageHeaders headers) {
2630
super(payload, headers);
31+
this.payload = payload;
2732
this.messageHeaderWarp = new MessageHeaderWarp(headers);
2833
}
2934

@@ -38,15 +43,4 @@ public void put(String key, String value) {
3843
this.messageHeaderWarp.put(key, value);
3944
}
4045
}
41-
42-
public Object get(String key){
43-
if (this.messageHeaderWarp != null) {
44-
return this.messageHeaderWarp.get(key);
45-
}
46-
return getHeaders().get(key);
47-
}
48-
49-
public MessageHeaderWarp getMessageHeaderWarp() {
50-
return messageHeaderWarp;
51-
}
5246
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/MessageHeaderWarp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class MessageHeaderWarp extends MessageHeaders {
1818
private final Long timestamp;
1919

2020
public MessageHeaderWarp(MessageHeaders messageHeaders) {
21-
super(null);
21+
super(messageHeaders);
2222
this.headers = new HashMap<>();
2323
for (Map.Entry<String, Object> entry : messageHeaders.entrySet()) {
2424
this.headers.put(entry.getKey(), entry.getValue());

0 commit comments

Comments
 (0)