diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java index 3bfe4a2e9..bd8794d32 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java @@ -108,6 +108,7 @@ private class PausableProcessor extends MultiOperatorProcessor { private final AtomicInteger wip = new AtomicInteger(); private final AtomicInteger strictBoundCounter = new AtomicInteger(0); private volatile boolean upstreamCompleted; + private final AtomicBoolean clearQueue = new AtomicBoolean(); PausableProcessor(MultiSubscriber downstream) { super(downstream); @@ -156,11 +157,18 @@ void drain() { if (!unbounded) { strictBoundCounter.decrementAndGet(); } + if (clearQueue.get()) { + break; + } downstream.onItem(item); } if (!paused.get() && upstreamCompleted) { super.onCompletion(); } + if (clearQueue.compareAndSet(true, false)) { + queue.clear(); + strictBoundCounter.set(0); + } if (wip.decrementAndGet() == 0) { return; } @@ -168,9 +176,12 @@ void drain() { } void clearQueue() { - if (queue != null) { - strictBoundCounter.set(0); + if (queue != null && clearQueue.compareAndSet(false, true) && wip.getAndIncrement() == 0) { + // nothing was currently dispatched, clearing the queue. queue.clear(); + clearQueue.set(false); + strictBoundCounter.set(0); + wip.decrementAndGet(); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java index 760caf27c..4b29db24e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java @@ -709,4 +709,62 @@ public void testMultiplePauseResumeCycles() { await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); } + @Test + public void testConcurrentResumeAndCancel() { + DemandPauser pauser = new DemandPauser(); + + // Slow consumer that adds delay to each item processing + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(1)) + .map(Long::intValue) + .select().first(1000) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferUnconditionally() + .using(pauser) + .invoke(ignored -> { + try { + // Slow consumer - 10ms per item to keep drain loop busy + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items to arrive + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10)); + + // Pause to let buffer fill up with lots of items + pauser.pause(); + await().pollDelay(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(100)); + + int bufferSize = pauser.bufferSize(); + assertThat(bufferSize).isGreaterThan(100); + + // Resume and clear at the same time from different threads + // This reproduces the race condition: drain loop starts and clearQueue is called concurrently + Thread resumeThread = new Thread(() -> pauser.resume()); + Thread clearThread = new Thread(() -> sub.cancel()); + + resumeThread.start(); + clearThread.start(); + + // Wait for both operations to complete + await().untilAsserted(() -> { + assertThat(resumeThread.isAlive()).isFalse(); + assertThat(clearThread.isAlive()).isFalse(); + }); + + // No concurrent modification exception should occur + assertThat(sub.getFailure()).isNull(); + + // Buffer should be cleared + await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0)); + + // Stream should continue to work normally + await().pollDelay(Duration.ofMillis(200)).until(() -> true); + assertThat(sub.getItems()).hasSizeGreaterThan(0); + } + }