Skip to content

Commit db56c67

Browse files
committed
调整
1 parent 5e8e72f commit db56c67

File tree

4 files changed

+17
-32
lines changed

4 files changed

+17
-32
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import java.nio.charset.StandardCharsets;
1111

1212
/**
13-
* MessageAdapterImpl
13+
* MessageImpl
1414
*/
1515
public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> {
1616

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/inst/MQGenericInstrumentationV3.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.arex.inst.mqtt.inst;
22

3+
import io.arex.agent.bootstrap.internal.Pair;
34
import io.arex.inst.extension.MethodInstrumentation;
45
import io.arex.inst.extension.TypeInstrumentation;
56
import io.arex.inst.mqtt.MQTTAdapterHelper;
@@ -40,13 +41,23 @@ public static class ArrivedAdvice {
4041

4142
@Advice.OnMethodEnter(suppress = Throwable.class)
4243
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
43-
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
44-
MQTTAdapterHelper.onServiceEnter(MessageAdapterImpl.getInstance(), messageChannel, message);
44+
@Advice.Argument(value = 1, readOnly = false) Message<?> message,
45+
@Advice.Local("channelMsgPair") Pair<MessageChannel, Message> pair) {
46+
Pair<MessageChannel, Message> messageChannelMessagePair =
47+
MQTTAdapterHelper.onServiceEnter(MessageAdapterImpl.getInstance(), messageChannel, message);
48+
if (messageChannelMessagePair == null){
49+
return;
50+
}
51+
pair = messageChannelMessagePair;
4552
}
4653

4754
@Advice.OnMethodExit(suppress = Throwable.class)
4855
public static void onExit(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
49-
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
56+
@Advice.Argument(value = 1, readOnly = false) Message<?> message,
57+
@Advice.Local("channelMsgPair") Pair<MessageChannel, Message> pair) {
58+
if (pair != null){
59+
message = pair.getSecond();
60+
}
5061
MQTTAdapterHelper.onServiceExit(MessageAdapterImpl.getInstance(), messageChannel, message);
5162
}
5263

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/GenericMessageWarp.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,18 @@
1212
public class GenericMessageWarp extends GenericMessage {
1313

1414
private MessageHeaderWarp messageHeaderWarp;
15-
private Object payload;
1615

1716
public GenericMessageWarp(Object payload) {
1817
super(payload);
19-
this.payload = payload;
20-
this.messageHeaderWarp = null;
2118
}
2219

2320
public GenericMessageWarp(Object payload, Map headers) {
2421
super(payload, headers);
25-
this.payload = payload;
2622
this.messageHeaderWarp = new MessageHeaderWarp(headers);
2723
}
2824

2925
public GenericMessageWarp(Object payload, MessageHeaders headers) {
3026
super(payload, headers);
31-
this.payload = payload;
3227
this.messageHeaderWarp = new MessageHeaderWarp(headers);
3328
}
3429

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/MessageHeaderWarp.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import org.springframework.messaging.MessageHeaders;
44

5-
import java.util.Collections;
6-
import java.util.HashMap;
75
import java.util.Map;
86
import java.util.UUID;
97

@@ -13,45 +11,26 @@
1311
*/
1412
public class MessageHeaderWarp extends MessageHeaders {
1513

16-
private final Map<String, Object> headers;
17-
private final UUID id;
18-
private final Long timestamp;
1914

2015
public MessageHeaderWarp(MessageHeaders messageHeaders) {
2116
super(messageHeaders);
22-
this.headers = new HashMap<>();
23-
for (Map.Entry<String, Object> entry : messageHeaders.entrySet()) {
24-
this.headers.put(entry.getKey(), entry.getValue());
25-
}
26-
this.id = messageHeaders.getId();
27-
this.timestamp = messageHeaders.getTimestamp();
2817
}
2918

3019
public MessageHeaderWarp(Map<String, Object> headers) {
3120
super(headers);
32-
this.headers = headers;
33-
this.id = getIdGenerator().generateId();
34-
this.timestamp = System.currentTimeMillis();
3521
}
3622

3723
public MessageHeaderWarp(Map<String, Object> headers, UUID id, Long timestamp) {
3824
super(headers, id, timestamp);
39-
this.headers = headers;
40-
this.id = id;
41-
this.timestamp = timestamp;
4225
}
4326

44-
4527
public Object put(String key, Object value){
46-
this.headers.put(key,value);
28+
super.getRawHeaders().put(key,value);
4729
return value;
4830
}
4931

5032
public void remove(String key){
51-
this.headers.remove(key);
33+
super.getRawHeaders().remove(key);
5234
}
5335

54-
public Map<String, Object> getHeaders() {
55-
return Collections.unmodifiableMap(headers);
56-
}
5736
}

0 commit comments

Comments
 (0)