Skip to content

Commit f045695

Browse files
committed
提交
1 parent c7f7252 commit f045695

File tree

6 files changed

+156
-90
lines changed

6 files changed

+156
-90
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MQTTAdapterHelper.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.arex.inst.mqtt;
22

3-
import io.arex.agent.bootstrap.model.Mocker;
3+
import io.arex.agent.bootstrap.internal.Pair;
44
import io.arex.agent.bootstrap.util.StringUtil;
55
import io.arex.inst.mqtt.adapter.MessageAdapter;
66
import io.arex.inst.runtime.config.Config;
@@ -9,7 +9,6 @@
99
import io.arex.inst.runtime.listener.EventSource;
1010
import io.arex.inst.runtime.model.ArexConstants;
1111
import io.arex.inst.runtime.util.IgnoreUtils;
12-
import io.arex.inst.runtime.util.MockUtils;
1312

1413
/**
1514
* MQTTAdapterHelper
@@ -19,25 +18,26 @@ public class MQTTAdapterHelper {
1918
public static final String PROCESSED_FLAG = "arex-processed-flag";
2019

2120

22-
public static <MC, Msg> void onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
21+
public static <MC, Msg> Pair<MC, Msg> onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
2322
Msg msg = adapter.warpMessage(message);
2423
if (msg == null) {
25-
return;
24+
return null;
2625
}
2726
if (adapter.markProcessed(msg, PROCESSED_FLAG)) {
28-
return;
27+
return null;
2928
}
3029
MC mc = adapter.warpMC(messageChannel);
3130
if (mc == null){
32-
return;
31+
return null;
3332
}
3433
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
3534
if (shouldSkip(adapter, mc, msg)) {
36-
return ;
35+
return null;
3736
}
3837
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
3938
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
4039
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
40+
return Pair.of(mc,msg);
4141
}
4242

4343

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/GenericMessageImpl.java

Lines changed: 0 additions & 79 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.arex.inst.mqtt.adapter;
2+
3+
import io.arex.inst.mqtt.warp.GenericMessageWarp;
4+
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
5+
import org.springframework.messaging.Message;
6+
import org.springframework.messaging.MessageChannel;
7+
import org.springframework.messaging.MessageHeaders;
8+
import org.springframework.messaging.support.GenericMessage;
9+
10+
import java.nio.charset.StandardCharsets;
11+
12+
/**
13+
* MessageAdapterImpl
14+
*/
15+
public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> {
16+
17+
private static final MessageAdapterImpl INSTANCE = new MessageAdapterImpl();
18+
19+
public static MessageAdapterImpl getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
@Override
24+
public byte[] getMsg(MessageChannel messageChannel, Message msg) {
25+
Object payload = msg.getPayload();
26+
if (payload instanceof byte[]){
27+
return ((byte[]) payload);
28+
}
29+
return payload.toString().getBytes(StandardCharsets.UTF_8);
30+
}
31+
32+
@Override
33+
public MessageChannel warpMC(Object messageChannel) {
34+
if (messageChannel == null){
35+
return null;
36+
}
37+
return (MessageChannel) messageChannel;
38+
}
39+
40+
@Override
41+
public Message warpMessage(Object message) {
42+
if (message == null){
43+
return null;
44+
}
45+
if (message instanceof GenericMessageWarp){
46+
return (GenericMessageWarp) message;
47+
}
48+
49+
if (message instanceof GenericMessage) {
50+
GenericMessage messageTemp = (GenericMessage) message;
51+
MessageHeaders headers = messageTemp.getHeaders();
52+
MessageHeaderWarp messageHeaderWarp = new MessageHeaderWarp(headers);
53+
return new GenericMessageWarp(messageTemp.getPayload(), messageHeaderWarp);
54+
}
55+
return null;
56+
}
57+
58+
@Override
59+
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
60+
return msg.getHeaders();
61+
}
62+
63+
@Override
64+
public boolean markProcessed(Message message, String flagKey) {
65+
if (message == null){
66+
return true;
67+
}
68+
if (message instanceof GenericMessageWarp){
69+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp)message;
70+
genericMessageWarp.put(flagKey,Boolean.TRUE.toString());
71+
}
72+
return false;
73+
}
74+
75+
@Override
76+
public String getHeader(MessageChannel messageChannel, Message message, String key) {
77+
if(message instanceof GenericMessage){
78+
Object obj = message.getHeaders().get(key);
79+
return obj != null ? obj.toString() : null;
80+
}
81+
return null;
82+
}
83+
84+
@Override
85+
public void removeHeader(MessageChannel messageChannel, Message message, String key) {
86+
if (message instanceof GenericMessageWarp){
87+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
88+
genericMessageWarp.removeHeader(key);
89+
}
90+
}
91+
92+
@Override
93+
public void addHeader(MessageChannel messageChannel, Message message, String key, String value) {
94+
if (message instanceof GenericMessageWarp){
95+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
96+
genericMessageWarp.put(key,value);
97+
}
98+
}
99+
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/inst/MQGenericInstrumentationV3.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import io.arex.inst.extension.MethodInstrumentation;
44
import io.arex.inst.extension.TypeInstrumentation;
55
import io.arex.inst.mqtt.MQTTAdapterHelper;
6-
import io.arex.inst.mqtt.adapter.GenericMessageImpl;
6+
import io.arex.inst.mqtt.adapter.MessageAdapterImpl;
77
import net.bytebuddy.asm.Advice;
88
import net.bytebuddy.description.method.MethodDescription;
99
import net.bytebuddy.description.type.TypeDescription;
@@ -41,13 +41,13 @@ public static class ArrivedAdvice {
4141
@Advice.OnMethodEnter(suppress = Throwable.class)
4242
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
4343
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
44-
MQTTAdapterHelper.onServiceEnter(GenericMessageImpl.getInstance(),messageChannel,message);
44+
MQTTAdapterHelper.onServiceEnter(MessageAdapterImpl.getInstance(), messageChannel, message);
4545
}
4646

