Skip to content

Commit 95dfed7

Browse files
committed
fix: demand pauser clear queue inside drain loop to avoid concurrent read access
1 parent 8b8d066 commit 95dfed7

File tree

2 files changed

+71
-2
lines changed

2 files changed

+71
-2
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ private class PausableProcessor extends MultiOperatorProcessor<T, T> {
108108
private final AtomicInteger wip = new AtomicInteger();
109109
private final AtomicInteger strictBoundCounter = new AtomicInteger(0);
110110
private volatile boolean upstreamCompleted;
111+
private final AtomicBoolean clearQueue = new AtomicBoolean();
111112

112113
PausableProcessor(MultiSubscriber<? super T> downstream) {
113114
super(downstream);
@@ -156,21 +157,31 @@ void drain() {
156157
if (!unbounded) {
157158
strictBoundCounter.decrementAndGet();
158159
}
160+
if (clearQueue.get()) {
161+
break;
162+
}
159163
downstream.onItem(item);
160164
}
161165
if (!paused.get() && upstreamCompleted) {
162166
super.onCompletion();
163167
}
168+
if (clearQueue.compareAndSet(true, false)) {
169+
queue.clear();
170+
strictBoundCounter.set(0);
171+
}
164172
if (wip.decrementAndGet() == 0) {
165173
return;
166174
}
167175
}
168176
}
169177

170178
void clearQueue() {
171-
if (queue != null) {
172-
strictBoundCounter.set(0);
179+
if (queue != null && clearQueue.compareAndSet(false, true) && wip.getAndIncrement() == 0) {
180+
// nothing was currently dispatched, clearing the queue.
173181
queue.clear();
182+
clearQueue.set(false);
183+
strictBoundCounter.set(0);
184+
wip.decrementAndGet();
174185
}
175186
}
176187

implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,4 +709,62 @@ public void testMultiplePauseResumeCycles() {
709709
await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100));
710710
}
711711

712+
@Test
713+
public void testConcurrentResumeAndCancel() {
714+
DemandPauser pauser = new DemandPauser();
715+
716+
// Slow consumer that adds delay to each item processing
717+
AssertSubscriber<Integer> sub = Multi.createFrom().ticks().every(Duration.ofMillis(1))
718+
.map(Long::intValue)
719+
.select().first(1000)
720+
.pauseDemand()
721+
.bufferStrategy(BackPressureStrategy.BUFFER)
722+
.bufferUnconditionally()
723+
.using(pauser)
724+
.invoke(ignored -> {
725+
try {
726+
// Slow consumer - 10ms per item to keep drain loop busy
727+
Thread.sleep(50);
728+
} catch (InterruptedException e) {
729+
Thread.currentThread().interrupt();
730+
}
731+
})
732+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
733+
734+
// Wait for some items to arrive
735+
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));
736+
737+
// Pause to let buffer fill up with lots of items
738+
pauser.pause();
739+
await().pollDelay(Duration.ofMillis(200))
740+
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(100));
741+
742+
int bufferSize = pauser.bufferSize();
743+
assertThat(bufferSize).isGreaterThan(100);
744+
745+
// Resume and clear at the same time from different threads
746+
// This reproduces the race condition: drain loop starts and clearQueue is called concurrently
747+
Thread resumeThread = new Thread(() -> pauser.resume());
748+
Thread clearThread = new Thread(() -> sub.cancel());
749+
750+
resumeThread.start();
751+
clearThread.start();
752+
753+
// Wait for both operations to complete
754+
await().untilAsserted(() -> {
755+
assertThat(resumeThread.isAlive()).isFalse();
756+
assertThat(clearThread.isAlive()).isFalse();
757+
});
758+
759+
// No concurrent modification exception should occur
760+
assertThat(sub.getFailure()).isNull();
761+
762+
// Buffer should be cleared
763+
await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0));
764+
765+
// Stream should continue to work normally
766+
await().pollDelay(Duration.ofMillis(200)).until(() -> true);
767+
assertThat(sub.getItems()).hasSizeGreaterThan(0);
768+
}
769+
712770
}

0 commit comments

Comments
 (0)