Skip to content

Commit cc8bf6d

Browse files
Backpressure Fixes and Docs
Docs for operators that don't support backpressure, particularly all the temporal operators. Fixes for several that needed to request(1) or request(Long.MAX_VALUE).
1 parent 077d70a commit cc8bf6d

21 files changed

+524
-3
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 395 additions & 1 deletion
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,12 @@ private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
6262

6363
@Override
6464
public void request(long n) {
65+
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
66+
// already started with fast-path
67+
return;
68+
}
6569
if (n == Long.MAX_VALUE) {
70+
REQUESTED_UPDATER.set(this, n);
6671
// fast-path without backpressure
6772
while (it.hasNext()) {
6873
if (o.isUnsubscribed()) {

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5656

5757
@Override
5858
public void request(long n) {
59+
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
60+
// already started with fast-path
61+
return;
62+
}
5963
if (n == Long.MAX_VALUE) {
64+
REQUESTED_UPDATER.set(this, n);
6065
// fast-path without backpressure
6166
for (long i = index; i <= end; i++) {
6267
if (o.isUnsubscribed()) {

rxjava-core/src/main/java/rx/internal/operators/OperatorDebounceWithSelector.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4545
return new Subscriber<T>(child) {
4646
final DebounceState<T> state = new DebounceState<T>();
4747
final Subscriber<?> self = this;
48+
49+
@Override
50+
public void onStart() {
51+
// debounce wants to receive everything as a firehose without backpressure
52+
request(Long.MAX_VALUE);
53+
}
54+
4855
@Override
4956
public void onNext(T t) {
5057
Observable<U> debouncer;

rxjava-core/src/main/java/rx/internal/operators/OperatorDistinct.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public void onNext(T t) {
4444
U key = keySelector.call(t);
4545
if (keyMemory.add(key)) {
4646
child.onNext(t);
47+
} else {
48+
request(1);
4749
}
4850
}
4951

rxjava-core/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public void onNext(T t) {
4545
if (hasPrevious) {
4646
if (!(currentKey == key || (key != null && key.equals(currentKey)))) {
4747
child.onNext(t);
48+
} else {
49+
request(1);
4850
}
4951
} else {
5052
hasPrevious = true;

rxjava-core/src/main/java/rx/internal/operators/OperatorElementAt.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public void onNext(T value) {
5555
if (currentIndex == index) {
5656
subscriber.onNext(value);
5757
subscriber.onCompleted();
58+
} else {
59+
request(1);
5860
}
5961
currentIndex++;
6062
}

rxjava-core/src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<?
7474
static final AtomicIntegerFieldUpdater<GroupBySubscriber> TERMINATED_UPDATER
7575
= AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "terminated");
7676

77+
@Override
78+
public void onStart() {
79+
/*
80+
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable" and
81+
* blocking any one group would block the entire parent stream. If backpressure is needed on individual groups then
82+
* operators such as `onBackpressureDrop` or `onBackpressureBuffer` should be used.
83+
*/
84+
request(Long.MAX_VALUE);
85+
}
86+
7787
@Override
7888
public void onCompleted() {
7989
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {

rxjava-core/src/main/java/rx/internal/operators/OperatorGroupByUntil.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,16 @@ public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, R>> ch
7070
/** Guarded by guard. */
7171
Map<K, GroupSubject<K, R>> groups = new HashMap<K, GroupSubject<K, R>>();
7272

73+
@Override
74+
public void onStart() {
75+
/*
76+
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable" and
77+
* blocking any one group would block the entire parent stream. If backpressure is needed on individual groups then
78+
* operators such as `onBackpressureDrop` or `onBackpressureBuffer` should be used.
79+
*/
80+
request(Long.MAX_VALUE);
81+
}
82+
7383
final Subscriber<T> self = this;
7484
@Override
7585
public void onNext(T t) {

rxjava-core/src/main/java/rx/internal/operators/OperatorSampleWithTime.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ static final class SamplerSubscriber<T> extends Subscriber<T> implements Action0
7070
public SamplerSubscriber(Subscriber<? super T> subscriber) {
7171
this.subscriber = subscriber;
7272
}
73+
74+
@Override
75+
public void onStart() {
76+
request(Long.MAX_VALUE);
77+
}
78+
7379
@Override
7480
public void onNext(T t) {
7581
value = t;

0 commit comments

Comments
 (0)