Skip to content

Commit 82215e4

Browse files
ozangunalpcescoffier
authored andcommitted
Fix ack coordination for empty publishers in POST_PROCESSING strategy of payload -> stream signatures
Fixes #3232
1 parent e1089fe commit 82215e4

File tree

3 files changed

+96
-16
lines changed

3 files changed

+96
-16
lines changed

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,25 @@ private void processMethodReturningAProcessorOfPayloads() {
281281
};
282282
}
283283

284+
private Multi<? extends Message<?>> handleMultiAckCoordination(Message<?> message, Flow.Publisher<?> pubFlow) {
285+
// POST_PROCESSING must not be used when returning an infinite stream
286+
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
287+
return MultiUtils.publisher(pubFlow)
288+
.onItem()
289+
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())))
290+
.onTermination().call((throwable, cancelled) -> {
291+
if (coordinator.getTrackedCount() == 0) {
292+
// If the publisher terminates without emitting any item, we need to ack the original message.
293+
if (throwable == null && !cancelled) {
294+
return Uni.createFrom().completionStage(message.ack());
295+
}
296+
return Uni.createFrom().completionStage(message.nack(throwable));
297+
} else {
298+
return Uni.createFrom().voidItem();
299+
}
300+
});
301+
}
302+
284303
/**
285304
* {@code PublisherBuilder<O> method(I payload)}
286305
*/
@@ -291,11 +310,7 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa
291310
if (isPostAck()) {
292311
try {
293312
PublisherBuilder<?> pb = invoke(getArguments(message));
294-
// POST_PROCESSING must not be used when returning an infinite stream
295-
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
296-
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
297-
.onItem()
298-
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
313+
return handleMultiAckCoordination(message, AdaptersToFlow.publisher(pb.buildRs()));
299314
} catch (Throwable t) {
300315
return handlePostInvocation(message, t);
301316
}
@@ -318,11 +333,7 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi
318333
if (isPostAck()) {
319334
try {
320335
Publisher<?> pub = invoke(getArguments(message));
321-
// POST_PROCESSING must not be used when returning an infinite stream
322-
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
323-
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
324-
.onItem()
325-
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
336+
return handleMultiAckCoordination(message, AdaptersToFlow.publisher(pub));
326337
} catch (Throwable t) {
327338
return handlePostInvocation(message, t);
328339
}
@@ -345,12 +356,7 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
345356
if (isPostAck()) {
346357
try {
347358
Flow.Publisher<?> pub = invoke(getArguments(message));
348-
// POST_PROCESSING must not be used when returning an infinite stream
349-
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
350-
return MultiUtils.publisher(pub)
351-
.onItem()
352-
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
353-
359+
return handleMultiAckCoordination(message, pub);
354360
} catch (Throwable t) {
355361
return handlePostInvocation(message, t);
356362
}

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/AcknowledgementCoordinator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.ArrayList;
44
import java.util.List;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.atomic.AtomicInteger;
67
import java.util.concurrent.locks.ReentrantLock;
78

89
import org.eclipse.microprofile.reactive.messaging.Message;
@@ -21,6 +22,7 @@ public class AcknowledgementCoordinator {
2122
private final Message<?> input;
2223

2324
private volatile boolean done;
25+
private final AtomicInteger trackedCount = new AtomicInteger();
2426
private final List<Tracker> tracked = new ArrayList<>();
2527

2628
private final ReentrantLock lock = new ReentrantLock();
@@ -33,6 +35,7 @@ public Message<?> track(Message<?> msg) {
3335
lock.lock();
3436
try {
3537
Tracker tracker = new Tracker();
38+
trackedCount.getAndIncrement();
3639
tracked.add(tracker);
3740
return msg
3841
.withAck(() -> {
@@ -48,6 +51,10 @@ public Message<?> track(Message<?> msg) {
4851
}
4952
}
5053

54+
public int getTrackedCount() {
55+
return trackedCount.get();
56+
}
57+
5158
private void onAck(Tracker id) {
5259
lock.lock();
5360
try {

smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.awaitility.Awaitility.await;
55

6+
import java.util.Arrays;
67
import java.util.List;
78
import java.util.Set;
89
import java.util.concurrent.*;
10+
import java.util.stream.Stream;
911

1012
import jakarta.enterprise.context.ApplicationScoped;
1113

@@ -130,6 +132,71 @@ public void testThatMessagesAreNackedAfterFailingProcessingOfPayloadUni() throws
130132
assertThat(throwables).hasSize(2);
131133
}
132134

135+
@ApplicationScoped
136+
public static class StringToStringMultiProcessorWithPostProcessingAck {
137+
138+
@Incoming("data")
139+
@Outgoing("out")
140+
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
141+
public Multi<String> process(String incomingData) {
142+
return Multi.createFrom().items(getWordsStream(incomingData));
143+
}
144+
145+
private Stream<String> getWordsStream(String sentence) {
146+
return Arrays.stream(sentence.split(" "));
147+
}
148+
}
149+
150+
@Test
151+
public void testAckingMessagesForStringToStringMultiProcessorWithPostProcessingAck() throws InterruptedException {
152+
addBeanClass(StringToStringMultiProcessorWithPostProcessingAck.class);
153+
initialize();
154+
Emitter<String> emitter = get(EmitterBean.class).emitter();
155+
Sink sink = get(Sink.class);
156+
157+
Set<String> acked = new ConcurrentHashSet<>();
158+
Set<String> nacked = new ConcurrentHashSet<>();
159+
160+
run(acked, nacked, emitter);
161+
162+
await().until(() -> sink.list().size() == 10);
163+
assertThat(acked).hasSize(10);
164+
assertThat(nacked).hasSize(0);
165+
}
166+
167+
@ApplicationScoped
168+
public static class StringToEmptyStringMultiProcessorWithPostProcessingAck {
169+
170+
@Incoming("data")
171+
@Outgoing("out")
172+
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
173+
public Multi<String> process(String incomingData) {
174+
return Multi.createFrom().items(getWordsStream(incomingData));
175+
}
176+
177+
private Stream<String> getWordsStream(String sentence) {
178+
return Stream.empty();
179+
}
180+
}
181+
182+
@Test
183+
public void testAckingMessagesForStringToEmptyStringMultiProcessorWithPostProcessingAck() throws InterruptedException {
184+
addBeanClass(StringToEmptyStringMultiProcessorWithPostProcessingAck.class);
185+
initialize();
186+
Emitter<String> emitter = get(EmitterBean.class).emitter();
187+
Sink sink = get(Sink.class);
188+
189+
Set<String> acked = new ConcurrentHashSet<>();
190+
Set<String> nacked = new ConcurrentHashSet<>();
191+
192+
run(acked, nacked, emitter);
193+
194+
// All items are skipped
195+
await().pollDelay(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(sink.list()).isEmpty());
196+
assertThat(acked).hasSize(10);
197+
assertThat(nacked).hasSize(0);
198+
}
199+
133200
private List<Throwable> run(Set<String> acked, Set<String> nacked, Emitter<String> emitter)
134201
throws InterruptedException {
135202
List<Throwable> reasons = new CopyOnWriteArrayList<>();

0 commit comments

Comments
 (0)