Skip to content

Commit e28dc4b

Browse files
ADI1133akarnokd
authored andcommitted
Merge FlowableEmitter.BackpressureMode into BackpressureStrategy (#4729)
* Merge FlowableEmitter.BackpressureMode into BackpressureStrategy * Cleanup code
1 parent f8b6fbd commit e28dc4b

File tree

11 files changed

+234
-121
lines changed

11 files changed

+234
-121
lines changed

src/main/java/io/reactivex/BackpressureStrategy.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,26 @@
1818
*/
1919
public enum BackpressureStrategy {
2020
/**
21-
* Buffer all values (unbounded) until there is a downstream demand for it.
21+
* OnNext events are written without any buffering or dropping.
22+
* Downstream has to deal with any overflow.
23+
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
24+
*/
25+
NONE,
26+
/**
27+
* Signals a MissingBackpressureException in case the downstream can't keep up.
28+
*/
29+
ERROR,
30+
/**
31+
* Buffers <em>all</em> onNext values until the downstream consumes it.
2232
*/
2333
BUFFER,
2434
/**
25-
* Drop the value if there is no current demand for it from the downstream.
35+
* Drops the most recent onNext value if the downstream can't keep up.
2636
*/
2737
DROP,
2838
/**
29-
* Have a latest value always available and overwrite it with more recent ones
30-
* if there is no demand for it from the downstream.
39+
* Keeps only the latest onNext value, overwriting any previous value if the
40+
* downstream can't keep up.
3141
*/
3242
LATEST
3343
}

src/main/java/io/reactivex/Flowable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,7 +1525,7 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
15251525
*
15261526
* emitter.setCancellable(c::close);
15271527
*
1528-
* }, BackpressureMode.BUFFER);
1528+
* }, BackpressureStrategy.BUFFER);
15291529
* </code></pre>
15301530
* <p>
15311531
* You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The
@@ -1542,12 +1542,12 @@ public static <T> Flowable<T> concatEager(Iterable<? extends Publisher<? extends
15421542
* @param mode the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
15431543
* @return the new Flowable instance
15441544
* @see FlowableOnSubscribe
1545-
* @see FlowableEmitter.BackpressureMode
1545+
* @see BackpressureStrategy
15461546
* @see Cancellable
15471547
*/
15481548
@BackpressureSupport(BackpressureKind.SPECIAL)
15491549
@SchedulerSupport(SchedulerSupport.NONE)
1550-
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, FlowableEmitter.BackpressureMode mode) {
1550+
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
15511551
ObjectHelper.requireNonNull(source, "source is null");
15521552
ObjectHelper.requireNonNull(mode, "mode is null");
15531553
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));

src/main/java/io/reactivex/FlowableEmitter.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -63,33 +63,4 @@ public interface FlowableEmitter<T> extends Emitter<T> {
6363
* @return the serialized FlowableEmitter
6464
*/
6565
FlowableEmitter<T> serialize();
66-
67-
/**
68-
* Options to handle backpressure in the emitter.
69-
*/
70-
enum BackpressureMode {
71-
/**
72-
* OnNext events are written without any buffering or dropping.
73-
* Downstream has to deal with any overflow.
74-
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
75-
*/
76-
NONE,
77-
/**
78-
* Signals a MissingBackpressureException in case the downstream can't keep up.
79-
*/
80-
ERROR,
81-
/**
82-
* Buffers <em>all</em> onNext values until the downstream consumes it.
83-
*/
84-
BUFFER,
85-
/**
86-
* Drops the most recent onNext value if the downstream can't keep up.
87-
*/
88-
DROP,
89-
/**
90-
* Keeps only the latest onNext value, overwriting any previous value if the
91-
* downstream can't keep up.
92-
*/
93-
LATEST
94-
}
9566
}

src/main/java/io/reactivex/Observable.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.*;
1717
import java.util.concurrent.*;
1818

19+
import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError;
1920
import org.reactivestreams.Publisher;
2021

