Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private class PausableProcessor extends MultiOperatorProcessor<T, T> {
private final AtomicInteger wip = new AtomicInteger();
private final AtomicInteger strictBoundCounter = new AtomicInteger(0);
private volatile boolean upstreamCompleted;
private final AtomicBoolean clearQueue = new AtomicBoolean();

PausableProcessor(MultiSubscriber<? super T> downstream) {
super(downstream);
Expand Down Expand Up @@ -156,21 +157,31 @@ void drain() {
if (!unbounded) {
strictBoundCounter.decrementAndGet();
}
if (clearQueue.get()) {
break;
}
downstream.onItem(item);
}
if (!paused.get() && upstreamCompleted) {
super.onCompletion();
}
if (clearQueue.compareAndSet(true, false)) {
queue.clear();
strictBoundCounter.set(0);
}
if (wip.decrementAndGet() == 0) {
return;
}
}
}

void clearQueue() {
if (queue != null) {
strictBoundCounter.set(0);
if (queue != null && clearQueue.compareAndSet(false, true) && wip.getAndIncrement() == 0) {
// nothing was currently dispatched, clearing the queue.
queue.clear();
clearQueue.set(false);
strictBoundCounter.set(0);
wip.decrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,4 +709,62 @@ public void testMultiplePauseResumeCycles() {
await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100));
}

@Test
public void testConcurrentResumeAndCancel() {
DemandPauser pauser = new DemandPauser();

// Slow consumer that adds delay to each item processing
AssertSubscriber<Integer> sub = Multi.createFrom().ticks().every(Duration.ofMillis(1))
.map(Long::intValue)
.select().first(1000)
.pauseDemand()
.bufferStrategy(BackPressureStrategy.BUFFER)
.bufferUnconditionally()
.using(pauser)
.invoke(ignored -> {
try {
// Slow consumer - 10ms per item to keep drain loop busy
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// Wait for some items to arrive
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));

// Pause to let buffer fill up with lots of items
pauser.pause();
await().pollDelay(Duration.ofMillis(200))
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(100));

int bufferSize = pauser.bufferSize();
assertThat(bufferSize).isGreaterThan(100);

// Resume and clear at the same time from different threads
// This reproduces the race condition: drain loop starts and clearQueue is called concurrently
Thread resumeThread = new Thread(() -> pauser.resume());
Thread clearThread = new Thread(() -> sub.cancel());

resumeThread.start();
clearThread.start();

// Wait for both operations to complete
await().untilAsserted(() -> {
assertThat(resumeThread.isAlive()).isFalse();
assertThat(clearThread.isAlive()).isFalse();
});

// No concurrent modification exception should occur
assertThat(sub.getFailure()).isNull();

// Buffer should be cleared
await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0));

// Stream should continue to work normally
await().pollDelay(Duration.ofMillis(200)).until(() -> true);
assertThat(sub.getItems()).hasSizeGreaterThan(0);
}

}
Loading