Skip to content

Commit 07a3b78

Browse files
committed
新增mqtt组件
1 parent db56c67 commit 07a3b78

File tree

5 files changed

+55
-1
lines changed

5 files changed

+55
-1
lines changed

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/MessageQueueExtractor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ private void executePostProcess() {
5151
if (ContextManager.needReplay()) {
5252
adapter.addHeader(messageChannel,message, ArexConstants.REPLAY_ID,ContextManager.currentContext().getReplayId());
5353
}
54+
// Think about other ways to replace the head
55+
adapter.resetMsg(message);
5456
}
5557

5658
private void doExecute() {

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public interface MessageAdapter<MC,Msg> {
2323

2424
void addHeader(MC mc,Msg msg,String key,String value);
2525

26+
Msg resetMsg(Msg msg);
27+
2628
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/adapter/MessageAdapterImpl.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
import io.arex.inst.mqtt.warp.GenericMessageWarp;
44
import io.arex.inst.mqtt.warp.MessageHeaderWarp;
5+
import io.arex.inst.runtime.util.LogUtil;
56
import org.springframework.messaging.Message;
67
import org.springframework.messaging.MessageChannel;
78
import org.springframework.messaging.MessageHeaders;
89
import org.springframework.messaging.support.GenericMessage;
910

11+
import java.lang.reflect.Field;
1012
import java.nio.charset.StandardCharsets;
1113

1214
/**
@@ -34,7 +36,10 @@ public MessageChannel warpMC(Object messageChannel) {
3436
if (messageChannel == null){
3537
return null;
3638
}
37-
return (MessageChannel) messageChannel;
39+
if (messageChannel instanceof MessageChannel){
40+
return (MessageChannel) messageChannel;
41+
}
42+
return null;
3843
}
3944

4045
@Override
@@ -57,6 +62,10 @@ public Message warpMessage(Object message) {
5762

5863
@Override
5964
public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) {
65+
if (msg instanceof GenericMessageWarp){
66+
GenericMessageWarp messageTemp = (GenericMessageWarp) msg;
67+
return messageTemp.getMessageHeaderWarp();
68+
}
6069
return msg.getHeaders();
6170
}
6271

@@ -74,6 +83,12 @@ public boolean markProcessed(Message message, String flagKey) {
7483

7584
@Override
7685
public String getHeader(MessageChannel messageChannel, Message message, String key) {
86+
if (message instanceof GenericMessageWarp) {
87+
GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message;
88+
Object object = genericMessageWarp.get(key);
89+
return object != null ? object.toString() : null;
90+
}
91+
7792
if(message instanceof GenericMessage){
7893
Object obj = message.getHeaders().get(key);
7994
return obj != null ? obj.toString() : null;
@@ -96,4 +111,23 @@ public void addHeader(MessageChannel messageChannel, Message message, String key
96111
genericMessageWarp.put(key,value);
97112
}
98113
}
114+
115+
@Override
116+
public Message resetMsg(Message message) {
117+
if (message instanceof GenericMessageWarp){
118+
try {
119+
GenericMessageWarp messageWarp = (GenericMessageWarp) message;
120+
Field headers = message.getClass().getSuperclass().getDeclaredField("headers");
121+
headers.setAccessible(true);
122+
headers.set(message, messageWarp.getMessageHeaderWarp());
123+
} catch (NoSuchFieldException e) {
124+
LogUtil.warn("MessageAdapterImpl.resetMsg - NoSuchFieldException", e);
125+
} catch (IllegalAccessException e) {
126+
LogUtil.warn("MessageAdapterImpl.resetMsg - IllegalAccessException", e);
127+
}
128+
}
129+
return message;
130+
}
131+
132+
99133
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/GenericMessageWarp.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,15 @@ public void put(String key, String value) {
3838
this.messageHeaderWarp.put(key, value);
3939
}
4040
}
41+
42+
public Object get(String key){
43+
if (this.messageHeaderWarp != null) {
44+
return this.messageHeaderWarp.get(key);
45+
}
46+
return getHeaders().get(key);
47+
}
48+
49+
public MessageHeaderWarp getMessageHeaderWarp() {
50+
return messageHeaderWarp;
51+
}
4152
}

arex-instrumentation/mq/arex-integration-mqtt/src/main/java/io/arex/inst/mqtt/warp/MessageHeaderWarp.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class MessageHeaderWarp extends MessageHeaders {
1414

1515
public MessageHeaderWarp(MessageHeaders messageHeaders) {
1616
super(messageHeaders);
17+
this.put(ID,messageHeaders.get(ID));
18+
this.put(TIMESTAMP,messageHeaders.get(TIMESTAMP));
1719
}
1820

1921
public MessageHeaderWarp(Map<String, Object> headers) {
@@ -25,6 +27,9 @@ public MessageHeaderWarp(Map<String, Object> headers, UUID id, Long timestamp) {
2527
}
2628

2729
public Object put(String key, Object value){
30+
if (value == null){
31+
return null;
32+
}
2833
super.getRawHeaders().put(key,value);
2934
return value;
3035
}

0 commit comments

Comments
 (0)