Skip to content

Commit b3f64d9

Browse files
JakeWhartonakarnokd
authored andcommitted
Switch (almost) all chain operators to FlowableWithUpstream. (#4318)
1 parent 5ae5478 commit b3f64d9

File tree

64 files changed

+144
-270
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+144
-270
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableAll.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,16 @@
1414

1515
import org.reactivestreams.*;
1616

17-
import io.reactivex.Flowable;
1817
import io.reactivex.functions.Predicate;
1918
import io.reactivex.internal.subscriptions.*;
2019
import io.reactivex.plugins.RxJavaPlugins;
2120

22-
public final class FlowableAll<T> extends Flowable<Boolean> {
23-
24-
final Publisher<T> source;
21+
public final class FlowableAll<T> extends FlowableWithUpstream<T, Boolean> {
2522

2623
final Predicate<? super T> predicate;
2724

2825
public FlowableAll(Publisher<T> source, Predicate<? super T> predicate) {
29-
this.source = source;
26+
super(source);
3027
this.predicate = predicate;
3128
}
3229

src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414

1515
import org.reactivestreams.*;
1616

17-
import io.reactivex.Flowable;
1817
import io.reactivex.functions.Predicate;
1918
import io.reactivex.internal.subscriptions.*;
2019

21-
public final class FlowableAny<T> extends Flowable<Boolean> {
22-
final Publisher<T> source;
20+
public final class FlowableAny<T> extends FlowableWithUpstream<T, Boolean> {
2321
final Predicate<? super T> predicate;
2422
public FlowableAny(Publisher<T> source, Predicate<? super T> predicate) {
25-
this.source = source;
23+
super(source);
2624
this.predicate = predicate;
2725
}
2826

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.reactivestreams.*;
2121

22-
import io.reactivex.Flowable;
2322
import io.reactivex.disposables.*;
2423
import io.reactivex.functions.Function;
2524
import io.reactivex.internal.fuseable.SimpleQueue;
@@ -31,15 +30,14 @@
3130
import io.reactivex.subscribers.*;
3231

3332
public final class FlowableBufferBoundary<T, U extends Collection<? super T>, Open, Close>
34-
extends Flowable<U> {
35-
final Publisher<T> source;
33+
extends FlowableWithUpstream<T, U> {
3634
final Callable<U> bufferSupplier;
3735
final Publisher<? extends Open> bufferOpen;
3836
final Function<? super Open, ? extends Publisher<? extends Close>> bufferClose;
3937

4038
public FlowableBufferBoundary(Publisher<T> source, Publisher<? extends Open> bufferOpen,
4139
Function<? super Open, ? extends Publisher<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
42-
this.source = source;
40+
super(source);
4341
this.bufferOpen = bufferOpen;
4442
this.bufferClose = bufferClose;
4543
this.bufferSupplier = bufferSupplier;

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.reactivestreams.*;
2121

22-
import io.reactivex.Flowable;
2322
import io.reactivex.disposables.Disposable;
2423
import io.reactivex.internal.disposables.DisposableHelper;
2524
import io.reactivex.internal.queue.MpscLinkedQueue;
@@ -30,13 +29,12 @@
3029
import io.reactivex.subscribers.*;
3130

3231
public final class FlowableBufferBoundarySupplier<T, U extends Collection<? super T>, B>
33-
extends Flowable<U> {
34-
final Publisher<T> source;
32+
extends FlowableWithUpstream<T, U> {
3533
final Callable<? extends Publisher<B>> boundarySupplier;
3634
final Callable<U> bufferSupplier;
3735

3836
public FlowableBufferBoundarySupplier(Publisher<T> source, Callable<? extends Publisher<B>> boundarySupplier, Callable<U> bufferSupplier) {
39-
this.source = source;
37+
super(source);
4038
this.boundarySupplier = boundarySupplier;
4139
this.bufferSupplier = bufferSupplier;
4240
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.reactivestreams.*;
2020

21-
import io.reactivex.Flowable;
2221
import io.reactivex.disposables.Disposable;
2322
import io.reactivex.internal.queue.MpscLinkedQueue;
2423
import io.reactivex.internal.subscribers.flowable.QueueDrainSubscriber;
@@ -27,13 +26,12 @@
2726
import io.reactivex.subscribers.*;
2827

2928
public final class FlowableBufferExactBoundary<T, U extends Collection<? super T>, B>
30-
extends Flowable<U> {
31-
final Publisher<T> source;
29+
extends FlowableWithUpstream<T, U> {
3230
final Publisher<B> boundary;
3331
final Callable<U> bufferSupplier;
3432

3533
public FlowableBufferExactBoundary(Publisher<T> source, Publisher<B> boundary, Callable<U> bufferSupplier) {
36-
this.source = source;
34+
super(source);
3735
this.boundary = boundary;
3836
this.bufferSupplier = bufferSupplier;
3937
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@
2929
import io.reactivex.internal.util.QueueDrainHelper;
3030
import io.reactivex.subscribers.SerializedSubscriber;
3131

32-
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends Flowable<U> {
32+
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends FlowableWithUpstream<T, U> {
3333

34-
final Publisher<T> source;
3534
final long timespan;
3635
final long timeskip;
3736
final TimeUnit unit;
@@ -42,7 +41,7 @@ public final class FlowableBufferTimed<T, U extends Collection<? super T>> exten
4241

4342
public FlowableBufferTimed(Publisher<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
4443
boolean restartTimerOnMaxSize) {
45-
this.source = source;
44+
super(source);
4645
this.timespan = timespan;
4746
this.timeskip = timeskip;
4847
this.unit = unit;

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*
2929
* @param <T> the source element type
3030
*/
31-
public final class FlowableCache<T> extends Flowable<T> {
31+
public final class FlowableCache<T> extends FlowableWithUpstream<T, T> {
3232
/** The cache and replay state. */
3333
final CacheState<T> state;
3434

@@ -39,7 +39,7 @@ public final class FlowableCache<T> extends Flowable<T> {
3939
* @param source the source Observable to cache
4040
* @return the CachedObservable instance
4141
*/
42-
public static <T> FlowableCache<T> from(Flowable<? extends T> source) {
42+
public static <T> FlowableCache<T> from(Flowable<T> source) {
4343
return from(source, 16);
4444
}
4545

@@ -50,20 +50,21 @@ public static <T> FlowableCache<T> from(Flowable<? extends T> source) {
5050
* @param capacityHint the hint for the internal buffer size
5151
* @return the CachedObservable instance
5252
*/
53-
public static <T> FlowableCache<T> from(Flowable<? extends T> source, int capacityHint) {
53+
public static <T> FlowableCache<T> from(Flowable<T> source, int capacityHint) {
5454
if (capacityHint < 1) {
5555
throw new IllegalArgumentException("capacityHint > 0 required");
5656
}
5757
CacheState<T> state = new CacheState<T>(source, capacityHint);
58-
return new FlowableCache<T>(state);
58+
return new FlowableCache<T>(source, state);
5959
}
6060

6161
/**
6262
* Private constructor because state needs to be shared between the Observable body and
6363
* the onSubscribe function.
6464
* @param state
6565
*/
66-
private FlowableCache(CacheState<T> state) {
66+
private FlowableCache(Flowable<T> source, CacheState<T> state) {
67+
super(source);
6768
this.state = state;
6869
this.once = new AtomicBoolean();
6970
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616

1717
import org.reactivestreams.*;
1818

19-
import io.reactivex.Flowable;
2019
import io.reactivex.exceptions.Exceptions;
2120
import io.reactivex.functions.BiConsumer;
2221
import io.reactivex.internal.subscriptions.*;
2322

24-
public final class FlowableCollect<T, U> extends Flowable<U> {
23+
public final class FlowableCollect<T, U> extends FlowableWithUpstream<T, U> {
2524

26-
final Publisher<T> source;
2725
final Callable<? extends U> initialSupplier;
2826
final BiConsumer<? super U, ? super T> collector;
2927

3028
public FlowableCollect(Publisher<T> source, Callable<? extends U> initialSupplier, BiConsumer<? super U, ? super T> collector) {
31-
this.source = source;
29+
super(source);
3230
this.initialSupplier = initialSupplier;
3331
this.collector = collector;
3432
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCount.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515

1616
import org.reactivestreams.*;
1717

18-
import io.reactivex.Flowable;
1918
import io.reactivex.internal.subscriptions.*;
2019

21-
public final class FlowableCount<T> extends Flowable<Long> {
22-
23-
final Publisher<T> source;
20+
public final class FlowableCount<T> extends FlowableWithUpstream<T, Long> {
2421

2522
public FlowableCount(Publisher<T> source) {
26-
this.source = source;
23+
super(source);
2724
}
2825

2926
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.Flowable;
2120
import io.reactivex.disposables.Disposable;
2221
import io.reactivex.functions.Function;
2322
import io.reactivex.internal.disposables.DisposableHelper;
@@ -26,12 +25,11 @@
2625
import io.reactivex.plugins.RxJavaPlugins;
2726
import io.reactivex.subscribers.*;
2827

29-
public final class FlowableDebounce<T, U> extends Flowable<T> {
30-
final Publisher<T> source;
28+
public final class FlowableDebounce<T, U> extends FlowableWithUpstream<T, T> {
3129
final Function<? super T, ? extends Publisher<U>> debounceSelector;
3230

3331
public FlowableDebounce(Publisher<T> source, Function<? super T, ? extends Publisher<U>> debounceSelector) {
34-
this.source = source;
32+
super(source);
3533
this.debounceSelector = debounceSelector;
3634
}
3735

0 commit comments

Comments
 (0)