4747
@Advice.OnMethodExit(suppress = Throwable.class)
4848
public static void onExit(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
4949
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
50-
MQTTAdapterHelper.onServiceExit(GenericMessageImpl.getInstance(), messageChannel, message);
50+
MQTTAdapterHelper.onServiceExit(MessageAdapterImpl.getInstance(), messageChannel, message);
5151
}
5252

5353
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.arex.inst.mqtt.warp;
2+
3+
import org.springframework.messaging.MessageHeaders;
4+
import org.springframework.messaging.support.GenericMessage;
5+
6+
import java.util.Map;
7+
8+
/**
9+
* @author : MentosL
10+
* @date : 2023/5/10 20:53
11+
*/
12+
public class GenericMessageWarp extends GenericMessage {
13+
14+
private MessageHeaderWarp messageHeaderWarp;
15+
private Object payload;
16+
17+
public GenericMessageWarp(Object payload) {
18+
super(payload);
19+
this.payload = payload;
20+
this.messageHeaderWarp = null;
21+
}
22+
23+
public GenericMessageWarp(Object payload, Map headers) {
24+
super(payload, headers);
25+
this.payload = payload;
26+
this.messageHeaderWarp = new MessageHeaderWarp(headers);
27+
}
28+
29+
public GenericMessageWarp(Object payload, MessageHeaders headers) {
30+
super(payload, headers);
31+
this.payload = payload;
32+
this.messageHeaderWarp = new MessageHeaderWarp(headers);
33+
}
34+
35+
public void removeHeader(String key) {
36+
if (this.messageHeaderWarp != null){
37+
this.messageHeaderWarp.remove(key);
38+
}
39+
}
40+
41+
public void put(String key, String value) {
42+
if (this.messageHeaderWarp != null) {
43+
this.messageHeaderWarp.put(key, value);
44+
}
45+
}
46+
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/MessageHeaderWarp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class MessageHeaderWarp extends MessageHeaders {
1818
private final Long timestamp;
1919

2020
public MessageHeaderWarp(MessageHeaders messageHeaders) {
21-
super(null);
21+
super(messageHeaders);
2222
this.headers = new HashMap<>();
2323
for (Map.Entry<String, Object> entry : messageHeaders.entrySet()) {
2424
this.headers.put(entry.getKey(), entry.getValue());

0 commit comments

Comments
 (0)