Skip to content

Commit 6da1b4c

Browse files
committed
fix(pauseDemand): changes after review, to squash
1 parent d12f34b commit 6da1b4c

File tree

9 files changed

+525
-167
lines changed

9 files changed

+525
-167
lines changed

documentation/docs/guides/controlling-demand.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ The `lateSubscription()` option delays the upstream subscription until the strea
9898

9999
When a stream is paused, the operator stops requesting new items from upstream.
100100
However, items that were already requested (due to downstream demand) may still arrive.
101-
Buffer strategies control what happens to these in-flight items:
101+
Buffer strategies control what happens to these in-flight items.
102+
103+
The `pauseDemand()` operator supports three buffer strategies: `BUFFER` (default), `DROP`, and `IGNORE`.
104+
Configuring any other strategy will throw an `IllegalArgumentException`.
102105

103106
#### BUFFER strategy (default)
104107

@@ -109,7 +112,7 @@ Already-requested items are buffered while paused and delivered when resumed:
109112
```
110113

111114
You can configure the buffer size:
112-
- `bufferSize(0)`: Unbounded buffer (uses default buffer size internally)
115+
- `bufferUnconditionally()`: Unbounded buffer
113116
- `bufferSize(n)`: Buffer up to `n` items, then fail with buffer overflow
114117

115118
When the buffer overflows, the stream fails with an `IllegalStateException`.

documentation/src/test/java/guides/operators/PausingDemandTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ void basicPausing() {
2424
AssertSubscriber<Integer> sub = AssertSubscriber.create();
2525
Multi.createFrom().range(0, 100)
2626
.pauseDemand().using(pauser)
27-
// throttle the multi
27+
// Throttle the multi
2828
.onItem().call(i -> Uni.createFrom().nullItem()
2929
.onItem().delayIt().by(Duration.ofMillis(10)))
3030
.subscribe().withSubscriber(sub);
3131

32-
// request unbound
32+
// Unbounded request
3333
sub.request(Long.MAX_VALUE);
3434
// Wait for some items
3535
await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));
@@ -180,7 +180,7 @@ void bufferManagement() {
180180
.select().first(100)
181181
.pauseDemand()
182182
.bufferStrategy(BackPressureStrategy.BUFFER)
183-
.bufferSize(0) // Unbounded buffer
183+
.bufferUnconditionally() // Unbounded buffer
184184
.using(pauser)
185185
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
186186

implementation/revapi.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,7 @@
5151
"criticality" : "highlight",
5252
"minSeverity" : "POTENTIALLY_BREAKING",
5353
"minCriticality" : "documented",
54-
"differences" : [{
55-
"code": "java.method.addedToInterface",
56-
"new": "method io.smallrye.mutiny.groups.MultiDemandPausing<T> io.smallrye.mutiny.Multi<T>::pauseDemand()",
57-
"justification": "add demand pauser to Multi"
58-
}
59-
]
54+
"differences" : [ ]
6055
}
6156
}, {
6257
"extension" : "revapi.reporter.json",

implementation/src/main/java/io/smallrye/mutiny/Multi.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ default Multi<ItemWithContext<T>> attachContext() {
640640
/**
641641
* Allows pausing and resuming demand propagation to upstream using a {@link io.smallrye.mutiny.subscription.DemandPauser}.
642642
* <p>
643-
* Unlike cancellation which terminates the subscription, pausing temporarily suspends demand without unsubscribing.
643+
* Unlike cancellation which terminates the subscription, temporarily pausing suspends the demand without unsubscribing.
644644
* This is useful for implementing flow control patterns where demand needs to be temporarily suspended based on
645645
* external conditions (e.g., downstream system availability, rate limiting, resource constraints).
646646
* <p>
@@ -666,7 +666,9 @@ default Multi<ItemWithContext<T>> attachContext() {
666666
* @see io.smallrye.mutiny.subscription.DemandPauser
667667
*/
668668
@CheckReturnValue
669-
MultiDemandPausing<T> pauseDemand();
669+
default MultiDemandPausing<T> pauseDemand() {
670+
throw new UnsupportedOperationException("Default method added to limit binary incompatibility");
671+
}
670672

671673
/**
672674
* Cap all downstream subscriber requests to a maximum value.

implementation/src/main/java/io/smallrye/mutiny/groups/MultiDemandPausing.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.smallrye.mutiny.groups;
22

33
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
4-
import static io.smallrye.mutiny.helpers.ParameterValidation.positiveOrZero;
4+
import static io.smallrye.mutiny.helpers.ParameterValidation.positive;
55

66
import io.smallrye.common.annotation.CheckReturnValue;
77
import io.smallrye.mutiny.Multi;
@@ -29,6 +29,7 @@ public class MultiDemandPausing<T> {
2929
private boolean paused = false;
3030
private boolean lateSubscription = false;
3131
private int bufferSize = Infrastructure.getMultiOverflowDefaultBufferSize();
32+
private boolean unbounded = false;
3233
private BackPressureStrategy bufferStrategy = BackPressureStrategy.BUFFER;
3334

3435
public MultiDemandPausing(AbstractMulti<T> upstream) {
@@ -74,21 +75,31 @@ public MultiDemandPausing<T> lateSubscription(boolean lateSubscription) {
7475
* Sets the maximum buffer size for already-requested items when using {@link BackPressureStrategy#BUFFER}.
7576
* <p>
7677
* When the stream is paused, items that were already requested from upstream can be buffered.
77-
* This parameter controls the maximum number of items to buffer:
78-
* <ul>
79-
* <li>{@code 0}: Unbounded buffer (uses default buffer size internally)</li>
80-
* <li>{@code n > 0}: Buffer up to {@code n} items, then fail with {@link IllegalStateException}</li>
81-
* </ul>
8278
* <p>
8379
* Note: The buffer only holds items that were already requested from upstream before pausing.
8480
* When paused, no new requests are issued to upstream.
8581
*
86-
* @param bufferSize the maximum buffer size, must be non-negative
82+
* @param bufferSize the maximum buffer size, must be positive
8783
* @return this configuration instance
8884
*/
8985
@CheckReturnValue
9086
public MultiDemandPausing<T> bufferSize(int bufferSize) {
91-
this.bufferSize = positiveOrZero(bufferSize, "bufferSize");
87+
this.bufferSize = positive(bufferSize, "bufferSize");
88+
this.unbounded = false;
89+
return this;
90+
}
91+
92+
/**
93+
* Sets the buffer size for already-requested items to unbounded when using {@link BackPressureStrategy#BUFFER}.
94+
* <p>
95+
* When the stream is paused, items that were already requested from upstream can be buffered.
96+
*
97+
* @return this configuration instance
98+
*/
99+
@CheckReturnValue
100+
public MultiDemandPausing<T> bufferUnconditionally() {
101+
this.bufferSize = Infrastructure.getMultiOverflowDefaultBufferSize();
102+
this.unbounded = true;
92103
return this;
93104
}
94105

@@ -126,7 +137,7 @@ public MultiDemandPausing<T> bufferStrategy(BackPressureStrategy bufferStrategy)
126137
public Multi<T> using(DemandPauser pauser) {
127138
DemandPauser p = nonNull(pauser, "pauser");
128139
MultiDemandPausingOp<T> pausingMulti = new MultiDemandPausingOp<>(upstream,
129-
paused, lateSubscription, bufferSize, bufferStrategy);
140+
paused, lateSubscription, bufferSize, unbounded, bufferStrategy);
130141
p.bind(pausingMulti);
131142
return Infrastructure.onMultiCreation(pausingMulti);
132143
}

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDemandPausingOp.java

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
import java.util.Queue;
44
import java.util.concurrent.Flow;
55
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicInteger;
67
import java.util.concurrent.atomic.AtomicLong;
7-
import java.util.function.Supplier;
88

99
import io.smallrye.mutiny.Multi;
1010
import io.smallrye.mutiny.helpers.Subscriptions;
1111
import io.smallrye.mutiny.helpers.queues.Queues;
12-
import io.smallrye.mutiny.infrastructure.Infrastructure;
1312
import io.smallrye.mutiny.operators.MultiOperator;
1413
import io.smallrye.mutiny.subscription.BackPressureStrategy;
1514
import io.smallrye.mutiny.subscription.MultiSubscriber;
@@ -36,14 +35,16 @@ public class MultiDemandPausingOp<T> extends MultiOperator<T, T> implements Paus
3635
private final AtomicBoolean subscribed = new AtomicBoolean();
3736
private final boolean lateSubscription;
3837
private final int bufferSize;
38+
private final boolean unbounded;
3939
private final BackPressureStrategy backPressureStrategy;
4040

4141
public MultiDemandPausingOp(Multi<T> upstream, boolean initiallyPaused, boolean lateSubscription, int bufferSize,
42-
BackPressureStrategy backPressureStrategy) {
42+
boolean unbounded, BackPressureStrategy backPressureStrategy) {
4343
super(upstream);
4444
this.paused = new AtomicBoolean(initiallyPaused);
4545
this.lateSubscription = lateSubscription;
4646
this.bufferSize = bufferSize;
47+
this.unbounded = unbounded;
4748
this.backPressureStrategy = backPressureStrategy;
4849
}
4950

@@ -104,19 +105,16 @@ private class PausableProcessor extends MultiOperatorProcessor<T, T> {
104105

105106
private final AtomicLong demand = new AtomicLong();
106107
private final Queue<T> queue;
107-
private final int maxBufferSize;
108+
private final AtomicInteger strictBoundCounter = new AtomicInteger(0);
108109

109110
PausableProcessor(MultiSubscriber<? super T> downstream) {
110111
super(downstream);
111112
// Determine if we need a queue based on strategy and buffer size
112113
if (backPressureStrategy == BackPressureStrategy.BUFFER) {
113-
Supplier<Queue<T>> qs = bufferSize == 0 ? Queues.unbounded(Infrastructure.getMultiOverflowDefaultBufferSize())
114-
: Queues.get(bufferSize);
115-
this.queue = qs.get();
114+
this.queue = unbounded ? Queues.<T> unbounded(bufferSize).get() : Queues.<T> get(bufferSize).get();
116115
} else {
117116
this.queue = null;
118117
}
119-
this.maxBufferSize = bufferSize;
120118
}
121119

122120
void resume() {
@@ -140,17 +138,22 @@ void drainQueue() {
140138
Queue<T> qe = queue;
141139
// Drain all buffered items - these were already requested from upstream
142140
// so we don't need to check downstream demand here
143-
while (!qe.isEmpty() && !paused.get()) {
141+
while (!paused.get()) {
144142
T item = qe.poll();
145143
if (item == null) {
144+
// queue empty
146145
break;
147146
}
147+
if (!unbounded) {
148+
strictBoundCounter.decrementAndGet();
149+
}
148150
downstream.onItem(item);
149151
}
150152
}
151153

152154
void clearQueue() {
153155
if (queue != null) {
156+
strictBoundCounter.set(0);
154157
queue.clear();
155158
}
156159
}
@@ -162,40 +165,19 @@ int queueSize() {
162165
@Override
163166
public void onItem(T item) {
164167
if (backPressureStrategy != BackPressureStrategy.IGNORE && paused.get()) {
168+
if (backPressureStrategy == BackPressureStrategy.DROP) {
169+
return;
170+
}
165171
// When paused buffer items if necessary
166-
if (!offerToQueue(item)) {
167-
handleOverflow();
172+
if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(item)) {
173+
// Buffer is full, throw exception
174+
onFailure(new IllegalStateException("Buffer overflow: cannot buffer more than " + bufferSize + " items"));
168175
}
169176
} else {
170177
super.onItem(item);
171178
}
172179
}
173180

174-
private void handleOverflow() {
175-
switch (backPressureStrategy) {
176-
case BUFFER:
177-
// Buffer is full, throw exception
178-
onFailure(new IllegalStateException(
179-
"Buffer overflow: cannot buffer more than " + maxBufferSize + " items"));
180-
return;
181-
case DROP:
182-
// Drop the new item (do nothing)
183-
return;
184-
}
185-
}
186-
187-
private boolean offerToQueue(T item) {
188-
if (queue == null) {
189-
return false;
190-
}
191-
// If unbounded (maxBufferSize == 0), always succeed
192-
if (maxBufferSize == 0) {
193-
return queue.offer(item);
194-
}
195-
// For bounded queues, check size before offering
196-
return queue.size() < maxBufferSize && queue.offer(item);
197-
}
198-
199181
@Override
200182
public void request(long numberOfItems) {
201183
if (numberOfItems <= 0) {
@@ -223,8 +205,21 @@ public void request(long numberOfItems) {
223205

224206
@Override
225207
public void cancel() {
208+
clearQueue();
226209
processor = null;
227210
super.cancel();
228211
}
212+
213+
@Override
214+
public void onFailure(Throwable failure) {
215+
clearQueue();
216+
super.onFailure(failure);
217+
}
218+
219+
@Override
220+
public void onCompletion() {
221+
clearQueue();
222+
super.onCompletion();
223+
}
229224
}
230225
}

implementation/src/main/java/io/smallrye/mutiny/subscription/PausableMulti.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
public interface PausableMulti {
1515

1616
/**
17-
* Pauses demand propagation to upstream.
17+
* Pauses demand propagation to the upstream.
1818
*/
1919
void pause();
2020

2121
/**
22-
* Resumes demand propagation to upstream.
22+
* Resumes demand propagation to the upstream.
2323
*/
2424
void resume();
2525

0 commit comments

Comments
 (0)