Skip to content

Commit 0a5402c

Browse files
Fix retry ack handling in DownStreamMsgContext and add tests
1 parent b4e566f commit 0a5402c

File tree

2 files changed

+101
-6
lines changed

2 files changed

+101
-6
lines changed

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,36 @@ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
185185
*
186186
* @param downStreamMsgContext Down Stream Message Context
187187
*/
188-
private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
189-
List<CloudEvent> msgExts = new ArrayList<>();
190-
msgExts.add(downStreamMsgContext.event);
191-
log.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.event.getSubject(),
192-
downStreamMsgContext.seq, downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
193-
downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
188+
private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
189+
if (downStreamMsgContext.consumer == null
190+
|| downStreamMsgContext.consumeConcurrentlyContext == null
191+
|| downStreamMsgContext.event == null) {
192+
193+
log.warn(
194+
"eventMeshAckMsg skipped, consumer:{}, context:{}, event:{}",
195+
downStreamMsgContext.consumer == null,
196+
downStreamMsgContext.consumeConcurrentlyContext == null,
197+
downStreamMsgContext.event == null
198+
);
199+
return;
194200
}
195201

202+
List<CloudEvent> msgExts = new ArrayList<>();
203+
msgExts.add(downStreamMsgContext.event);
204+
205+
log.warn(
206+
"eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}",
207+
downStreamMsgContext.event.getSubject(),
208+
downStreamMsgContext.seq,
209+
downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS)
210+
);
211+
212+
downStreamMsgContext.consumer.updateOffset(
213+
msgExts,
214+
downStreamMsgContext.consumeConcurrentlyContext
215+
);
216+
}
217+
196218
@Override
197219
public void doRun() {
198220
retry();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;
2+
3+
import org.apache.eventmesh.api.AbstractContext;
4+
import org.apache.eventmesh.common.protocol.SubscriptionItem;
5+
import org.apache.eventmesh.common.protocol.SubscriptionMode;
6+
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
7+
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
8+
9+
import io.cloudevents.CloudEvent;
10+
import io.cloudevents.core.builder.CloudEventBuilder;
11+
12+
import org.junit.jupiter.api.Test;
13+
14+
import java.net.URI;
15+
16+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
17+
18+
class DownStreamMsgContextTest {
19+
20+
private CloudEvent buildEvent() {
21+
return CloudEventBuilder.v1()
22+
.withId("test-id")
23+
.withSource(URI.create("test://source"))
24+
.withType("test-type")
25+
.withSubject("test-topic")
26+
.build();
27+
}
28+
29+
private SubscriptionItem buildSubscriptionItem() {
30+
SubscriptionItem item = new SubscriptionItem();
31+
item.setMode(SubscriptionMode.CLUSTERING);
32+
return item;
33+
}
34+
35+
@Test
36+
void retry_shouldNotThrowException_whenConsumerOrContextIsNull() {
37+
38+
CloudEvent event = buildEvent();
39+
40+
// Intentionally set to null to simulate edge case
41+
Session session = null;
42+
MQConsumerWrapper consumer = null;
43+
AbstractContext context = null;
44+
45+
DownStreamMsgContext msgContext = new DownStreamMsgContext(
46+
event,
47+
session,
48+
consumer,
49+
context,
50+
false,
51+
buildSubscriptionItem()
52+
);
53+
54+
assertDoesNotThrow(msgContext::retry);
55+
}
56+
57+
@Test
58+
void ackMsg_shouldNotThrowException_whenDependenciesAreNull() {
59+
60+
CloudEvent event = buildEvent();
61+
62+
DownStreamMsgContext msgContext = new DownStreamMsgContext(
63+
event,
64+
null,
65+
null,
66+
null,
67+
false,
68+
buildSubscriptionItem()
69+
);
70+
71+
assertDoesNotThrow(msgContext::ackMsg);
72+
}
73+
}

0 commit comments

Comments
 (0)