Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions documentation/docs/guides/controlling-demand.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,101 @@ You can also define a custom function that provides a capping value based on a c
Here we have a function that requests 75% of the downstream requests.

Note that the function must return a value `n` that satisfies `(0 < n <= requested)` where `requested` is the downstream demand.

## Pausing the demand

The `Multi.pauseDemand()` operator provides fine-grained control over demand propagation in reactive streams.
Unlike cancellation, which terminates the subscription, pausing allows to suspend demand without unsubscribing from the upstream.
This is useful for implementing flow control patterns where item flow needs to be paused based on external conditions.

### Basic pausing and resuming

The `pauseDemand()` operator works with a `DemandPauser` handle that allows to control the stream:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'basic') }}
```

The `DemandPauser` provides methods to:

- `pause()`: Stop propagating demand to upstream
- `resume()`: Resume demand propagation and deliver buffered items
- `isPaused()`: Check the current pause state

Note that a few items may still arrive after pausing due to in-flight requests that were already issued to upstream.

### Starting in a paused state

You can create a stream that starts paused and only begins flowing when explicitly resumed:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'initially-paused') }}
```

This is useful when you want to prepare a stream but delay its execution until certain conditions are met.

### Late subscription

By default, the upstream subscription happens immediately even when starting paused.
The `lateSubscription()` option delays the upstream subscription until the stream is resumed:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'late-subscription') }}
```

### Buffer strategies

When a stream is paused, the operator stops requesting new items from upstream.
However, items that were already requested (due to downstream demand) may still arrive.
Buffer strategies control what happens to these in-flight items.

The `pauseDemand()` operator supports three buffer strategies: `BUFFER` (default), `DROP`, and `IGNORE`.
Configuring any other strategy will throw an `IllegalArgumentException`.

#### BUFFER strategy (default)

Already-requested items are buffered while paused and delivered when resumed:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-strategy') }}
```

You can configure the buffer size:

- `bufferUnconditionally()`: Unbounded buffer
- `bufferSize(n)`: Buffer up to `n` items, then fail with buffer overflow

When the buffer overflows, the stream fails with an `IllegalStateException`.

**Important**: The buffer only holds items that were already requested from upstream before pausing.
When paused, no new requests are issued to upstream, so the buffer size is bounded by the outstanding demand at the time of pausing.

#### DROP strategy

