Skip to content

Commit cc5a609

Browse files
authored
Merge pull request #2007 from ozangunalp/demand_pauser
feat: add demand pausing operator for Multi
2 parents 392fcdf + e5ff7cf commit cc5a609

File tree

10 files changed

+1722
-0
lines changed

10 files changed

+1722
-0
lines changed

documentation/docs/guides/controlling-demand.md

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,101 @@ You can also define a custom function that provides a capping value based on a c
5353
Here we have a function that requests 75% of the downstream requests.
5454

5555
Note that the function must return a value `n` that satisfies `(0 < n <= requested)` where `requested` is the downstream demand.
56+
57+
## Pausing the demand
58+
59+
The `Multi.pauseDemand()` operator provides fine-grained control over demand propagation in reactive streams.
60+
Unlike cancellation, which terminates the subscription, pausing allows to suspend demand without unsubscribing from the upstream.
61+
This is useful for implementing flow control patterns where item flow needs to be paused based on external conditions.
62+
63+
### Basic pausing and resuming
64+
65+
The `pauseDemand()` operator works with a `DemandPauser` handle that allows to control the stream:
66+
67+
```java linenums="1"
68+
{{ insert('java/guides/operators/PausingDemandTest.java', 'basic') }}
69+
```
70+
71+
The `DemandPauser` provides methods to:
72+
73+
- `pause()`: Stop propagating demand to upstream
74+
- `resume()`: Resume demand propagation and deliver buffered items
75+
- `isPaused()`: Check the current pause state
76+
77+
Note that a few items may still arrive after pausing due to in-flight requests that were already issued to upstream.
78+
79+
### Starting in a paused state
80+
81+
You can create a stream that starts paused and only begins flowing when explicitly resumed:
82+
83+
```java linenums="1"
84+
{{ insert('java/guides/operators/PausingDemandTest.java', 'initially-paused') }}
85+
```
86+
87+
This is useful when you want to prepare a stream but delay its execution until certain conditions are met.
88+
89+
### Late subscription
90+
91+
By default, the upstream subscription happens immediately even when starting paused.
92+
The `lateSubscription()` option delays the upstream subscription until the stream is resumed:
93+
94+
```java linenums="1"
95+
{{ insert('java/guides/operators/PausingDemandTest.java', 'late-subscription') }}
96+
```
97+
98+
### Buffer strategies
99+
100+
When a stream is paused, the operator stops requesting new items from upstream.
101+
However, items that were already requested (due to downstream demand) may still arrive.
102+
Buffer strategies control what happens to these in-flight items.
103+
104+
The `pauseDemand()` operator supports three buffer strategies: `BUFFER` (default), `DROP`, and `IGNORE`.
105+
Configuring any other strategy will throw an `IllegalArgumentException`.
106+
107+
#### BUFFER strategy (default)
108+
109+
Already-requested items are buffered while paused and delivered when resumed:
110+
111+
```java linenums="1"
112+
{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-strategy') }}
113+
```
114+
115+
You can configure the buffer size:
116+
117+
- `bufferUnconditionally()`: Unbounded buffer
118+
- `bufferSize(n)`: Buffer up to `n` items, then fail with buffer overflow
119+
120+
When the buffer overflows, the stream fails with an `IllegalStateException`.
121+
122+
**Important**: The buffer only holds items that were already requested from upstream before pausing.
123+
When paused, no new requests are issued to upstream, so the buffer size is bounded by the outstanding demand at the time of pausing.
124+
125+
#### DROP strategy
126+
127+
Already-requested items are dropped while paused:
128+
129+
```java linenums="1"
130+
{{ insert('java/guides/operators/PausingDemandTest.java', 'drop-strategy') }}
131+
```
132+
133+
Items that arrive while paused are discarded, and when resumed, the stream continues requesting fresh items.
134+
135+
#### IGNORE strategy
136+
137+
Already-requested items continue to flow downstream while paused.
138+
This strategy doesn't use any buffers.
139+
It only pauses demand from being issued to upstream, but does not pause the flow of already requested items.
140+
141+
### Buffer management
142+
143+
When using the BUFFER strategy, you can inspect and manage the buffer:
144+
145+
```java linenums="1"
146+
{{ insert('java/guides/operators/PausingDemandTest.java', 'buffer-management') }}
147+
```
148+
149+
The `DemandPauser` provides:
150+
151+
- `bufferSize()`: Returns the current number of buffered items
152+
- `clearBuffer()`: Clears the buffer (only works while paused), returns `true` if successful
153+
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package guides.operators;
2+
3+
import io.smallrye.mutiny.Multi;
4+
import io.smallrye.mutiny.Uni;
5+
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
6+
import io.smallrye.mutiny.subscription.BackPressureStrategy;
7+
import io.smallrye.mutiny.subscription.DemandPauser;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.time.Duration;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
import static org.assertj.core.api.Assertions.assertThat;
15+
import static org.awaitility.Awaitility.await;
16+
17+
class PausingDemandTest {
18+
19+
@Test
20+
void basicPausing() {
21+
// <basic>
22+
DemandPauser pauser = new DemandPauser();
23+
24+
AssertSubscriber<Integer> sub = AssertSubscriber.create();
25+
Multi.createFrom().range(0, 100)
26+
.pauseDemand().using(pauser)
27+
// Throttle the multi
28+
.onItem().call(i -> Uni.createFrom().nullItem()
29+
.onItem().delayIt().by(Duration.ofMillis(10)))
30+
.subscribe().withSubscriber(sub);
31+
32+
// Unbounded request
33+
sub.request(Long.MAX_VALUE);
34+
// Wait for some items
35+
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));
36+
37+
// Pause the stream
38+
pauser.pause();
39+
assertThat(pauser.isPaused()).isTrue();
40+
41+
int sizeWhenPaused = sub.getItems().size();
42+
43+
// Wait - no new items should arrive (except a few in-flight)
44+
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
45+
assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5);
46+
47+
// Resume the stream
48+
pauser.resume();
49+
assertThat(pauser.isPaused()).isFalse();
50+
51+
// All items eventually arrive
52+
sub.awaitCompletion();
53+
assertThat(sub.getItems()).hasSize(100);
54+
// </basic>
55+
}
56+
57+
@Test
58+
void initiallyPaused() {
59+
// <initially-paused>
60+
DemandPauser pauser = new DemandPauser();
61+
62+
AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
63+
.pauseDemand()
64+
.paused(true) // Start paused
65+
.using(pauser)
66+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
67+
68+
// No items arrive while paused
69+
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
70+
assertThat(sub.getItems()).isEmpty();
71+
assertThat(pauser.isPaused()).isTrue();
72+
73+
// Resume to start receiving items
74+
pauser.resume();
75+
sub.awaitCompletion();
76+
assertThat(sub.getItems()).hasSize(50);
77+
// </initially-paused>
78+
}
79+
80+
@Test
81+
void lateSubscription() {
82+
// <late-subscription>
83+
DemandPauser pauser = new DemandPauser();
84+
AtomicBoolean subscribed = new AtomicBoolean(false);
85+
86+
AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
87+
.onSubscription().invoke(() -> subscribed.set(true))
88+
.pauseDemand()
89+
.paused(true) // Start paused
90+
.lateSubscription(true) // Delay subscription until resumed
91+
.using(pauser)
92+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
93+
94+
// Stream is not subscribed yet
95+
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
96+
assertThat(subscribed.get()).isFalse();
97+
assertThat(sub.getItems()).isEmpty();
98+
99+
// Resume triggers subscription and item flow
100+
pauser.resume();
101+
await().untilAsserted(() -> assertThat(subscribed.get()).isTrue());
102+
sub.awaitCompletion();
103+
assertThat(sub.getItems()).hasSize(50);
104+
// </late-subscription>
105+
}
106+
107+
@Test
108+
void bufferStrategy() {
109+
// <buffer-strategy>
110+
DemandPauser pauser = new DemandPauser();
111+
112+
AssertSubscriber<Long> sub = AssertSubscriber.create(Long.MAX_VALUE);
113+
114+
Multi.createFrom().ticks().every(Duration.ofMillis(10))
115+
.select().first(100)
116+
.pauseDemand()
117+
.bufferStrategy(BackPressureStrategy.BUFFER)
118+
.bufferSize(20) // Buffer up to 20 items
119+
.using(pauser)
120+
.subscribe().withSubscriber(sub);
121+
122+
// Wait for some items
123+
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));
124+
125+
// Pause - items buffer up to the limit
126+
pauser.pause();
127+
128+
// Wait for buffer to fill
129+
await().pollDelay(Duration.ofMillis(100))
130+
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));
131+
132+
// Buffer size is capped
133+
assertThat(pauser.bufferSize()).isLessThanOrEqualTo(20);
134+
135+
// Resume drains the buffer
136+
pauser.resume();
137+
await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0));
138+
// </buffer-strategy>
139+
}
140+
141+
@Test
142+
void dropStrategy() {
143+
// <drop-strategy>
144+
DemandPauser pauser = new DemandPauser();
145+
146+
AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(5))
147+
.select().first(200)
148+
.pauseDemand()
149+
.bufferStrategy(BackPressureStrategy.DROP) // Drop items while paused
150+
.using(pauser)
151+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
152+
153+
// Wait for some items
154+
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20));
155+
156+
// Pause - subsequent items are dropped
157+
pauser.pause();
158+
int sizeWhenPaused = sub.getItems().size();
159+
160+
// Wait while items are dropped
161+
await().pollDelay(Duration.ofMillis(200)).until(() -> true);
162+
163+
// Resume - continue from current position
164+
pauser.resume();
165+
await().atMost(Duration.ofSeconds(3))
166+
.untilAsserted(() -> assertThat(sub.getItems().size()).isGreaterThan(sizeWhenPaused + 20));
167+
168+
sub.awaitCompletion();
169+
// Not all items arrived (some were dropped)
170+
assertThat(sub.getItems()).hasSizeLessThan(200);
171+
// </drop-strategy>
172+
}
173+
174+
@Test
175+
void bufferManagement() {
176+
// <buffer-management>
177+
DemandPauser pauser = new DemandPauser();
178+
179+
AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(10))
180+
.select().first(100)
181+
.pauseDemand()
182+
.bufferStrategy(BackPressureStrategy.BUFFER)
183+
.bufferUnconditionally() // Unbounded buffer
184+
.using(pauser)
185+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
186+
187+
// Wait for some items
188+
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));
189+
190+
// Pause and let buffer fill
191+
pauser.pause();
192+
await().pollDelay(Duration.ofMillis(100))
193+
.untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));
194+
195+
int bufferSize = pauser.bufferSize();
196+
assertThat(bufferSize).isGreaterThan(0);
197+
198+
// Clear the buffer
199+
boolean cleared = pauser.clearBuffer();
200+
assertThat(cleared).isTrue();
201+
assertThat(pauser.bufferSize()).isEqualTo(0);
202+
203+
// Resume - items continue from current position (cleared items are lost)
204+
pauser.resume();
205+
// </buffer-management>
206+
}
207+
208+
@Test
209+
void externalControl() {
210+
// <external-control>
211+
DemandPauser pauser = new DemandPauser();
212+
213+
// Create and transform the stream
214+
Multi<String> stream = Multi.createFrom().range(0, 1000)
215+
.pauseDemand().using(pauser)
216+
.onItem().transform(i -> "Item-" + i)
217+
.onItem().transform(String::toUpperCase);
218+
219+
// Control the stream from anywhere
220+
pauser.pause();
221+
222+
// Later, subscribe to the stream
223+
AssertSubscriber<String> sub = stream
224+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
225+
226+
// No items while paused
227+
await().pollDelay(Duration.ofMillis(100)).until(() -> true);
228+
assertThat(sub.getItems()).isEmpty();
229+
230+
// Resume to receive items
231+
pauser.resume();
232+
sub.awaitCompletion();
233+
assertThat(sub.getItems()).hasSize(1000);
234+
// </external-control>
235+
}
236+
237+
@Test
238+
void conditionalFlow() {
239+
// <conditional-flow>
240+
DemandPauser pauser = new DemandPauser();
241+
AtomicInteger errorCount = new AtomicInteger(0);
242+
243+
AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 100)
244+
.pauseDemand().using(pauser)
245+
.onItem().invoke(item -> {
246+
// Simulate error-prone processing
247+
if (item % 20 == 0 && item > 0) {
248+
errorCount.incrementAndGet();
249+
pauser.pause(); // Pause on errors
250+
}
251+
})
252+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
253+
254+
// Stream pauses when errors occur
255+
await().untilAsserted(() -> assertThat(errorCount.get()).isGreaterThan(0));
256+
await().untilAsserted(() -> assertThat(pauser.isPaused()).isTrue());
257+
258+
// Handle the error, then resume
259+
errorCount.set(0);
260+
pauser.resume();
261+
262+
// Stream continues
263+
await().pollDelay(Duration.ofMillis(50)).until(() -> true);
264+
assertThat(sub.getItems().size()).isGreaterThan(20);
265+
// </conditional-flow>
266+
}
267+
}

0 commit comments

Comments
 (0)