Skip to content

Commit 3a3d4df

Browse files
JakeWhartonakarnokd
authored andcommitted
Switch (almost) all chain operators to ObservableWithUpstream. (#4319)
1 parent b3f64d9 commit 3a3d4df

16 files changed

+53
-78
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableAll.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
import io.reactivex.internal.disposables.DisposableHelper;
1919
import io.reactivex.plugins.RxJavaPlugins;
2020

21-
public final class ObservableAll<T> extends Observable<Boolean> {
22-
final ObservableSource<T> source;
23-
21+
public final class ObservableAll<T> extends ObservableWithUpstream<T, Boolean> {
2422
final Predicate<? super T> predicate;
2523
public ObservableAll(ObservableSource<T> source, Predicate<? super T> predicate) {
26-
this.source = source;
24+
super(source);
2725
this.predicate = predicate;
2826
}
2927

src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
import io.reactivex.functions.Predicate;
1818
import io.reactivex.internal.disposables.DisposableHelper;
1919

20-
public final class ObservableAny<T> extends Observable<Boolean> {
21-
final ObservableSource<T> source;
20+
public final class ObservableAny<T> extends ObservableWithUpstream<T, Boolean> {
2221
final Predicate<? super T> predicate;
2322
public ObservableAny(ObservableSource<T> source, Predicate<? super T> predicate) {
24-
this.source = source;
23+
super(source);
2524
this.predicate = predicate;
2625
}
2726

src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717
import java.util.concurrent.Callable;
1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

20-
import io.reactivex.Observable;
2120
import io.reactivex.ObservableSource;
2221
import io.reactivex.Observer;
2322
import io.reactivex.disposables.Disposable;
2423
import io.reactivex.internal.disposables.*;
2524

26-
public final class ObservableBuffer<T, U extends Collection<? super T>> extends Observable<U> {
27-
final ObservableSource<T> source;
25+
public final class ObservableBuffer<T, U extends Collection<? super T>> extends ObservableWithUpstream<T, U> {
2826
final int count;
2927
final int skip;
3028
final Callable<U> bufferSupplier;
3129

3230
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Callable<U> bufferSupplier) {
33-
this.source = source;
31+
super(source);
3432
this.count = count;
3533
this.skip = skip;
3634
this.bufferSupplier = bufferSupplier;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.concurrent.Callable;
1818
import java.util.concurrent.atomic.AtomicInteger;
1919

20-
import io.reactivex.Observable;
2120
import io.reactivex.ObservableSource;
2221
import io.reactivex.Observer;
2322
import io.reactivex.disposables.*;
@@ -31,15 +30,14 @@
3130
import io.reactivex.plugins.RxJavaPlugins;
3231

3332
public final class ObservableBufferBoundary<T, U extends Collection<? super T>, Open, Close>
34-
extends Observable<U> {
35-
final ObservableSource<T> source;
33+
extends ObservableWithUpstream<T, U> {
3634
final Callable<U> bufferSupplier;
3735
final ObservableSource<? extends Open> bufferOpen;
3836
final Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose;
3937

4038
public ObservableBufferBoundary(ObservableSource<T> source, ObservableSource<? extends Open> bufferOpen,
4139
Function<? super Open, ? extends ObservableSource<? extends Close>> bufferClose, Callable<U> bufferSupplier) {
42-
this.source = source;
40+
super(source);
4341
this.bufferOpen = bufferOpen;
4442
this.bufferClose = bufferClose;
4543
this.bufferSupplier = bufferSupplier;

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@
2727
import io.reactivex.plugins.RxJavaPlugins;
2828

2929
public final class ObservableBufferBoundarySupplier<T, U extends Collection<? super T>, B>
30-
extends Observable<U> {
31-
final ObservableSource<T> source;
30+
extends ObservableWithUpstream<T, U> {
3231
final Callable<? extends ObservableSource<B>> boundarySupplier;
3332
final Callable<U> bufferSupplier;
3433

3534
public ObservableBufferBoundarySupplier(ObservableSource<T> source, Callable<? extends ObservableSource<B>> boundarySupplier, Callable<U> bufferSupplier) {
36-
this.source = source;
35+
super(source);
3736
this.boundarySupplier = boundarySupplier;
3837
this.bufferSupplier = bufferSupplier;
3938
}

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@
2525
import io.reactivex.observers.SerializedObserver;
2626

2727
public final class ObservableBufferExactBoundary<T, U extends Collection<? super T>, B>
28-
extends Observable<U> {
29-
final ObservableSource<T> source;
28+
extends ObservableWithUpstream<T, U> {
3029
final ObservableSource<B> boundary;
3130
final Callable<U> bufferSupplier;
3231

3332
public ObservableBufferExactBoundary(ObservableSource<T> source, ObservableSource<B> boundary, Callable<U> bufferSupplier) {
34-
this.source = source;
33+
super(source);
3534
this.boundary = boundary;
3635
this.bufferSupplier = bufferSupplier;
3736
}

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.concurrent.atomic.AtomicReference;
1919

2020
import io.reactivex.*;
21-
import io.reactivex.Observable;
2221
import io.reactivex.Observer;
2322
import io.reactivex.Scheduler.Worker;
2423
import io.reactivex.disposables.Disposable;
@@ -29,9 +28,8 @@
2928
import io.reactivex.observers.SerializedObserver;
3029

3130
public final class ObservableBufferTimed<T, U extends Collection<? super T>>
32-
extends Observable<U> {
31+
extends ObservableWithUpstream<T, U> {
3332

34-
final ObservableSource<T> source;
3533
final long timespan;
3634
final long timeskip;
3735
final TimeUnit unit;
@@ -42,7 +40,7 @@ public final class ObservableBufferTimed<T, U extends Collection<? super T>>
4240

4341
public ObservableBufferTimed(ObservableSource<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier, int maxSize,
4442
boolean restartTimerOnMaxSize) {
45-
this.source = source;
43+
super(source);
4644
this.timespan = timespan;
4745
this.timeskip = timeskip;
4846
this.unit = unit;

src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*
2626
* @param <T> the source element type
2727
*/
28-
public final class ObservableCache<T> extends Observable<T> {
28+
public final class ObservableCache<T> extends ObservableWithUpstream<T, T> {
2929
/** The cache and replay state. */
3030
final CacheState<T> state;
3131

@@ -37,7 +37,7 @@ public final class ObservableCache<T> extends Observable<T> {
3737
* @param source the source Observable to cache
3838
* @return the CachedObservable instance
3939
*/
40-
public static <T> ObservableCache<T> from(Observable<? extends T> source) {
40+
public static <T> ObservableCache<T> from(Observable<T> source) {
4141
return from(source, 16);
4242
}
4343

@@ -48,12 +48,12 @@ public static <T> ObservableCache<T> from(Observable<? extends T> source) {
4848
* @param capacityHint the hint for the internal buffer size
4949
* @return the CachedObservable instance
5050
*/
51-
public static <T> ObservableCache<T> from(Observable<? extends T> source, int capacityHint) {
51+
public static <T> ObservableCache<T> from(Observable<T> source, int capacityHint) {
5252
if (capacityHint < 1) {
5353
throw new IllegalArgumentException("capacityHint > 0 required");
5454
}
5555
CacheState<T> state = new CacheState<T>(source, capacityHint);
56-
return new ObservableCache<T>(state);
56+
return new ObservableCache<T>(source, state);
5757
}
5858

5959
/**
@@ -62,7 +62,8 @@ public static <T> ObservableCache<T> from(Observable<? extends T> source, int ca
6262
* @param onSubscribe
6363
* @param state
6464
*/
65-
private ObservableCache(CacheState<T> state) {
65+
private ObservableCache(Observable<T> source, CacheState<T> state) {
66+
super(source);
6667
this.state = state;
6768
this.once = new AtomicBoolean();
6869
}

src/main/java/io/reactivex/internal/operators/observable/ObservableLift.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,12 @@
2525
* @param <T> the upstream value type
2626
* @param <R> the downstream parameter type
2727
*/
28-
public final class ObservableLift<R, T> extends Observable<R> {
28+
public final class ObservableLift<R, T> extends ObservableWithUpstream<T, R> {
2929
/** The actual operator. */
3030
final ObservableOperator<? extends R, ? super T> operator;
31-
/** The source publisher. */
32-
final ObservableSource<? extends T> source;
33-
34-
public ObservableLift(ObservableSource<? extends T> source, ObservableOperator<? extends R, ? super T> operator) {
35-
this.source = source;
31+
32+
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
33+
super(source);
3634
this.operator = operator;
3735
}
3836

@@ -44,14 +42,6 @@ public ObservableLift(ObservableSource<? extends T> source, ObservableOperator<?
4442
return operator;
4543
}
4644

47-
/**
48-
* Returns the source of this lift publisher.
49-
* @return the source of this lift publisher
50-
*/
51-
public ObservableSource<? extends T> source() {
52-
return source;
53-
}
54-
5545
@Override
5646
public void subscribeActual(Observer<? super R> s) {
5747
try {

src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@
2222
import io.reactivex.internal.subscribers.observable.ToNotificationObserver;
2323
import io.reactivex.subjects.BehaviorSubject;
2424

25-
public final class ObservableRedo<T> extends Observable<T> {
26-
final ObservableSource<? extends T> source;
25+
public final class ObservableRedo<T> extends ObservableWithUpstream<T, T> {
2726
final Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableSource<?>> manager;
2827

29-
public ObservableRedo(ObservableSource<? extends T> source,
28+
public ObservableRedo(ObservableSource<T> source,
3029
Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableSource<?>> manager) {
31-
this.source = source;
30+
super(source);
3231
this.manager = manager;
3332
}
3433

0 commit comments

Comments
 (0)