Skip to content

Commit 109182c

Browse files
committed
增加mqtt代码
1 parent 9fcf269 commit 109182c

File tree

11 files changed

+437
-91
lines changed

11 files changed

+437
-91
lines changed
Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,83 @@
11
package io.arex.inst.mqtt;
22

3-
import io.arex.agent.bootstrap.model.Mocker;
4-
import io.arex.inst.runtime.util.MockUtils;
5-
import org.springframework.messaging.Message;
6-
import org.springframework.messaging.MessageChannel;
3+
import io.arex.agent.bootstrap.internal.Pair;
4+
import io.arex.agent.bootstrap.util.StringUtil;
5+
import io.arex.inst.mqtt.adapter.MessageAdapter;
6+
import io.arex.inst.runtime.config.Config;
7+
import io.arex.inst.runtime.listener.CaseEvent;
8+
import io.arex.inst.runtime.listener.CaseEventDispatcher;
9+
import io.arex.inst.runtime.listener.EventSource;
10+
import io.arex.inst.runtime.model.ArexConstants;
11+
import io.arex.inst.runtime.util.IgnoreUtils;
712

813
/**
914
* MQTTAdapterHelper
1015
*/
1116
public class MQTTAdapterHelper {
1217

18+
public static final String PROCESSED_FLAG = "arex-processed-flag";
1319

14-
public static <MC,T extends Message> void onServiceEnter(MessageAdapter<MC,T> adapter, MessageChannel messageChannel,Message<?> message){
15-
if (shouldSkip(messageChannel,message)){
16-
return;
20+
21+
public static <MC, Msg> Pair<MC, Msg> onServiceEnter(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message) {
22+
Msg msg = adapter.warpMessage(message);
23+
if (msg == null) {
24+
return null;
25+
}
26+
if (adapter.markProcessed(msg, PROCESSED_FLAG)) {
27+
return null;
28+
}
29+
MC mc = adapter.warpMC(messageChannel);
30+
if (mc == null){
31+
return null;
1732
}
33+
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
34+
if (shouldSkip(adapter, mc, msg)) {
35+
return null;
36+
}
37+
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
38+
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
39+
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
40+
return Pair.of(mc,msg);
41+
}
1842

1943

44+
public static <MC, Msg> void onServiceExit(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message){
45+
Msg msg = adapter.warpMessage(message);
46+
MC mc = adapter.warpMC(messageChannel);
47+
if (msg == null || mc == null) {
48+
return;
49+
}
50+
adapter.removeHeader(mc,msg,PROCESSED_FLAG);
51+
new MessageQueueExtractor<>( mc, msg,adapter).execute();
2052
}
2153

2254

2355

24-
private static boolean shouldSkip(MessageChannel messageChannel,Message<?> mqttMessage){
25-
if (messageChannel== null || mqttMessage == null) {
56+
57+
private static<MC, Msg> boolean shouldSkip(MessageAdapter<MC, Msg> adapter,MC mc, Msg msg){
58+
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
59+
// Replay scene
60+
if (StringUtil.isNotEmpty(caseId)) {
61+
return Config.get().getBoolean("arex.disable.replay", false);
62+
}
63+
64+
String forceRecord = adapter.getHeader(mc, msg, ArexConstants.FORCE_RECORD);
65+
// Do not skip if header with arex-force-record=true
66+
if (Boolean.parseBoolean(forceRecord)) {
67+
return false;
68+
}
69+
// Skip if request header with arex-replay-warm-up=true
70+
if (Boolean.parseBoolean(adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP))) {
2671
return true;
2772
}
28-
return false;
73+
String topic = adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP);
74+
if (StringUtil.isEmpty(topic)) {
75+
return false;
76+
}
77+
if (IgnoreUtils.ignoreOperation(topic)) {
78+
return true;
79+
}
80+
return Config.get().invalidRecord(topic);
2981
}
3082

31-
public static Mocker createMocker(String operationName) {
32-
Mocker mocker = MockUtils.createMqttConsumer(operationName);
33-
mocker.getTargetRequest().setType(Byte.class.getName());
34-
return mocker;
35-
}
3683
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MessageAdapter.java

Lines changed: 0 additions & 14 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.arex.inst.mqtt;
2+
3+
import io.arex.agent.bootstrap.model.Mocker;
4+
import io.arex.inst.mqtt.adapter.MessageAdapter;
5+
import io.arex.inst.runtime.context.ContextManager;
6+
import io.arex.inst.runtime.model.ArexConstants;
7+
import io.arex.inst.runtime.util.LogUtil;
8+
import io.arex.inst.runtime.util.MockUtils;
9+
import org.springframework.messaging.MessageHeaders;
10+
11+
import java.util.Base64;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
/**
16+
* @author : MentosL
17+
* @date : 2023/5/9 22:16
18+
*/
19+
public class MessageQueueExtractor<MC,Msg> {
20+
private final MC messageChannel;
21+
private final Msg message;
22+
private final MessageAdapter<MC,Msg> adapter;
23+
24+
25+
public MessageQueueExtractor(MC messageChannel, Msg message, MessageAdapter<MC, Msg> adapter) {
26+
this.messageChannel = messageChannel;
27+
this.message = message;
28+
this.adapter = adapter;
29+
}
30+
31+
32+
public void execute() {
33+
try {
34+
if (message == null || messageChannel == null || adapter == null) {
35+
return;
36+
}
37+
if (!ContextManager.needRecordOrReplay()) {
38+
return;
39+
}
40+
doExecute();
41+
executePostProcess();
42+
} catch (Exception e) {
43+
LogUtil.warn("MessageQueue.execute", e);
44+
}
45+
}
46+
47+
private void executePostProcess() {
48+
if (ContextManager.needRecord()) {
49+
adapter.addHeader(messageChannel,message, ArexConstants.RECORD_ID,ContextManager.currentContext().getCaseId());
50+
}
51+
if (ContextManager.needReplay()) {
52+
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
53+
}
54+
// Think about other ways to replace the head
55+
adapter.resetMsg(message);
56+
}
57+
58+
private void doExecute() {
59+
Mocker mocker = MockUtils.createMqttConsumer(adapter.getHeader(messageChannel,message,"mqtt_receivedTopic"));
60+
MessageHeaders header = adapter.getHeader(messageChannel, message);
61+
Map<String, Object> requestAttributes = new HashMap<>();
62+
for (Map.Entry<String, Object> entry : header.entrySet()) {
63+
requestAttributes.put(entry.getKey(), entry.getValue());
64+
}
65+
mocker.getTargetRequest().setAttributes(requestAttributes);
66+
mocker.getTargetRequest().setBody(Base64.getEncoder().encodeToString(adapter.getMsg(messageChannel,message)));
67+
if (ContextManager.needReplay()) {
68+
MockUtils.replayMocker(mocker);
69+
} else if (ContextManager.needRecord()) {
70+
MockUtils.recordMocker(mocker);
71+
}
72+
}
73+
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/GenericMessageImpl.java

Lines changed: 0 additions & 36 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.arex.inst.mqtt.adapter;
2+
3+
import org.springframework.messaging.MessageHeaders;
4+
5+
/**
6+
* MessageAdapter
7+
*/
8+
public interface MessageAdapter<MC,Msg> {
9+
10+
MC warpMC(Object messageChannel);
11+
12+
Msg warpMessage(Object message);
13+
14+
byte[] getMsg(MC c, Msg msg);
15+
16+
MessageHeaders getHeader(MC c, Msg msg);
17+
18+
boolean markProcessed(Msg msg,String flagKey);
19+
20+
String getHeader(MC mc,Msg msg,String key);
21+
22+
void removeHeader(MC mc,Msg msg,String key);
23+
24+
void addHeader(MC mc,Msg msg,String key,String value);
25+
26+
Msg resetMsg(Msg msg);
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package io.arex.inst.mqtt.adapter;
2+
3+
import io.arex.inst.mqtt.warp.GenericMessageWarp;
4+
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
5+
import io.arex.inst.runtime.util.LogUtil;
6+
import org.springframework.messaging.Message;
7+
import org.springframework.messaging.MessageChannel;
8+
import org.springframework.messaging.MessageHeaders;
9+
import org.springframework.messaging.support.GenericMessage;
10+
11+
import java.lang.reflect.Field;
12+
import java.nio.charset.StandardCharsets;
13+
14+
/**
15+
* MessageImpl
16+
*/
17+
public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> {
18+
19+
private static final MessageAdapterImpl INSTANCE = new MessageAdapterImpl();
20+
21+
public static MessageAdapterImpl getInstance() {
22+
return INSTANCE;
23+
}
24+
25+
@Override
26+
public byte[] getMsg(MessageChannel messageChannel, Message msg) {
27+
Object payload = msg.getPayload();
28+
if (payload instanceof byte[]){
29+
return ((byte[]) payload);
30+
}
31+
return payload.toString().getBytes(StandardCharsets.UTF_8);
32+
}
33+
34+
@Override
35+
public MessageChannel warpMC(Object messageChannel) {
36+
if (messageChannel == null){
37+
return null;
38+
}
39+
if (messageChannel instanceof MessageChannel){
40+
return (MessageChannel) messageChannel;
41+
}
42+
return null;
43+
}
44+
45+
@Override
46+
public Message warpMessage(Object message) {
47+
if (message == null){
48+
return null;
49+
}
50+
if (message instanceof GenericMessageWarp){
51+
return (GenericMessageWarp) message;
52+
}
53+
54+
if (message instanceof GenericMessage) {
55+
GenericMessage messageTemp = (GenericMessage) message;
56+
MessageHeaders headers = messageTemp.getHeaders();
57+
MessageHeaderWarp messageHeaderWarp = new MessageHeaderWarp(headers);
58+
return new GenericMessageWarp(messageTemp.getPayload(), messageHeaderWarp);
59+
}
60+
return null;
61+
}
62+
63+
@Override
64+
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
65+
if (msg instanceof GenericMessageWarp){
66+
GenericMessageWarp messageTemp = (GenericMessageWarp) msg;
67+
return messageTemp.getMessageHeaderWarp();
68+
}
69+
return msg.getHeaders();
70+
}
71+
72+
@Override
73+
public boolean markProcessed(Message message, String flagKey) {
74+
if (message == null){
75+
return true;
76+
}
77+
if (message instanceof GenericMessageWarp){
78+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp)message;
79+
genericMessageWarp.put(flagKey,Boolean.TRUE.toString());
80+
}
81+
return false;
82+
}
83+
84+
@Override
85+
public String getHeader(MessageChannel messageChannel, Message message, String key) {
86+
if (message instanceof GenericMessageWarp) {
87+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
88+
Object object = genericMessageWarp.get(key);
89+
return object != null ? object.toString() : null;
90+
}
91+
92+
if(message instanceof GenericMessage){
93+
Object obj = message.getHeaders().get(key);
94+
return obj != null ? obj.toString() : null;
95+
}
96+
return null;
97+
}
98+
99+
@Override
100+
public void removeHeader(MessageChannel messageChannel, Message message, String key) {
101+
if (message instanceof GenericMessageWarp){
102+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
103+
genericMessageWarp.removeHeader(key);
104+
}
105+
}
106+
107+
@Override
108+
public void addHeader(MessageChannel messageChannel, Message message, String key, String value) {
109+
if (message instanceof GenericMessageWarp){
110+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
111+
genericMessageWarp.put(key,value);
112+
}
113+
}
114+
115+
@Override
116+
public Message resetMsg(Message message) {
117+
if (message instanceof GenericMessageWarp){
118+
try {
119+
GenericMessageWarp messageWarp = (GenericMessageWarp) message;
120+
Field headers = message.getClass().getSuperclass().getDeclaredField("headers");
121+
headers.setAccessible(true);
122+
headers.set(message, messageWarp.getMessageHeaderWarp());
123+
} catch (NoSuchFieldException e) {
124+
LogUtil.warn("MessageAdapterImpl.resetMsg - NoSuchFieldException", e);
125+
} catch (IllegalAccessException e) {
126+
LogUtil.warn("MessageAdapterImpl.resetMsg - IllegalAccessException", e);
127+
}
128+
}
129+
return message;
130+
}
131+
132+
133+
}

0 commit comments

Comments
 (0)