2122
import io.reactivex.annotations.*;
@@ -11707,12 +11708,16 @@ public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
1170711708
Flowable<T> o = new FlowableFromObservable<T>(this);
1170811709

1170911710
switch (strategy) {
11710-
case DROP:
11711-
return o.onBackpressureDrop();
11712-
case LATEST:
11713-
return o.onBackpressureLatest();
11714-
default:
11715-
return o.onBackpressureBuffer();
11711+
case DROP:
11712+
return o.onBackpressureDrop();
11713+
case LATEST:
11714+
return o.onBackpressureLatest();
11715+
case NONE:
11716+
return o;
11717+
case ERROR:
11718+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(o));
11719+
default:
11720+
return o.onBackpressureBuffer();
1171611721
}
1171711722
}
1171811723

src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public final class FlowableCreate<T> extends Flowable<T> {
3333

3434
final FlowableOnSubscribe<T> source;
3535

36-
final FlowableEmitter.BackpressureMode backpressure;
36+
final BackpressureStrategy backpressure;
3737

38-
public FlowableCreate(FlowableOnSubscribe<T> source, FlowableEmitter.BackpressureMode backpressure) {
38+
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
3939
this.source = source;
4040
this.backpressure = backpressure;
4141
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import io.reactivex.exceptions.MissingBackpressureException;
17+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
18+
import io.reactivex.internal.util.BackpressureHelper;
19+
import io.reactivex.plugins.RxJavaPlugins;
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
24+
import java.util.concurrent.atomic.AtomicLong;
25+
26+
public final class FlowableOnBackpressureError<T> extends AbstractFlowableWithUpstream<T, T> {
27+
28+
29+
public FlowableOnBackpressureError(Publisher<T> source) {
30+
super(source);
31+
}
32+
33+
34+
@Override
35+
protected void subscribeActual(Subscriber<? super T> s) {
36+
this.source.subscribe(new BackpressureErrorSubscriber<T>(s));
37+
}
38+
39+
static final class BackpressureErrorSubscriber<T>
40+
extends AtomicLong implements Subscriber<T>, Subscription {
41+
42+
final Subscriber<? super T> actual;
43+
Subscription s;
44+
boolean done;
45+
46+
BackpressureErrorSubscriber(Subscriber<? super T> actual) {
47+
this.actual = actual;
48+
}
49+
50+
@Override
51+
public void onSubscribe(Subscription s) {
52+
if (SubscriptionHelper.validate(this.s, s)) {
53+
this.s = s;
54+
actual.onSubscribe(this);
55+
s.request(Long.MAX_VALUE);
56+
}
57+
}
58+
59+
@Override
60+
public void onNext(T t) {
61+
if (done) {
62+
return;
63+
}
64+
long r = get();
65+
if (r != 0L) {
66+
actual.onNext(t);
67+
if (r != Long.MAX_VALUE) {
68+
decrementAndGet();
69+
}
70+
} else {
71+
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
72+
}
73+
}
74+
75+
@Override
76+
public void onError(Throwable t) {
77+
if (done) {
78+
RxJavaPlugins.onError(t);
79+
return;
80+
}
81+
done = true;
82+
actual.onError(t);
83+
}
84+
85+
@Override
86+
public void onComplete() {
87+
if (done) {
88+
return;
89+
}
90+
done = true;
91+
actual.onComplete();
92+
}
93+
94+
@Override
95+
public void request(long n) {
96+
if (SubscriptionHelper.validate(n)) {
97+
BackpressureHelper.add(this, n);
98+
}
99+
}
100+
101+
@Override
102+
public void cancel() {
103+
s.cancel();
104+
}
105+
}
106+
}

src/test/java/io/reactivex/BackpressureEnumTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void backpressureOverflowStrategy() {
3232

3333
@Test
3434
public void backpressureStrategy() {
35-
assertEquals(3, BackpressureStrategy.values().length);
35+
assertEquals(5, BackpressureStrategy.values().length);
3636

3737
assertNotNull(BackpressureStrategy.valueOf("BUFFER"));
3838
}

0 commit comments

Comments
 (0)