Already-requested items are dropped while paused:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'drop-strategy') }}
```

Items that arrive while paused are discarded, and when resumed, the stream continues requesting fresh items.

#### IGNORE strategy

Already-requested items continue to flow downstream while paused.
This strategy doesn't use any buffers.
It only pauses demand from being issued to upstream, but does not pause the flow of already requested items.

### Buffer management

When using the BUFFER strategy, you can inspect and manage the buffer:

```java linenums="1"
{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-management') }}
```

The `DemandPauser` provides:

- `bufferSize()`: Returns the current number of buffered items
- `clearBuffer()`: Clears the buffer (only works while paused), returns `true` if successful

267 changes: 267 additions & 0 deletions documentation/src/test/java/guides/operators/PausingDemandTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package guides.operators;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.DemandPauser;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class PausingDemandTest {

@Test
void basicPausing() {
// <basic>
DemandPauser pauser = new DemandPauser();

AssertSubscriber<Integer> sub = AssertSubscriber.create();
Multi.createFrom().range(0, 100)
.pauseDemand().using(pauser)
// Throttle the multi
.onItem().call(i -> Uni.createFrom().nullItem()
.onItem().delayIt().by(Duration.ofMillis(10)))
.subscribe().withSubscriber(sub);

// Unbounded request
sub.request(Long.MAX_VALUE);
// Wait for some items
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));

// Pause the stream
pauser.pause();
assertThat(pauser.isPaused()).isTrue();

int sizeWhenPaused = sub.getItems().size();

// Wait - no new items should arrive (except a few in-flight)
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5);

// Resume the stream
pauser.resume();
assertThat(pauser.isPaused()).isFalse();

// All items eventually arrive
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(100);
// </basic>
}

@Test
void initiallyPaused() {
// <initially-paused>
DemandPauser pauser = new DemandPauser();

AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
.pauseDemand()
.paused(true) // Start paused
.using(pauser)
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// No items arrive while paused
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
assertThat(sub.getItems()).isEmpty();
assertThat(pauser.isPaused()).isTrue();

// Resume to start receiving items
pauser.resume();
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(50);
// </initially-paused>
}

@Test
void lateSubscription() {
// <late-subscription>
DemandPauser pauser = new DemandPauser();
AtomicBoolean subscribed = new AtomicBoolean(false);

AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
.onSubscription().invoke(() -> subscribed.set(true))
.pauseDemand()
.paused(true) // Start paused
.lateSubscription(true) // Delay subscription until resumed
.using(pauser)
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// Stream is not subscribed yet
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
assertThat(subscribed.get()).isFalse();
assertThat(sub.getItems()).isEmpty();

// Resume triggers subscription and item flow
pauser.resume();
await().untilAsserted(() -> assertThat(subscribed.get()).isTrue());
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(50);
// </late-subscription>
}

@Test
void bufferStrategy() {
// <buffer-strategy>
DemandPauser pauser = new DemandPauser();

AssertSubscriber<Long> sub = AssertSubscriber.create(Long.MAX_VALUE);

Multi.createFrom().ticks().every(Duration.ofMillis(10))
.select().first(100)
.pauseDemand()
.bufferStrategy(BackPressureStrategy.BUFFER)
.bufferSize(20) // Buffer up to 20 items
.using(pauser)
.subscribe().withSubscriber(sub);

// Wait for some items
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));

// Pause - items buffer up to the limit
pauser.pause();

// Wait for buffer to fill
await().pollDelay(Duration.ofMillis(100))
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));

// Buffer size is capped
assertThat(pauser.bufferSize()).isLessThanOrEqualTo(20);

// Resume drains the buffer
pauser.resume();
await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0));
// </buffer-strategy>
}

@Test
void dropStrategy() {
// <drop-strategy>
DemandPauser pauser = new DemandPauser();

AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(5))
.select().first(200)
.pauseDemand()
.bufferStrategy(BackPressureStrategy.DROP) // Drop items while paused
.using(pauser)
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// Wait for some items
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20));

// Pause - subsequent items are dropped
pauser.pause();
int sizeWhenPaused = sub.getItems().size();

// Wait while items are dropped
await().pollDelay(Duration.ofMillis(200)).until(() -> true);

// Resume - continue from current position
pauser.resume();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> assertThat(sub.getItems().size()).isGreaterThan(sizeWhenPaused + 20));

sub.awaitCompletion();
// Not all items arrived (some were dropped)
assertThat(sub.getItems()).hasSizeLessThan(200);
// </drop-strategy>
}

@Test
void bufferManagement() {
// <buffer-management>
DemandPauser pauser = new DemandPauser();

AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(10))
.select().first(100)
.pauseDemand()
.bufferStrategy(BackPressureStrategy.BUFFER)
.bufferUnconditionally() // Unbounded buffer
.using(pauser)
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// Wait for some items
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));

// Pause and let buffer fill
pauser.pause();
await().pollDelay(Duration.ofMillis(100))
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));

int bufferSize = pauser.bufferSize();
assertThat(bufferSize).isGreaterThan(0);

// Clear the buffer
boolean cleared = pauser.clearBuffer();
assertThat(cleared).isTrue();
assertThat(pauser.bufferSize()).isEqualTo(0);

// Resume - items continue from current position (cleared items are lost)
pauser.resume();
// </buffer-management>
}

@Test
void externalControl() {
// <external-control>
DemandPauser pauser = new DemandPauser();

// Create and transform the stream
Multi<String> stream = Multi.createFrom().range(0, 1000)
.pauseDemand().using(pauser)
.onItem().transform(i -> "Item-" + i)
.onItem().transform(String::toUpperCase);

// Control the stream from anywhere
pauser.pause();

// Later, subscribe to the stream
AssertSubscriber<String> sub = stream
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// No items while paused
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
assertThat(sub.getItems()).isEmpty();

// Resume to receive items
pauser.resume();
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(1000);
// </external-control>
}

@Test
void conditionalFlow() {
// <conditional-flow>
DemandPauser pauser = new DemandPauser();
AtomicInteger errorCount = new AtomicInteger(0);

AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 100)
.pauseDemand().using(pauser)
.onItem().invoke(item -> {
// Simulate error-prone processing
if (item % 20 == 0 && item > 0) {
errorCount.incrementAndGet();
pauser.pause(); // Pause on errors
}
})
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

// Stream pauses when errors occur
await().untilAsserted(() -> assertThat(errorCount.get()).isGreaterThan(0));
await().untilAsserted(() -> assertThat(pauser.isPaused()).isTrue());

// Handle the error, then resume
errorCount.set(0);
pauser.resume();

// Stream continues
await().pollDelay(Duration.ofMillis(50)).until(() -> true);
assertThat(sub.getItems().size()).isGreaterThan(20);
// </conditional-flow>
}
}
Loading
Loading