Skip to content

Commit 0f4a5e7

Browse files
authored
2.x: add limit() to limit both item count and request amount (#5655)
* 2.x: add limit() to limit both item count and request amount * Address some feedback. * Use the cancelled constant value.
1 parent ddd9b67 commit 0f4a5e7

File tree

4 files changed

+396
-1
lines changed

4 files changed

+396
-1
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9676,6 +9676,53 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
96769676
return RxJavaPlugins.onAssembly(new FlowableLift<R, T>(this, lifter));
96779677
}
96789678

9679+
/**
9680+
* Limits both the number of upstream items (after which the sequence completes)
9681+
* and the total downstream request amount requested from the upstream to
9682+
* possibly prevent the creation of excess items by the upstream.
9683+
* <p>
9684+
* The operator requests at most the given {@code count} of items from upstream even
9685+
* if the downstream requests more than that. For example, given a {@code limit(5)},
9686+
* if the downstream requests 1, a request of 1 is submitted to the upstream
9687+
* and the operator remembers that only 4 items can be requested now on. A request
9688+
* of 5 at this point will request 4 from the upstream and any subsequent requests will
9689+
* be ignored.
9690+
* <p>
9691+
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
9692+
* may not be preserved further upstream. For example,
9693+
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
9694+
* default (128) elements from the given {@code source}.
9695+
* <p>
9696+
* The main use of this operator is with sources that are async boundaries that
9697+
* don't interfere with request amounts, such as certain {@code Flowable}-based
9698+
* network endpoints that relay downstream request amounts unchanged and are, therefore,
9699+
* prone to trigger excessive item creation/transmission over the network.
9700+
* <dl>
9701+
* <dt><b>Backpressure:</b></dt>
9702+
* <dd>The operator requests a total of the given {@code count} items from the upstream.</dd>
9703+
* <dt><b>Scheduler:</b></dt>
9704+
* <dd>{@code limit} does not operate by default on a particular {@link Scheduler}.</dd>
9705+
* </dl>
9706+
9707+
* @param count the maximum number of items and the total request amount, non-negative.
9708+
* Zero will immediately cancel the upstream on subscription and complete
9709+
* the downstream.
9710+
* @return the new Flowable instance
9711+
* @see #take(long)
9712+
* @see #rebatchRequests(int)
9713+
* @since 2.1.6 - experimental
9714+
*/
9715+
@Experimental
9716+
@BackpressureSupport(BackpressureKind.SPECIAL)
9717+
@SchedulerSupport(SchedulerSupport.NONE)
9718+
@CheckReturnValue
9719+
public final Flowable<T> limit(long count) {
9720+
if (count < 0) {
9721+
throw new IllegalArgumentException("count >= 0 required but it was " + count);
9722+
}
9723+
return RxJavaPlugins.onAssembly(new FlowableLimit<T>(this, count));
9724+
}
9725+
96799726
/**
96809727
* Returns a Flowable that applies a specified function to each item emitted by the source Publisher and
96819728
* emits the results of these function applications.
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.annotations.Experimental;
22+
import io.reactivex.internal.subscriptions.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
24+
25+
/**
26+
* Limits both the total request amount and items received from the upstream.
27+
*
28+
* @param <T> the source and output value type
29+
* @since 2.1.6 - experimental
30+
*/
31+
@Experimental
32+
public final class FlowableLimit<T> extends AbstractFlowableWithUpstream<T, T> {
33+
34+
final long n;
35+
36+
public FlowableLimit(Flowable<T> source, long n) {
37+
super(source);
38+
this.n = n;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(Subscriber<? super T> s) {
43+
source.subscribe(new LimitSubscriber<T>(s, n));
44+
}
45+
46+
static final class LimitSubscriber<T>
47+
extends AtomicLong
48+
implements FlowableSubscriber<T>, Subscription {
49+
50+
private static final long serialVersionUID = 2288246011222124525L;
51+
52+
final Subscriber<? super T> actual;
53+
54+
long remaining;
55+
56+
Subscription upstream;
57+
58+
LimitSubscriber(Subscriber<? super T> actual, long remaining) {
59+
this.actual = actual;
60+
this.remaining = remaining;
61+
lazySet(remaining);
62+
}
63+
64+
@Override
65+
public void onSubscribe(Subscription s) {
66+
if (SubscriptionHelper.validate(this.upstream, s)) {
67+
if (remaining == 0L) {
68+
s.cancel();
69+
EmptySubscription.complete(actual);
70+
} else {
71+
this.upstream = s;
72+
actual.onSubscribe(this);
73+
}
74+
}
75+
}
76+
77+
@Override
78+
public void onNext(T t) {
79+
long r = remaining;
80+
if (r > 0L) {
81+
remaining = --r;
82+
actual.onNext(t);
83+
if (r == 0L) {
84+
upstream.cancel();
85+
actual.onComplete();
86+
}
87+
}
88+
}
89+
90+
@Override
91+
public void onError(Throwable t) {
92+
if (remaining > 0L) {
93+
remaining = 0L;
94+
actual.onError(t);
95+
} else {
96+
RxJavaPlugins.onError(t);
97+
}
98+
}
99+
100+
@Override
101+
public void onComplete() {
102+
if (remaining > 0L) {
103+
remaining = 0L;
104+
actual.onComplete();
105+
}
106+
}
107+
108+
@Override
109+
public void request(long n) {
110+
if (SubscriptionHelper.validate(n)) {
111+
for (;;) {
112+
long r = get();
113+
if (r == 0L) {
114+
break;
115+
}
116+
long toRequest;
117+
if (r <= n) {
118+
toRequest = r;
119+
} else {
120+
toRequest = n;
121+
}
122+
long u = r - toRequest;
123+
if (compareAndSet(r, u)) {
124+
upstream.request(toRequest);
125+
break;
126+
}
127+
}
128+
}
129+
}
130+
131+
@Override
132+
public void cancel() {
133+
upstream.cancel();
134+
}
135+
136+
}
137+
}

src/test/java/io/reactivex/ParamValidationCheckerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,9 @@ public void checkParallelFlowable() {
182182
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class));
183183
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class));
184184

185-
// zero retry is allowed
185+
// zero take/limit is allowed
186186
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE));
187+
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "limit", Long.TYPE));
187188

188189
// negative time is considered as zero time
189190
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class));

0 commit comments

Comments
 (0)