Skip to content

Commit cdb26a2

Browse files
committed
新增mqtt
1 parent 07a3b78 commit cdb26a2

14 files changed

+573
-38
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MessageQueueExtractor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.springframework.messaging.MessageHeaders;
1010

1111
import java.util.Base64;
12+
import java.util.Collections;
1213
import java.util.HashMap;
1314
import java.util.Map;
1415

@@ -37,31 +38,35 @@ public void execute() {
3738
if (!ContextManager.needRecordOrReplay()) {
3839
return;
3940
}
41+
executeBeforeProcess();
4042
doExecute();
41-
executePostProcess();
43+
executeAfterProcess();
4244
} catch (Exception e) {
4345
LogUtil.warn("MessageQueue.execute", e);
4446
}
4547
}
4648

47-
private void executePostProcess() {
49+
private void executeBeforeProcess() {
4850
if (ContextManager.needRecord()) {
4951
adapter.addHeader(messageChannel,message, ArexConstants.RECORD_ID,ContextManager.currentContext().getCaseId());
5052
}
5153
if (ContextManager.needReplay()) {
5254
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
5355
}
56+
}
57+
private void executeAfterProcess(){
5458
// Think about other ways to replace the head
5559
adapter.resetMsg(message);
5660
}
5761

5862
private void doExecute() {
5963
Mocker mocker = MockUtils.createMqttConsumer(adapter.getHeader(messageChannel,message,"mqtt_receivedTopic"));
6064
MessageHeaders header = adapter.getHeader(messageChannel, message);
61-
Map<String, Object> requestAttributes = new HashMap<>();
65+
Map<String, Object> requestOrigin = new HashMap<>();
6266
for (Map.Entry<String, Object> entry : header.entrySet()) {
63-
requestAttributes.put(entry.getKey(), entry.getValue());
67+
requestOrigin.put(entry.getKey(), entry.getValue());
6468
}
69+
Map<String, Object> requestAttributes = Collections.singletonMap("Headers", requestOrigin);
6570
mocker.getTargetRequest().setAttributes(requestAttributes);
6671
mocker.getTargetRequest().setBody(Base64.getEncoder().encodeToString(adapter.getMsg(messageChannel,message)));
6772
if (ContextManager.needReplay()) {

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ public interface MessageAdapter<MC,Msg> {
1919

2020
String getHeader(MC mc,Msg msg,String key);
2121

22-
void removeHeader(MC mc,Msg msg,String key);
22+
boolean removeHeader(MC mc,Msg msg,String key);
2323

24-
void addHeader(MC mc,Msg msg,String key,String value);
24+
boolean addHeader(MC mc,Msg msg,String key,String value);
2525

2626
Msg resetMsg(Msg msg);
2727

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapterImpl.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.arex.inst.mqtt.adapter;
22

3+
import io.arex.agent.bootstrap.util.StringUtil;
34
import io.arex.inst.mqtt.warp.GenericMessageWarp;
45
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
56
import io.arex.inst.runtime.util.LogUtil;
@@ -24,7 +25,13 @@ public static MessageAdapterImpl getInstance() {
2425

2526
@Override
2627
public byte[] getMsg(MessageChannel messageChannel, Message msg) {
28+
if (msg == null){
29+
return new byte[]{};
30+
}
2731
Object payload = msg.getPayload();
32+
if (payload == null){
33+
return new byte[]{};
34+
}
2835
if (payload instanceof byte[]){
2936
return ((byte[]) payload);
3037
}
@@ -57,11 +64,17 @@ public Message warpMessage(Object message) {
5764
MessageHeaderWarp messageHeaderWarp = new MessageHeaderWarp(headers);
5865
return new GenericMessageWarp(messageTemp.getPayload(), messageHeaderWarp);
5966
}
67+
if (message instanceof Message){
68+
return (Message)message;
69+
}
6070
return null;
6171
}
6272

6373
@Override
6474
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
75+
if (msg == null){
76+
return null;
77+
}
6578
if (msg instanceof GenericMessageWarp){
6679
GenericMessageWarp messageTemp = (GenericMessageWarp) msg;
6780
return messageTemp.getMessageHeaderWarp();
@@ -83,6 +96,9 @@ public boolean markProcessed(Message message, String flagKey) {
8396

8497
@Override
8598
public String getHeader(MessageChannel messageChannel, Message message, String key) {
99+
if (message == null || StringUtil.isEmpty(key)){
100+
return null;
101+
}
86102
if (message instanceof GenericMessageWarp) {
87103
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
88104
Object object = genericMessageWarp.get(key);
@@ -93,27 +109,44 @@ public String getHeader(MessageChannel messageChannel, Message message, String k
93109
Object obj = message.getHeaders().get(key);
94110
return obj != null ? obj.toString() : null;
95111
}
112+
if (message.getHeaders() != null){
113+
Object obj = message.getHeaders().get(key);
114+
return obj != null ? obj.toString() : null ;
115+
}
96116
return null;
97117
}
98118

99119
@Override
100-
public void removeHeader(MessageChannel messageChannel, Message message, String key) {
120+
public boolean removeHeader(MessageChannel messageChannel, Message message, String key) {
121+
if (message == null || StringUtil.isEmpty(key)){
122+
return false;
123+
}
101124
if (message instanceof GenericMessageWarp){
102125
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
103126
genericMessageWarp.removeHeader(key);
127+
return true;
104128
}
129+
return false;
105130
}
106131

107132
@Override
108-
public void addHeader(MessageChannel messageChannel, Message message, String key, String value) {
133+
public boolean addHeader(MessageChannel messageChannel, Message message, String key, String value) {
134+
if (message == null ){
135+
return false;
136+
}
109137
if (message instanceof GenericMessageWarp){
110138
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
111139
genericMessageWarp.put(key,value);
140+
return true;
112141
}
142+
return false;
113143
}
114144

115145
@Override
116146
public Message resetMsg(Message message) {
147+
if (message == null){
148+
return null;
149+
}
117150
if (message instanceof GenericMessageWarp){
118151
try {
119152
GenericMessageWarp messageWarp = (GenericMessageWarp) message;

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/inst/MQGenericInstrumentationV3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
*/
2525
public class MQGenericInstrumentationV3 extends TypeInstrumentation {
2626
@Override
27-
protected ElementMatcher<TypeDescription> typeMatcher() {
27+
public ElementMatcher<TypeDescription> typeMatcher() {
2828
return extendsClass(named("org.springframework.messaging.core.AbstractMessageSendingTemplate"), false)
2929
.and(named("org.springframework.messaging.core.GenericMessagingTemplate"));
3030
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/GenericMessageWarp.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class GenericMessageWarp extends GenericMessage {
1515

1616
public GenericMessageWarp(Object payload) {
1717
super(payload);
18+
this.messageHeaderWarp = new MessageHeaderWarp(super.getHeaders());
1819
}
1920

2021
public GenericMessageWarp(Object payload, Map headers) {

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/MessageHeaderWarp.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,14 @@ public class MessageHeaderWarp extends MessageHeaders {
1414

1515
public MessageHeaderWarp(MessageHeaders messageHeaders) {
1616
super(messageHeaders);
17-
this.put(ID,messageHeaders.get(ID));
18-
this.put(TIMESTAMP,messageHeaders.get(TIMESTAMP));
17+
if (messageHeaders != null && messageHeaders.size() > 0){
18+
if (messageHeaders.get(ID) != null){
19+
this.put(ID,messageHeaders.get(ID));
20+
}
21+
if (messageHeaders.get(TIMESTAMP) != null){
22+
this.put(TIMESTAMP,messageHeaders.get(TIMESTAMP));
23+
}
24+
}
1925
}
2026

2127
public MessageHeaderWarp(Map<String, Object> headers) {

arex-instrumentation/mq/arex-integration-mqtt/src/test/java/io/arex/inst/mqtt/inst/EclipseInstrumentationV3Test.java

Lines changed: 0 additions & 26 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package io.arex.inst.mqtt.inst;
2+
3+
import io.arex.agent.bootstrap.internal.Pair;
4+
import io.arex.inst.mqtt.MQTTAdapterHelper;
5+
import io.arex.inst.mqtt.adapter.MessageAdapter;
6+
import io.arex.inst.mqtt.adapter.MessageAdapterImpl;
7+
import io.arex.inst.runtime.context.ContextManager;
8+
import io.arex.inst.runtime.context.RecordLimiter;
9+
import io.arex.inst.runtime.model.ArexConstants;
10+
import io.arex.inst.runtime.util.IgnoreUtils;
11+
import org.junit.jupiter.api.AfterAll;
12+
import org.junit.jupiter.api.BeforeAll;
13+
import org.junit.jupiter.api.extension.ExtendWith;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.Arguments;
16+
import org.junit.jupiter.params.provider.MethodSource;
17+
import org.mockito.Mockito;
18+
import org.mockito.junit.jupiter.MockitoExtension;
19+
import org.springframework.messaging.Message;
20+
import org.springframework.messaging.MessageChannel;
21+
22+
import java.util.Objects;
23+
import java.util.function.Predicate;
24+
import java.util.stream.Stream;
25+
26+
import static org.junit.jupiter.api.Assertions.assertTrue;
27+
import static org.junit.jupiter.params.provider.Arguments.arguments;
28+
import static org.mockito.ArgumentMatchers.any;
29+
import static org.mockito.ArgumentMatchers.eq;
30+
31+
/**
32+
* @author : MentosL
33+
* @date : 2023/5/16 20:53
34+
*/
35+
@ExtendWith(MockitoExtension.class)
36+
public class MQTTAdapterHelperTest {
37+
static MessageAdapter messageAdapter;
38+
static MessageChannel messageChannel;
39+
static Message message;
40+
41+
@BeforeAll
42+
static void setUp() {
43+
messageAdapter = Mockito.mock(MessageAdapter.class);
44+
messageChannel = Mockito.mock(MessageChannel.class);
45+
message = Mockito.mock(Message.class);
46+
}
47+
48+
@AfterAll
49+
static void tearDown() {
50+
messageAdapter = null;
51+
messageChannel = null;
52+
message = null;
53+
Mockito.clearAllCaches();
54+
}
55+
56+
@ParameterizedTest
57+
@MethodSource("onServiceEnterCase")
58+
void onServiceEnter(Runnable mocker, Predicate<Pair> predicate) {
59+
mocker.run();
60+
Pair result = MQTTAdapterHelper.onServiceEnter(messageAdapter, new Object(), new Object());
61+
assertTrue(predicate.test(result));
62+
}
63+
64+
static Stream<Arguments> onServiceEnterCase() {
65+
Runnable emptyMocker = () -> {};
66+
Runnable mocker1 = () -> {
67+
Mockito.when(messageAdapter.warpMessage(any())).thenReturn("mock");
68+
Mockito.when(messageAdapter.markProcessed(any(), any())).thenReturn(true);
69+
};
70+
Runnable mocker2 = () -> {
71+
Mockito.when(messageAdapter.markProcessed(any(), any())).thenReturn(false);
72+
};
73+
Runnable mocker3 = () -> {
74+
Mockito.when(messageAdapter.getMsg(any(),any())).thenReturn(null);
75+
};
76+
77+
Runnable mocker4 = () -> {
78+
Mockito.when(messageAdapter.getHeader(any(), any())).thenReturn(null);
79+
};
80+
Runnable mocker5 = () -> {
81+
Mockito.when(messageAdapter.getHeader(any(), any(),eq(ArexConstants.RECORD_ID))).thenReturn("mock");
82+
};
83+
Runnable mocker6 = () -> {
84+
Mockito.when(messageAdapter.removeHeader(any(), any(),eq(ArexConstants.RECORD_ID))).thenReturn(true);
85+
};
86+
Runnable mocker7 = () -> {
87+
Mockito.when(messageAdapter.addHeader(any(), any(),eq(ArexConstants.RECORD_ID),eq("mock"))).thenReturn(true);
88+
};
89+
Runnable mocker8 = () -> {
90+
Mockito.when(messageAdapter.resetMsg(any())).thenReturn("mock");
91+
};
92+
Predicate<Pair<?, ?>> predicate1 = Objects::isNull;
93+
return Stream.of(
94+
arguments(emptyMocker, predicate1),
95+
arguments(mocker1, predicate1),
96+
arguments(mocker2, predicate1),
97+
arguments(mocker3, predicate1),
98+
arguments(mocker4, predicate1),
99+
arguments(mocker5, predicate1),
100+
arguments(mocker6, predicate1),
101+
arguments(mocker7, predicate1),
102+
arguments(mocker8, predicate1)
103+
);
104+
}
105+
106+
}

0 commit comments

Comments
 (0)