diff --git a/documentation/docs/guides/controlling-demand.md b/documentation/docs/guides/controlling-demand.md index aa5c051cb..74b64bb8f 100644 --- a/documentation/docs/guides/controlling-demand.md +++ b/documentation/docs/guides/controlling-demand.md @@ -53,3 +53,101 @@ You can also define a custom function that provides a capping value based on a c Here we have a function that requests 75% of the downstream requests. Note that the function must return a value `n` that satisfies `(0 < n <= requested)` where `requested` is the downstream demand. + +## Pausing the demand + +The `Multi.pauseDemand()` operator provides fine-grained control over demand propagation in reactive streams. +Unlike cancellation, which terminates the subscription, pausing allows to suspend demand without unsubscribing from the upstream. +This is useful for implementing flow control patterns where item flow needs to be paused based on external conditions. + +### Basic pausing and resuming + +The `pauseDemand()` operator works with a `DemandPauser` handle that allows to control the stream: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'basic') }} +``` + +The `DemandPauser` provides methods to: + +- `pause()`: Stop propagating demand to upstream +- `resume()`: Resume demand propagation and deliver buffered items +- `isPaused()`: Check the current pause state + +Note that a few items may still arrive after pausing due to in-flight requests that were already issued to upstream. + +### Starting in a paused state + +You can create a stream that starts paused and only begins flowing when explicitly resumed: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'initially-paused') }} +``` + +This is useful when you want to prepare a stream but delay its execution until certain conditions are met. + +### Late subscription + +By default, the upstream subscription happens immediately even when starting paused. +The `lateSubscription()` option delays the upstream subscription until the stream is resumed: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'late-subscription') }} +``` + +### Buffer strategies + +When a stream is paused, the operator stops requesting new items from upstream. +However, items that were already requested (due to downstream demand) may still arrive. +Buffer strategies control what happens to these in-flight items. + +The `pauseDemand()` operator supports three buffer strategies: `BUFFER` (default), `DROP`, and `IGNORE`. +Configuring any other strategy will throw an `IllegalArgumentException`. + +#### BUFFER strategy (default) + +Already-requested items are buffered while paused and delivered when resumed: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-strategy') }} +``` + +You can configure the buffer size: + +- `bufferUnconditionally()`: Unbounded buffer +- `bufferSize(n)`: Buffer up to `n` items, then fail with buffer overflow + +When the buffer overflows, the stream fails with an `IllegalStateException`. + +**Important**: The buffer only holds items that were already requested from upstream before pausing. +When paused, no new requests are issued to upstream, so the buffer size is bounded by the outstanding demand at the time of pausing. + +#### DROP strategy + +Already-requested items are dropped while paused: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'drop-strategy') }} +``` + +Items that arrive while paused are discarded, and when resumed, the stream continues requesting fresh items. + +#### IGNORE strategy + +Already-requested items continue to flow downstream while paused. +This strategy doesn't use any buffers. +It only pauses demand from being issued to upstream, but does not pause the flow of already requested items. + +### Buffer management + +When using the BUFFER strategy, you can inspect and manage the buffer: + +```java linenums="1" +{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-management') }} +``` + +The `DemandPauser` provides: + +- `bufferSize()`: Returns the current number of buffered items +- `clearBuffer()`: Clears the buffer (only works while paused), returns `true` if successful + diff --git a/documentation/src/test/java/guides/operators/PausingDemandTest.java b/documentation/src/test/java/guides/operators/PausingDemandTest.java new file mode 100644 index 000000000..832b2990c --- /dev/null +++ b/documentation/src/test/java/guides/operators/PausingDemandTest.java @@ -0,0 +1,267 @@ +package guides.operators; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.DemandPauser; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class PausingDemandTest { + + @Test + void basicPausing() { + // + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = AssertSubscriber.create(); + Multi.createFrom().range(0, 100) + .pauseDemand().using(pauser) + // Throttle the multi + .onItem().call(i -> Uni.createFrom().nullItem() + .onItem().delayIt().by(Duration.ofMillis(10))) + .subscribe().withSubscriber(sub); + + // Unbounded request + sub.request(Long.MAX_VALUE); + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10)); + + // Pause the stream + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // Wait - no new items should arrive (except a few in-flight) + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5); + + // Resume the stream + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // All items eventually arrive + sub.awaitCompletion(); + assertThat(sub.getItems()).hasSize(100); + // + } + + @Test + void initiallyPaused() { + // + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().range(0, 50) + .pauseDemand() + .paused(true) // Start paused + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // No items arrive while paused + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).isEmpty(); + assertThat(pauser.isPaused()).isTrue(); + + // Resume to start receiving items + pauser.resume(); + sub.awaitCompletion(); + assertThat(sub.getItems()).hasSize(50); + // + } + + @Test + void lateSubscription() { + // + DemandPauser pauser = new DemandPauser(); + AtomicBoolean subscribed = new AtomicBoolean(false); + + AssertSubscriber sub = Multi.createFrom().range(0, 50) + .onSubscription().invoke(() -> subscribed.set(true)) + .pauseDemand() + .paused(true) // Start paused + .lateSubscription(true) // Delay subscription until resumed + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Stream is not subscribed yet + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(subscribed.get()).isFalse(); + assertThat(sub.getItems()).isEmpty(); + + // Resume triggers subscription and item flow + pauser.resume(); + await().untilAsserted(() -> assertThat(subscribed.get()).isTrue()); + sub.awaitCompletion(); + assertThat(sub.getItems()).hasSize(50); + // + } + + @Test + void bufferStrategy() { + // + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = AssertSubscriber.create(Long.MAX_VALUE); + + Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .select().first(100) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferSize(20) // Buffer up to 20 items + .using(pauser) + .subscribe().withSubscriber(sub); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5)); + + // Pause - items buffer up to the limit + pauser.pause(); + + // Wait for buffer to fill + await().pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0)); + + // Buffer size is capped + assertThat(pauser.bufferSize()).isLessThanOrEqualTo(20); + + // Resume drains the buffer + pauser.resume(); + await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0)); + // + } + + @Test + void dropStrategy() { + // + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(5)) + .select().first(200) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.DROP) // Drop items while paused + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20)); + + // Pause - subsequent items are dropped + pauser.pause(); + int sizeWhenPaused = sub.getItems().size(); + + // Wait while items are dropped + await().pollDelay(Duration.ofMillis(200)).until(() -> true); + + // Resume - continue from current position + pauser.resume(); + await().atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(sub.getItems().size()).isGreaterThan(sizeWhenPaused + 20)); + + sub.awaitCompletion(); + // Not all items arrived (some were dropped) + assertThat(sub.getItems()).hasSizeLessThan(200); + // + } + + @Test + void bufferManagement() { + // + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .select().first(100) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferUnconditionally() // Unbounded buffer + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5)); + + // Pause and let buffer fill + pauser.pause(); + await().pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0)); + + int bufferSize = pauser.bufferSize(); + assertThat(bufferSize).isGreaterThan(0); + + // Clear the buffer + boolean cleared = pauser.clearBuffer(); + assertThat(cleared).isTrue(); + assertThat(pauser.bufferSize()).isEqualTo(0); + + // Resume - items continue from current position (cleared items are lost) + pauser.resume(); + // + } + + @Test + void externalControl() { + // + DemandPauser pauser = new DemandPauser(); + + // Create and transform the stream + Multi stream = Multi.createFrom().range(0, 1000) + .pauseDemand().using(pauser) + .onItem().transform(i -> "Item-" + i) + .onItem().transform(String::toUpperCase); + + // Control the stream from anywhere + pauser.pause(); + + // Later, subscribe to the stream + AssertSubscriber sub = stream + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // No items while paused + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).isEmpty(); + + // Resume to receive items + pauser.resume(); + sub.awaitCompletion(); + assertThat(sub.getItems()).hasSize(1000); + // + } + + @Test + void conditionalFlow() { + // + DemandPauser pauser = new DemandPauser(); + AtomicInteger errorCount = new AtomicInteger(0); + + AssertSubscriber sub = Multi.createFrom().range(0, 100) + .pauseDemand().using(pauser) + .onItem().invoke(item -> { + // Simulate error-prone processing + if (item % 20 == 0 && item > 0) { + errorCount.incrementAndGet(); + pauser.pause(); // Pause on errors + } + }) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Stream pauses when errors occur + await().untilAsserted(() -> assertThat(errorCount.get()).isGreaterThan(0)); + await().untilAsserted(() -> assertThat(pauser.isPaused()).isTrue()); + + // Handle the error, then resume + errorCount.set(0); + pauser.resume(); + + // Stream continues + await().pollDelay(Duration.ofMillis(50)).until(() -> true); + assertThat(sub.getItems().size()).isGreaterThan(20); + // + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index 05e35151a..1e52f570c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -637,6 +637,48 @@ default Multi> attachContext() { @CheckReturnValue MultiDemandPacing paceDemand(); + /** + * Allows pausing and resuming demand propagation to upstream using a {@link io.smallrye.mutiny.subscription.DemandPauser}. + *

