Skip to content

Commit 4192a75

Browse files
committed
提交初次mqtt代码
增加mqtt相关参数
1 parent 5d22de3 commit 4192a75

File tree

8 files changed

+139
-0
lines changed

8 files changed

+139
-0
lines changed

arex-agent-bootstrap/src/main/java/io/arex/agent/bootstrap/model/MockCategoryType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public class MockCategoryType implements Serializable {
1919
public static final MockCategoryType DUBBO_CONSUMER = createDependency("DubboConsumer");
2020
public static final MockCategoryType DUBBO_PROVIDER = createEntryPoint("DubboProvider");
2121
public static final MockCategoryType DUBBO_STREAM_PROVIDER = createDependency("DubboStreamProvider");
22+
public static final MockCategoryType MQTT_MESSAGE_CONSUMER = createEntryPoint("MqttMessageConsumer");
23+
2224

2325
private String name;
2426
private boolean entryPoint;

arex-agent/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@
147147
<artifactId>arex-jcasbin</artifactId>
148148
<version>${project.version}</version>
149149
</dependency>
150+
<dependency>
151+
<groupId>${project.groupId}</groupId>
152+
<artifactId>arex-integration-mqtt</artifactId>
153+
<version>${project.version}</version>
154+
</dependency>
150155
<!--Agent instrumentation end-->
151156

152157
<dependency>

arex-instrumentation-api/src/main/java/io/arex/inst/runtime/util/MockUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ public static ArexMocker createDubboStreamProvider(String operationName) {
6868
return create(MockCategoryType.DUBBO_STREAM_PROVIDER, operationName);
6969
}
7070

71+
public static ArexMocker createMqttConsumer(String operationName){
72+
return create(MockCategoryType.MQTT_MESSAGE_CONSUMER,operationName);
73+
}
74+
75+
7176
public static ArexMocker create(MockCategoryType categoryType, String operationName) {
7277
ArexMocker mocker = new ArexMocker();
7378
long createTime = System.currentTimeMillis();
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>arex-instrumentation-parent</artifactId>
7+
<groupId>io.arex</groupId>
8+
<version>0.2.0</version>
9+
<relativePath>../../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>arex-integration-mqtt</artifactId>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>org.springframework.integration</groupId>
18+
<artifactId>spring-integration-mqtt</artifactId>
19+
<version>5.5.10</version>
20+
<scope>provided</scope>
21+
</dependency>
22+
</dependencies>
23+
</project>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.arex.inst.mqtt;
2+
3+
import io.arex.agent.bootstrap.model.Mocker;
4+
import io.arex.inst.runtime.util.MockUtils;
5+
6+
/**
7+
* MQTTAdapterHelper
8+
*/
9+
public class MQTTAdapterHelper {
10+
11+
public static Mocker createMocker(String operationName) {
12+
Mocker mocker = MockUtils.createMqttConsumer(operationName);
13+
mocker.getTargetRequest().setType(Byte.class.getName());
14+
return mocker;
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.arex.inst.mqtt.inst;
2+
3+
import io.arex.agent.bootstrap.model.Mocker;
4+
import io.arex.agent.bootstrap.util.StringUtil;
5+
import io.arex.inst.extension.MethodInstrumentation;
6+
import io.arex.inst.extension.TypeInstrumentation;
7+
import io.arex.inst.mqtt.MQTTAdapterHelper;
8+
import io.arex.inst.runtime.context.ContextManager;
9+
import io.arex.inst.runtime.util.MockUtils;
10+
import net.bytebuddy.asm.Advice;
11+
import net.bytebuddy.description.method.MethodDescription;
12+
import net.bytebuddy.description.type.TypeDescription;
13+
import net.bytebuddy.matcher.ElementMatcher;
14+
import org.eclipse.paho.client.mqttv3.MqttMessage;
15+
16+
import java.util.Base64;
17+
import java.util.Collections;
18+
import java.util.List;
19+
20+
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
21+
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
22+
import static net.bytebuddy.matcher.ElementMatchers.named;
23+
import static net.bytebuddy.matcher.ElementMatchers.not;
24+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
25+
26+
/**
27+
* EclipseInstrumentationV3
28+
*/
29+
public class EclipseInstrumentationV3 extends TypeInstrumentation {
30+
@Override
31+
protected ElementMatcher<TypeDescription> typeMatcher() {
32+
return not(isInterface()).and(hasSuperType(named("org.eclipse.paho.client.mqttv3.MqttCallback")));
33+
}
34+
35+
@Override
36+
public List<MethodInstrumentation> methodAdvices() {
37+
ElementMatcher<MethodDescription> matcher = named("messageArrived")
38+
.and(takesArgument(0, named("java.lang.String")))
39+
.and(takesArgument(1, named("org.eclipse.paho.client.mqttv3.MqttMessage")));
40+
return Collections.singletonList(new MethodInstrumentation(matcher, ArrivedAdvice.class.getName()));
41+
}
42+
43+
public static class ArrivedAdvice {
44+
45+
@Advice.OnMethodEnter(suppress = Throwable.class)
46+
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) String topic,
47+
@Advice.Argument(value = 1, readOnly = false) MqttMessage mqttMessage) {
48+
if (StringUtil.isEmpty(topic) || mqttMessage == null) {
49+
return;
50+
}
51+
if (ContextManager.needRecordOrReplay()){
52+
Mocker mocker = MQTTAdapterHelper.createMocker(topic);
53+
mocker.getTargetRequest().setBody(Base64.getEncoder().encodeToString(mqttMessage.getPayload()));
54+
if (ContextManager.needReplay()) {
55+
MockUtils.replayMocker(mocker);
56+
} else if (ContextManager.needRecord()) {
57+
MockUtils.recordMocker(mocker);
58+
}
59+
}
60+
}
61+
}
62+
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.arex.inst.mqtt.inst;
2+
3+
import com.google.auto.service.AutoService;
4+
import io.arex.inst.extension.ModuleInstrumentation;
5+
import io.arex.inst.extension.TypeInstrumentation;
6+
7+
import java.util.Arrays;
8+
import java.util.List;
9+
10+
/**
11+
* MQTTAdapterModuleInstrumentation
12+
*/
13+
@AutoService(ModuleInstrumentation.class)
14+
public class MQTTAdapterModuleInstrumentation extends ModuleInstrumentation {
15+
16+
public MQTTAdapterModuleInstrumentation() {
17+
super("mqtt-adapter");
18+
}
19+
20+
@Override
21+
public List<TypeInstrumentation> instrumentationTypes() {
22+
return Arrays.asList(new EclipseInstrumentationV3());
23+
}
24+
}

arex-instrumentation/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
<module>authentication/arex-shiro</module>
4444
<module>authentication/arex-jcasbin</module>
4545
<module>foundation/arex-serializer</module>
46+
<module>mq/arex-integration-mqtt</module>
4647
</modules>
4748
<dependencies>
4849
<dependency>

0 commit comments

Comments
 (0)