Skip to content

Commit 708589e

Browse files
committed
增加mqtt代码
1 parent 405c5ab commit 708589e

File tree

6 files changed

+45
-58
lines changed

6 files changed

+45
-58
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MQTTAdapterHelper.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
import io.arex.agent.bootstrap.model.Mocker;
44
import io.arex.agent.bootstrap.util.StringUtil;
5+
import io.arex.inst.mqtt.adapter.MessageAdapter;
56
import io.arex.inst.runtime.config.Config;
67
import io.arex.inst.runtime.listener.CaseEvent;
78
import io.arex.inst.runtime.listener.CaseEventDispatcher;
9+
import io.arex.inst.runtime.listener.EventSource;
810
import io.arex.inst.runtime.model.ArexConstants;
11+
import io.arex.inst.runtime.util.IgnoreUtils;
912
import io.arex.inst.runtime.util.MockUtils;
1013

1114
/**
@@ -29,6 +32,23 @@ public static <MC, Msg> void onServiceEnter(MessageAdapter<MC, Msg> adapter, Obj
2932
return;
3033
}
3134
CaseEventDispatcher.onEvent(CaseEvent.ofEnterEvent());
35+
if (shouldSkip(adapter, mc, msg)) {
36+
return ;
37+
}
38+
String caseId = adapter.getHeader(mc, msg, ArexConstants.RECORD_ID);
39+
String excludeMockTemplate = adapter.getHeader(mc, msg, ArexConstants.HEADER_EXCLUDE_MOCK);
40+
CaseEventDispatcher.onEvent(CaseEvent.ofCreateEvent(EventSource.of(caseId, excludeMockTemplate)));
41+
}
42+
43+
44+
public static <MC, Msg> void onServiceExit(MessageAdapter<MC, Msg> adapter, Object messageChannel, Object message){
45+
Msg msg = adapter.warpMessage(message);
46+
MC mc = adapter.warpMC(messageChannel);
47+
if (msg == null || mc == null) {
48+
return;
49+
}
50+
adapter.removeHeader(mc,msg,PROCESSED_FLAG);
51+
new MessageQueueExtractor<>( mc, msg,adapter).execute();
3252
}
3353

3454

@@ -50,19 +70,14 @@ private static<MC, Msg> boolean shouldSkip(MessageAdapter<MC, Msg> adapter,MC m
5070
if (Boolean.parseBoolean(adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP))) {
5171
return true;
5272
}
53-
73+
String topic = adapter.getHeader(mc, msg, ArexConstants.REPLAY_WARM_UP);
74+
if (StringUtil.isEmpty(topic)) {
75+
return false;
76+
}
77+
if (IgnoreUtils.ignoreOperation(topic)) {
78+
return true;
79+
}
80+
return Config.get().invalidRecord(topic);
5481
}
5582

56-
57-
58-
59-
60-
61-
62-
63-
public static Mocker createMocker(String operationName) {
64-
Mocker mocker = MockUtils.createMqttConsumer(operationName);
65-
mocker.getTargetRequest().setType(Byte.class.getName());
66-
return mocker;
67-
}
6883
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MessageAdapter.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MessageQueueExtractor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ private void executePostProcess() {
5151
if (ContextManager.needReplay()) {
5252
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
5353
}
54-
// Think about other ways to replace the head
55-
adapter.resetMsg(message);
5654
}
5755

5856
private void doExecute() {

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/GenericMessageImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.arex.inst.mqtt.adapter;
22

3-
import io.arex.inst.mqtt.MessageAdapter;
43
import org.springframework.messaging.MessageChannel;
54
import org.springframework.messaging.MessageHeaders;
65
import org.springframework.messaging.support.GenericMessage;
76

87
import java.nio.charset.StandardCharsets;
8+
import java.util.HashMap;
9+
import java.util.UUID;
910

1011
/**
1112
* GenericMessageAdapter
@@ -20,9 +21,7 @@ public static GenericMessageImpl getInstance() {
2021

2122
@Override
2223
public byte[] getMsg(MessageChannel messageChannel, GenericMessage msg) {
23-
2424
Object payload = msg.getPayload();
25-
2625
if (payload instanceof byte[]){
2726
return ((byte[]) payload);
2827
}
@@ -67,4 +66,14 @@ public String getHeader(MessageChannel messageChannel, GenericMessage genericMes
6766
Object obj = genericMessage.getHeaders().get(key);
6867
return obj != null ? obj.toString() : null;
6968
}
69+
70+
@Override
71+
public void removeHeader(MessageChannel messageChannel, GenericMessage genericMessage, String key) {
72+
genericMessage.getHeaders().remove(key);
73+
}
74+
75+
@Override
76+
public void addHeader(MessageChannel messageChannel, GenericMessage genericMessage, String key, String value) {
77+
genericMessage.getHeaders().put(key,value);
78+
}
7079
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,4 @@ public interface MessageAdapter<MC,Msg> {
2323

2424
void addHeader(MC mc,Msg msg,String key,String value);
2525

26-
Msg resetMsg(Msg msg);
27-
2826
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/inst/MQGenericInstrumentationV3.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package io.arex.inst.mqtt.inst;
22

3-
import io.arex.agent.bootstrap.internal.Pair;
43
import io.arex.inst.extension.MethodInstrumentation;
54
import io.arex.inst.extension.TypeInstrumentation;
65
import io.arex.inst.mqtt.MQTTAdapterHelper;
7-
import io.arex.inst.mqtt.adapter.MessageAdapterImpl;
6+
import io.arex.inst.mqtt.adapter.GenericMessageImpl;
87
import net.bytebuddy.asm.Advice;
98
import net.bytebuddy.description.method.MethodDescription;
109
import net.bytebuddy.description.type.TypeDescription;
@@ -41,24 +40,14 @@ public static class ArrivedAdvice {
4140

4241
@Advice.OnMethodEnter(suppress = Throwable.class)
4342
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
44-
@Advice.Argument(value = 1, readOnly = false) Message<?> message,
45-
@Advice.Local("channelMsgPair") Pair<MessageChannel, Message> pair) {
46-
Pair<MessageChannel, Message> messageChannelMessagePair =
47-
MQTTAdapterHelper.onServiceEnter(MessageAdapterImpl.getInstance(), messageChannel, message);
48-
if (messageChannelMessagePair == null){
49-
return;
50-
}
51-
pair = messageChannelMessagePair;
43+
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
44+
MQTTAdapterHelper.onServiceEnter(GenericMessageImpl.getInstance(),messageChannel,message);
5245
}
5346

5447
@Advice.OnMethodExit(suppress = Throwable.class)
5548
public static void onExit(@Advice.Argument(value = 0, readOnly = false) MessageChannel messageChannel,
56-
@Advice.Argument(value = 1, readOnly = false) Message<?> message,
57-
@Advice.Local("channelMsgPair") Pair<MessageChannel, Message> pair) {
58-
if (pair != null){
59-
message = pair.getSecond();
60-
}
61-
MQTTAdapterHelper.onServiceExit(MessageAdapterImpl.getInstance(), messageChannel, message);
49+
@Advice.Argument(value = 1, readOnly = false) Message<?> message) {
50+
MQTTAdapterHelper.onServiceExit(GenericMessageImpl.getInstance(), messageChannel, message);
6251
}
6352

6453
}

0 commit comments

Comments
 (0)