+ * Unlike cancellation which terminates the subscription, temporarily pausing suspends the demand without unsubscribing. + * This is useful for implementing flow control patterns where demand needs to be temporarily suspended based on + * external conditions (e.g., downstream system availability, rate limiting, resource constraints). + *

+ * Example: + * + *

+     * {@code
+     * DemandPauser pauser = new DemandPauser();
+     *
+     * Multi.createFrom().range(0, 100)
+     *         .pauseDemand().using(pauser)
+     *         .onItem().call(i -> Uni.createFrom().nullItem()
+     *                 .onItem().delayIt().by(Duration.ofMillis(10)))
+     *         .subscribe().with(System.out::println);
+     *
+     * // Later, from anywhere in the application:
+     * pauser.pause(); // Stop requesting new items
+     * pauser.resume(); // Continue requesting items
+     * }
+     * 
+ *

+ * Reactive Streams Compliance: + * This operator is compliant with the Reactive Streams specification as long as the + * {@link io.smallrye.mutiny.subscription.DemandPauser} used to manage demand is eventually resumed. + * When the upstream signals completion, + * if the stream is paused, the completion signal is deferred until the stream is resumed, at which point + * all buffered items are delivered to downstream before the completion signal is propagated. + * When the upstream signals failure, + * any buffered items are discarded and the error is immediately propagated downstream. + * + * @return a {@link MultiDemandPausing} to configure the pausing behavior + * @see io.smallrye.mutiny.subscription.DemandPauser + */ + @CheckReturnValue + default MultiDemandPausing pauseDemand() { + throw new UnsupportedOperationException("Default method added to limit binary incompatibility"); + } + /** * Cap all downstream subscriber requests to a maximum value. *

diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiDemandPausing.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiDemandPausing.java new file mode 100644 index 000000000..e1f1d1221 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiDemandPausing.java @@ -0,0 +1,145 @@ +package io.smallrye.mutiny.groups; + +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import static io.smallrye.mutiny.helpers.ParameterValidation.positive; + +import io.smallrye.common.annotation.CheckReturnValue; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.operators.multi.MultiDemandPausingOp; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.DemandPauser; + +/** + * Configures a pausable {@link Multi} stream. + *

+ * This class allows configuring how a stream behaves when paused, including: + *

+ * + * @param the type of items emitted by the stream + */ +public class MultiDemandPausing { + private final AbstractMulti upstream; + private boolean paused = false; + private boolean lateSubscription = false; + private int bufferSize = Infrastructure.getMultiOverflowDefaultBufferSize(); + private boolean unbounded = false; + private BackPressureStrategy bufferStrategy = BackPressureStrategy.BUFFER; + + public MultiDemandPausing(AbstractMulti upstream) { + this.upstream = upstream; + } + + /** + * Sets the initial pause state of the stream. + *

+ * When set to {@code true}, the stream starts paused and no items will flow until + * {@link DemandPauser#resume()} is called. + * + * @param paused {@code true} to start paused, {@code false} to start flowing (default) + * @return this configuration instance + */ + @CheckReturnValue + public MultiDemandPausing paused(boolean paused) { + this.paused = paused; + return this; + } + + /** + * Delays the upstream subscription until the stream is resumed. + *

+ * By default, the upstream subscription happens immediately even when starting paused. + * When {@code lateSubscription} is {@code true} and the stream starts {@code paused}, the upstream + * subscription is delayed until {@link DemandPauser#resume()} is called. + *

+ * This is useful for hot sources where you want to avoid missing early items that would be + * emitted before you're ready to process them. + * + * @param lateSubscription {@code true} to delay subscription until resumed, {@code false} for immediate subscription + * (default) + * @return this configuration instance + */ + @CheckReturnValue + public MultiDemandPausing lateSubscription(boolean lateSubscription) { + this.lateSubscription = lateSubscription; + return this; + } + + /** + * Sets the maximum buffer size for already-requested items when using {@link BackPressureStrategy#BUFFER}. + *

+ * When the stream is paused, items that were already requested from upstream can be buffered. + *

+ * Note: The buffer only holds items that were already requested from upstream before pausing. + * When paused, no new requests are issued to upstream. + * + * @param bufferSize the maximum buffer size, must be positive + * @return this configuration instance + */ + @CheckReturnValue + public MultiDemandPausing bufferSize(int bufferSize) { + this.bufferSize = positive(bufferSize, "bufferSize"); + this.unbounded = false; + return this; + } + + /** + * Sets the buffer size for already-requested items to unbounded when using {@link BackPressureStrategy#BUFFER}. + *

+ * When the stream is paused, items that were already requested from upstream can be buffered. + * + * @return this configuration instance + */ + @CheckReturnValue + public MultiDemandPausing bufferUnconditionally() { + this.bufferSize = Infrastructure.getMultiOverflowDefaultBufferSize(); + this.unbounded = true; + return this; + } + + /** + * Sets the strategy for handling already-requested items while paused. + *

+ * Available strategies: + *

    + *
  • {@link BackPressureStrategy#BUFFER}: Buffer items while paused, deliver when resumed (default)
  • + *
  • {@link BackPressureStrategy#DROP}: Drop items while paused, continue with fresh items when resumed
  • + *
  • {@link BackPressureStrategy#IGNORE}: Continue delivering already-requested items even while paused
  • + *
+ * + * @param bufferStrategy the buffer strategy, must not be {@code null} + * @return this configuration instance + */ + @CheckReturnValue + public MultiDemandPausing bufferStrategy(BackPressureStrategy bufferStrategy) { + this.bufferStrategy = nonNull(bufferStrategy, "bufferStrategy"); + if (bufferStrategy != BackPressureStrategy.BUFFER + && bufferStrategy != BackPressureStrategy.DROP + && bufferStrategy != BackPressureStrategy.IGNORE) { + throw new IllegalArgumentException("Demand pauser only supports BUFFER, DROP or IGNORE strategy"); + } + return this; + } + + /** + * Sets the demand pauser and return the new {@link Multi}. + * + * @param pauser the pauser handle, must not be {@code null} + * @return the new {@link Multi} + */ + @CheckReturnValue + public Multi using(DemandPauser pauser) { + DemandPauser p = nonNull(pauser, "pauser"); + MultiDemandPausingOp pausingMulti = new MultiDemandPausingOp<>(upstream, + paused, lateSubscription, bufferSize, unbounded, bufferStrategy); + p.bind(pausingMulti); + return Infrastructure.onMultiCreation(pausingMulti); + } + +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index 3a7d600c3..90ae45f19 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -16,6 +16,7 @@ import io.smallrye.mutiny.groups.MultiCollect; import io.smallrye.mutiny.groups.MultiConvert; import io.smallrye.mutiny.groups.MultiDemandPacing; +import io.smallrye.mutiny.groups.MultiDemandPausing; import io.smallrye.mutiny.groups.MultiGroup; import io.smallrye.mutiny.groups.MultiIfNoItem; import io.smallrye.mutiny.groups.MultiOnCancel; @@ -206,6 +207,11 @@ public MultiDemandPacing paceDemand() { return new MultiDemandPacing<>(this); } + @Override + public MultiDemandPausing pauseDemand() { + return new MultiDemandPausing<>(this); + } + @Override public Multi capDemandsUsing(LongFunction function) { return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function"))); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java new file mode 100644 index 000000000..3bfe4a2e9 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java @@ -0,0 +1,241 @@ +package io.smallrye.mutiny.operators.multi; + +import java.util.Queue; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.helpers.queues.Queues; +import io.smallrye.mutiny.operators.MultiOperator; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.MultiSubscriber; +import io.smallrye.mutiny.subscription.PausableMulti; + +/** + * Operator that allows pausing and resuming demand propagation to upstream. + *

+ * When paused, this operator stops requesting new items from upstream. + * Already-requested items are handled according to the configured {@link BackPressureStrategy}: + *

    + *
  • {@link BackPressureStrategy#BUFFER}: Items are buffered and delivered when resumed
  • + *
  • {@link BackPressureStrategy#DROP}: Items are dropped
  • + *
  • {@link BackPressureStrategy#IGNORE}: Items continue to flow downstream
  • + *
+ * + * @param the type of items + */ +public class MultiDemandPausingOp extends MultiOperator implements PausableMulti { + + private volatile PausableProcessor processor; + + private final AtomicBoolean paused; + private final AtomicBoolean subscribed = new AtomicBoolean(); + private final boolean lateSubscription; + private final int bufferSize; + private final boolean unbounded; + private final BackPressureStrategy backPressureStrategy; + + public MultiDemandPausingOp(Multi upstream, boolean initiallyPaused, boolean lateSubscription, int bufferSize, + boolean unbounded, BackPressureStrategy backPressureStrategy) { + super(upstream); + this.paused = new AtomicBoolean(initiallyPaused); + this.lateSubscription = lateSubscription; + this.bufferSize = bufferSize; + this.unbounded = unbounded; + this.backPressureStrategy = backPressureStrategy; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + processor = new PausableProcessor(subscriber); + if (!lateSubscription || !paused.get()) { // if late subscription is disabled, we can subscribe now. + subscribed.set(true); + upstream().subscribe(processor); + } + } + + @Override + public boolean isPaused() { + return paused.get(); + } + + @Override + public void pause() { + paused.set(true); + } + + @Override + public void resume() { + if (paused.compareAndSet(true, false)) { + PausableProcessor p = processor; + if (p != null) { + if (lateSubscription && subscribed.compareAndSet(false, true)) { + upstream().subscribe(p); + } + p.resume(); + } + } + } + + @Override + public int bufferSize() { + PausableProcessor p = processor; + if (p != null) { + return p.queueSize(); + } + return 0; + } + + @Override + public boolean clearBuffer() { + if (paused.get()) { + PausableProcessor p = processor; + if (p != null) { + p.clearQueue(); + return true; + } + } + return false; + } + + private class PausableProcessor extends MultiOperatorProcessor { + + private final AtomicLong demand = new AtomicLong(); + private final Queue queue; + private final AtomicInteger wip = new AtomicInteger(); + private final AtomicInteger strictBoundCounter = new AtomicInteger(0); + private volatile boolean upstreamCompleted; + + PausableProcessor(MultiSubscriber downstream) { + super(downstream); + // Determine if we need a queue based on strategy and buffer size + if (backPressureStrategy == BackPressureStrategy.BUFFER) { + this.queue = unbounded ? Queues. unbounded(bufferSize).get() : Queues. get(bufferSize).get(); + } else { + this.queue = null; + } + } + + void resume() { + Flow.Subscription subscription = getUpstreamSubscription(); + if (subscription == Subscriptions.CANCELLED) { + return; + } + // Drain any buffered items first + drain(); + long currentDemand = demand.get(); + if (currentDemand > 0) { + Subscriptions.produced(demand, currentDemand); + subscription.request(currentDemand); + } + } + + void drain() { + if (queue == null) { + if (upstreamCompleted) { + super.onCompletion(); + } + return; + } + if (wip.getAndIncrement() > 0) { + return; + } + while (true) { + Queue qe = queue; + // Drain all buffered items - these were already requested from upstream + // so we don't need to check downstream demand here + while (!paused.get()) { + T item = qe.poll(); + if (item == null) { + // queue empty + break; + } + if (!unbounded) { + strictBoundCounter.decrementAndGet(); + } + downstream.onItem(item); + } + if (!paused.get() && upstreamCompleted) { + super.onCompletion(); + } + if (wip.decrementAndGet() == 0) { + return; + } + } + } + + void clearQueue() { + if (queue != null) { + strictBoundCounter.set(0); + queue.clear(); + } + } + + int queueSize() { + return (queue != null) ? queue.size() : 0; + } + + @Override + public void onItem(T item) { + if (backPressureStrategy != BackPressureStrategy.IGNORE && paused.get()) { + if (backPressureStrategy == BackPressureStrategy.DROP) { + return; + } + // When paused buffer items if necessary + if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(item)) { + // Buffer is full, throw exception + onFailure(new IllegalStateException("Buffer overflow: cannot buffer more than " + bufferSize + " items")); + } + } else { + super.onItem(item); + } + } + + @Override + public void request(long numberOfItems) { + if (numberOfItems <= 0) { + onFailure(Subscriptions.getInvalidRequestException()); + return; + } + Flow.Subscription subscription = getUpstreamSubscription(); + if (subscription == Subscriptions.CANCELLED) { + return; + } + try { + Subscriptions.add(demand, numberOfItems); + if (paused.get()) { + return; + } + long currentDemand = demand.get(); + if (currentDemand > 0) { + Subscriptions.produced(demand, currentDemand); + subscription.request(currentDemand); + } + } catch (Throwable failure) { + onFailure(failure); + } + } + + @Override + public void cancel() { + clearQueue(); + processor = null; + super.cancel(); + } + + @Override + public void onFailure(Throwable failure) { + clearQueue(); + super.onFailure(failure); + } + + @Override + public void onCompletion() { + upstreamCompleted = true; + drain(); + } + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/subscription/DemandPauser.java b/implementation/src/main/java/io/smallrye/mutiny/subscription/DemandPauser.java new file mode 100644 index 000000000..cf0747370 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/subscription/DemandPauser.java @@ -0,0 +1,116 @@ +package io.smallrye.mutiny.subscription; + +import io.smallrye.common.annotation.Experimental; + +/** + * A handle to control a pausable stream without holding a direct reference to the stream itself. + *

+ * This handle allows pausing, resuming, and inspecting the state of a pausable stream from anywhere + * in the application, even after the stream has been transformed or subscribed to. + *

+ * Example usage: + * + *

+ * {@code
+ * DemandPauser pauser = new DemandPauser();
+ *
+ * Multi.createFrom().range(0, 100)
+ *         .pauseDemand().using(pauser)
+ *         .onItem().call(i -> Uni.createFrom().nullItem()
+ *                 .onItem().delayIt().by(Duration.ofMillis(10)))
+ *         .onItem().transform(i -> i * 2)
+ *         .subscribe().with(System.out::println);
+ *
+ * // Control from anywhere
+ * pauser.pause();
+ * pauser.resume();
+ * System.out.println("Paused: " + pauser.isPaused());
+ * }
+ * 
+ */ +@Experimental("This API is still being designed and may change in the future") +public class DemandPauser { + + volatile PausableMulti multi; + + /** + * Binds this handle to a pausable channel. + * This is typically called internally when creating a pausable stream. + * + * @param multi the pausable channel to bind to + */ + public void bind(PausableMulti multi) { + this.multi = multi; + } + + /** + * Pauses the stream. Already requested items will be handled according to the configured buffer strategy. + * + * @throws IllegalStateException if the handle is not bound to a channel + */ + public void pause() { + ensureBound(); + multi.pause(); + } + + /** + * Resumes the stream. Buffered items (if any) will be delivered before new items are requested. + * + * @throws IllegalStateException if the handle is not bound to a channel + */ + public void resume() { + ensureBound(); + multi.resume(); + } + + /** + * Checks if the stream is currently paused. + * + * @return {@code true} if paused, {@code false} otherwise + * @throws IllegalStateException if the handle is not bound to a channel + */ + public boolean isPaused() { + ensureBound(); + return multi.isPaused(); + } + + /** + * Returns the current buffer size (number of items in the buffer). + * Only applicable when using BUFFER strategy. + * + * @return the number of buffered items + * @throws IllegalStateException if the handle is not bound to a channel + */ + public int bufferSize() { + ensureBound(); + return multi.bufferSize(); + } + + /** + * Clears the buffer if the stream is currently paused. + * Only applicable when using BUFFER strategy. + * + * @return {@code true} if the buffer was cleared, {@code false} if not paused or no buffer + * @throws IllegalStateException if the handle is not bound to a channel + */ + public boolean clearBuffer() { + ensureBound(); + return multi.clearBuffer(); + } + + /** + * Checks if this handle is bound to a channel. + * + * @return {@code true} if bound, {@code false} otherwise + */ + public boolean isBound() { + return multi != null; + } + + private void ensureBound() { + if (multi == null) { + throw new IllegalStateException("DemandPauser is not bound to a stream. " + + "Make sure to use .pauseDemand().using(pauser) in the pausable configuration."); + } + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java b/implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java new file mode 100644 index 000000000..14c2c9c16 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java @@ -0,0 +1,46 @@ +package io.smallrye.mutiny.subscription; + +/** + * Interface for controlling a pausable Multi stream. + *

+ * This interface defines the contract for pausing and resuming demand propagation in a reactive stream. + * Implementations of this interface are typically bound to a {@link DemandPauser} to provide external control. + *

+ * This is an internal interface used by the demand pausing operator. + * Users should interact with {@link DemandPauser} instead. + * + * @see DemandPauser + */ +public interface PausableMulti { + + /** + * Pauses demand propagation to the upstream. + */ + void pause(); + + /** + * Resumes demand propagation to the upstream. + */ + void resume(); + + /** + * Checks if demand propagation is currently paused. + */ + boolean isPaused(); + + /** + * Returns the current number of buffered items. + * Only applicable when using {@link BackPressureStrategy#BUFFER}. + * + * @return the number of buffered items, or 0 if no buffer is used + */ + int bufferSize(); + + /** + * Clears the buffer if currently paused. + * Only applicable when using {@link BackPressureStrategy#BUFFER}. + * + * @return {@code true} if the buffer was cleared, {@code false} if not paused or no buffer exists + */ + boolean clearBuffer(); +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java new file mode 100644 index 000000000..760caf27c --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingTest.java @@ -0,0 +1,712 @@ +package io.smallrye.mutiny.operators.multi; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.DemandPauser; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class MultiDemandPausingTest { + + @Test + public void testPauseDemand() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().range(0, 100) + .pauseDemand().using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10)); + + // Pause the stream + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // Wait a bit - no new items should arrive + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5); // allow for some in-flight + + // Resume the stream + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // All items should eventually arrive + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + } + + @Test + public void testDropStrategy() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(5)) + .map(Long::intValue) + .select().first(200) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.DROP) + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20)); + + // Pause the stream - items will be dropped since we're using DROP strategy + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // Wait for items to be dropped (stream is still emitting) + await().pollDelay(Duration.ofMillis(200)).until(() -> true); + + // Items should not have advanced much (maybe a few in-flight) + assertThat(sub.getItems().size()).isLessThanOrEqualTo(sizeWhenPaused + 10); + + // Resume the stream + pauser.resume(); + + // Not all items will arrive (some were dropped while paused) + await().atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(sub.getItems().size()).isGreaterThan(sizeWhenPaused + 20)); + + // Should still be less than total since some were dropped + assertThat(sub.getItems()).hasSizeLessThan(200); + } + + @Test + public void testInitiallyPaused() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().range(0, 50) + .pauseDemand() + .paused(true) + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait a bit - no items should arrive + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).isEmpty(); + assertThat(pauser.isPaused()).isTrue(); + + // Resume the stream + pauser.resume(); + + // All items should arrive + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(50)); + } + + @Test + public void testBoundedBuffer() { + DemandPauser pauser = new DemandPauser(); + + Multi source = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .map(Long::intValue) + .select().first(100); + + AssertSubscriber sub = source.pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferSize(20) + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5)); + + // Pause and wait for buffer overflow + pauser.pause(); + + // Buffer should overflow and cause failure + await().atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> assertThat(sub.getFailure()).isInstanceOf(IllegalStateException.class) + .hasMessage("Buffer overflow: cannot buffer more than 20 items")); + + assertThat(sub.getItems()).hasSizeLessThan(100); + } + + @Test + public void testBufferSize() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .map(Long::intValue) + .select().first(100) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferSize(20) + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5)); + + // Pause - items should buffer + pauser.pause(); + + // Wait for buffer to fill + await().pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0)); + + assertThat(pauser.bufferSize()).isLessThanOrEqualTo(20); + + // Resume + pauser.resume(); + + await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0)); + } + + @Test + public void testClearBuffer() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .map(Long::intValue) + .select().first(100) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferUnconditionally() + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5)); + + // Pause - items should buffer + pauser.pause(); + + // Wait for buffer to fill + await().pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0)); + + int bufferSize = pauser.bufferSize(); + assertThat(bufferSize).isGreaterThan(0); + + // Clear the buffer + boolean cleared = pauser.clearBuffer(); + assertThat(cleared).isTrue(); + assertThat(pauser.bufferSize()).isEqualTo(0); + + // Resume + pauser.resume(); + + // Items will continue from where we resumed, not from buffer + await().pollDelay(Duration.ofMillis(200)).until(() -> true); + assertThat(sub.getItems()).hasSizeLessThan(100); // Some items were dropped + } + + @Test + public void testLateSubscription() { + DemandPauser pauser = new DemandPauser(); + AtomicBoolean subscribed = new AtomicBoolean(false); + + AssertSubscriber sub = Multi.createFrom().range(0, 50) + .onSubscription().invoke(() -> subscribed.set(true)) + .pauseDemand() + .paused(true) // Start paused + .lateSubscription(true) // Delay subscription until resumed + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Stream is not subscribed yet + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(subscribed.get()).isFalse(); + assertThat(sub.getItems()).isEmpty(); + assertThat(pauser.isPaused()).isTrue(); + + // Resume triggers subscription and item flow + pauser.resume(); + await().untilAsserted(() -> assertThat(subscribed.get()).isTrue()); + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(50)); + assertThat(pauser.isPaused()).isFalse(); + } + + @Test + public void testLateSubscriptionWithHotSource() { + DemandPauser pauser = new DemandPauser(); + AtomicBoolean subscribed = new AtomicBoolean(false); + + // Hot source that emits immediately + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .onSubscription().invoke(() -> subscribed.set(true)) + .select().first(100) + .pauseDemand() + .paused(true) // Start paused + .lateSubscription(true) // Delay subscription - won't miss early items + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Stream is not subscribed yet, no items are being emitted + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(subscribed.get()).isFalse(); + assertThat(sub.getItems()).isEmpty(); + + // Resume triggers subscription and starts receiving items from the beginning + pauser.resume(); + await().untilAsserted(() -> assertThat(subscribed.get()).isTrue()); + + // First item should be 0, not some later value + await().untilAsserted(() -> assertThat(sub.getItems()).isNotEmpty()); + assertThat(sub.getItems().get(0)).isEqualTo(0L); + + // Eventually receive all items + await().atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + } + + @Test + public void testLateSubscriptionNotPausedSubscribesImmediately() { + DemandPauser pauser = new DemandPauser(); + AtomicBoolean subscribed = new AtomicBoolean(false); + + AssertSubscriber sub = Multi.createFrom().range(0, 50) + .onSubscription().invoke(() -> subscribed.set(true)) + .pauseDemand() + .lateSubscription(true) // Late subscription enabled but... + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Stream subscribes immediately because it's not paused + // lateSubscription only delays when initially paused + await().untilAsserted(() -> assertThat(subscribed.get()).isTrue()); + + // Items flow normally + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(50)); + assertThat(pauser.isPaused()).isFalse(); + } + + @Test + public void testMethodsWithoutSubscription() { + DemandPauser pauser = new DemandPauser(); + + // Pauser is bound but stream not subscribed yet + Multi.createFrom().range(0, 50) + .pauseDemand() + .using(pauser); + + // Pauser is bound + assertThat(pauser.isBound()).isTrue(); + + // Can check state even without subscription + assertThat(pauser.isPaused()).isFalse(); + + // Can pause before subscription + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + // Buffer operations work (return defaults when no subscription) + assertThat(pauser.bufferSize()).isEqualTo(0); + assertThat(pauser.clearBuffer()).isFalse(); // Returns false when not subscribed + + // Resume also works + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + } + + @Test + public void testPauserNotBound() { + DemandPauser pauser = new DemandPauser(); + + // Pauser is not bound yet + assertThat(pauser.isBound()).isFalse(); + + // All operations should throw IllegalStateException + assertThatThrownBy(() -> pauser.pause()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("DemandPauser is not bound"); + + assertThatThrownBy(() -> pauser.resume()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("DemandPauser is not bound"); + + assertThatThrownBy(() -> pauser.isPaused()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("DemandPauser is not bound"); + + assertThatThrownBy(() -> pauser.bufferSize()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("DemandPauser is not bound"); + + assertThatThrownBy(() -> pauser.clearBuffer()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("DemandPauser is not bound"); + } + + @Test + public void testInvalidBufferStrategy() { + DemandPauser pauser = new DemandPauser(); + + // ERROR strategy is not supported for pauseDemand + assertThatThrownBy(() -> Multi.createFrom().range(0, 50) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.ERROR) + .using(pauser)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Demand pauser only supports BUFFER, DROP or IGNORE strategy"); + + // LATEST strategy is not supported for pauseDemand + assertThatThrownBy(() -> Multi.createFrom().range(0, 50) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.LATEST) + .using(pauser)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Demand pauser only supports BUFFER, DROP or IGNORE strategy"); + } + + @Test + public void testIgnoreStrategy() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(5)) + .map(Long::intValue) + .select().first(100) + .pauseDemand() + .bufferStrategy(BackPressureStrategy.IGNORE) + .using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20)); + + // Pause the stream - already requested items continue to flow with IGNORE strategy + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // With IGNORE strategy, already-requested items continue to arrive even when paused + // No new demand is issued, but in-flight items keep flowing + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + + // Items may continue to arrive (in-flight items) even when paused + int sizeAfterPause = sub.getItems().size(); + // Some items may have arrived during pause with IGNORE strategy + assertThat(sizeAfterPause).isGreaterThanOrEqualTo(sizeWhenPaused); + + // Resume the stream - new demand will be issued + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // All items should eventually arrive + await().atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + + // Buffer should always be 0 with IGNORE strategy (no buffering) + assertThat(pauser.bufferSize()).isEqualTo(0); + } + + @Test + public void testCancelWhilePaused() { + DemandPauser pauser = new DemandPauser(); + AtomicBoolean cancelled = new AtomicBoolean(false); + + AssertSubscriber sub = Multi.createFrom().range(0, 100) + .onCancellation().invoke(() -> cancelled.set(true)) + .pauseDemand().using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create()); + + // Request and wait for some items + sub.request(10); + assertThat(sub.getItems()).hasSize(10); + + // Pause the stream + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // Cancel while paused + sub.cancel(); + + await().untilAsserted(() -> assertThat(cancelled.get()).isTrue()); + + // Resume after cancel + pauser.resume(); + + // No new items should arrive after cancel + int sizeAfterCancel = sub.getItems().size(); + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sizeWhenPaused).isEqualTo(sizeAfterCancel); + } + + @Test + public void testCancelWithBufferedItems() { + DemandPauser pauser = new DemandPauser(); + List items = new CopyOnWriteArrayList<>(); + AtomicBoolean cancelled = new AtomicBoolean(false); + + Multi source = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .onCancellation().invoke(() -> cancelled.set(true)) + .select().first(100); + + source.pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferUnconditionally() + .using(pauser) + .subscribe().withSubscriber(new MultiSubscriber() { + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onItem(Long item) { + items.add(item); + if (items.size() > 5) { + pauser.pause(); + // Wait for buffer to fill + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + // Cancel with items in buffer + subscription.cancel(); + } + } + + @Override + public void onFailure(Throwable failure) { + } + + @Override + public void onCompletion() { + } + }); + + await().untilAsserted(() -> assertThat(cancelled.get()).isTrue()); + + // Buffer should have been cleared on cancel + assertThat(pauser.bufferSize()).isEqualTo(0); + + // No new items should arrive after cancel + int sizeAfterCancel = items.size(); + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(items.size()).isEqualTo(sizeAfterCancel); + } + + @Test + public void testOnFailureWhilePaused() { + DemandPauser pauser = new DemandPauser(); + List items = new CopyOnWriteArrayList<>(); + AtomicInteger failureCount = new AtomicInteger(); + AtomicBoolean wasFailure = new AtomicBoolean(false); + + Multi multi = Multi.createFrom().emitter(emitter -> { + for (int i = 0; i < 20; i++) { + emitter.emit(i); + } + emitter.fail(new RuntimeException("Test failure")); + }); + multi + .pauseDemand().using(pauser) + .subscribe().with( + item -> { + items.add(item); + if (items.size() > 5) { + pauser.pause(); + } + }, + failure -> { + wasFailure.set(true); + failureCount.incrementAndGet(); + }); + + // Wait for failure to be propagated + await().untilAsserted(() -> assertThat(wasFailure.get()).isTrue()); + assertThat(failureCount.get()).isEqualTo(1); + + // No new items should arrive after failure + int sizeAfterFailure = items.size(); + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(items.size()).isEqualTo(sizeAfterFailure); + } + + @Test + public void testOnFailureWithBufferedItems() { + DemandPauser pauser = new DemandPauser(); + List items = new CopyOnWriteArrayList<>(); + AtomicInteger failureCount = new AtomicInteger(); + AtomicBoolean wasFailure = new AtomicBoolean(false); + + Multi multi = Multi.createFrom().emitter(emitter -> { + for (int i = 0; i < 100; i++) { + emitter.emit(i); + if (i == 50) { + emitter.fail(new RuntimeException("Test failure at 50")); + return; + } + } + }); + multi + .pauseDemand() + .bufferStrategy(BackPressureStrategy.BUFFER) + .bufferUnconditionally() + .using(pauser) + .subscribe().with( + item -> { + items.add(item); + if (items.size() > 5) { + pauser.pause(); + } + }, + failure -> { + wasFailure.set(true); + failureCount.incrementAndGet(); + }); + + // Wait for failure to be propagated + await().untilAsserted(() -> assertThat(wasFailure.get()).isTrue()); + assertThat(failureCount.get()).isEqualTo(1); + + // Buffer should be cleared after failure + assertThat(pauser.bufferSize()).isEqualTo(0); + + // Items received should be less than 51 (some may be buffered when failure occurs) + assertThat(items.size()).isLessThanOrEqualTo(51); + } + + @Test + public void testUpstreamFailurePropagation() { + DemandPauser pauser = new DemandPauser(); + List items = new CopyOnWriteArrayList<>(); + AtomicInteger failureCount = new AtomicInteger(); + RuntimeException expectedException = new RuntimeException("Upstream failure"); + + Multi.createFrom().range(0, 10) + .map(i -> { + if (i == 5) { + throw expectedException; + } + return i; + }) + .pauseDemand().using(pauser) + .subscribe().with( + items::add, + failure -> { + assertThat(failure).isEqualTo(expectedException); + failureCount.incrementAndGet(); + }); + + // Wait for failure to be propagated + await().untilAsserted(() -> assertThat(failureCount.get()).isEqualTo(1)); + + // Should have received items 0-4 + assertThat(items).containsExactly(0, 1, 2, 3, 4); + } + + @Test + public void testResumeWhenAlreadyResumed() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .select().first(100) + .pauseDemand().using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Initially not paused + assertThat(pauser.isPaused()).isFalse(); + + // Call resume when already resumed (should be idempotent) + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // Items should flow normally + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + + // Resume again after completion + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + pauser.clearBuffer(); + } + + @Test + public void testPauseWhenAlreadyPaused() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .select().first(100) + .pauseDemand().using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10)); + + // Pause the stream + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + int sizeWhenPaused = sub.getItems().size(); + + // Pause again when already paused (should be idempotent) + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + + // Wait a bit - no new items should arrive + await().pollDelay(Duration.ofMillis(100)).until(() -> true); + assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5); // allow for some in-flight + + // Resume + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // All items should eventually arrive + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + } + + @Test + public void testMultiplePauseResumeCycles() { + DemandPauser pauser = new DemandPauser(); + + AssertSubscriber sub = Multi.createFrom().range(0, 100) + .pauseDemand().using(pauser) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + // Wait for some items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10)); + + // First pause-resume cycle + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + await().pollDelay(Duration.ofMillis(50)).until(() -> true); + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // Wait for more items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(30)); + + // Second pause-resume cycle + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + await().pollDelay(Duration.ofMillis(50)).until(() -> true); + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // Wait for more items + await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(50)); + + // Third pause-resume cycle + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + await().pollDelay(Duration.ofMillis(50)).until(() -> true); + pauser.resume(); + assertThat(pauser.isPaused()).isFalse(); + + // All items should eventually arrive + await().untilAsserted(() -> assertThat(sub.getItems()).hasSize(100)); + } + +} diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiDemandPausingTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiDemandPausingTckTest.java new file mode 100644 index 000000000..a92dc0724 --- /dev/null +++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiDemandPausingTckTest.java @@ -0,0 +1,49 @@ +package io.smallrye.mutiny.tcktests; + +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import io.smallrye.mutiny.subscription.DemandPauser; + +public class MultiDemandPausingTckTest extends AbstractPublisherTck { + + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + DemandPauser pauser = new DemandPauser(); + AtomicLong count = new AtomicLong(); + return upstream(elements) + .emitOn(executor) + .pauseDemand() + .bufferUnconditionally() + .using(pauser) + .invoke(() -> { + if (count.incrementAndGet() % 3L == 0) { + pauser.pause(); + executor.schedule(pauser::resume, 50, TimeUnit.MILLISECONDS); + } + }) + .emitOn(executor); + + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + DemandPauser pauser = new DemandPauser(); + AtomicLong count = new AtomicLong(); + return failedUpstream().pauseDemand() + .bufferUnconditionally() + .using(pauser) + .invoke(() -> { + if (count.incrementAndGet() % 3L == 0) { + pauser.pause(); + executor.schedule(pauser::resume, 50, TimeUnit.MILLISECONDS); + } + }); + } + +}