Skip to content

Commit a4d3f72

Browse files
JakeWhartonakarnokd
authored andcommitted
Add 'WithUpstream' interfaces to connectable implementations. (#4333)
1 parent d37f29e commit a4d3f72

File tree

4 files changed

+46
-26
lines changed

4 files changed

+46
-26
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
* manner.
3434
* @param <T> the value type
3535
*/
36-
public final class FlowablePublish<T> extends ConnectableFlowable<T> {
36+
public final class FlowablePublish<T> extends ConnectableFlowable<T> implements FlowableWithUpstream<T> {
3737
/** The source observable. */
38-
final Publisher<? extends T> source;
38+
final Publisher<T> source;
3939
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
4040
final AtomicReference<PublishSubscriber<T>> current;
4141

@@ -51,7 +51,7 @@ public final class FlowablePublish<T> extends ConnectableFlowable<T> {
5151
* @param bufferSize the size of the prefetch buffer
5252
* @return the connectable observable
5353
*/
54-
public static <T> ConnectableFlowable<T> create(Flowable<? extends T> source, final int bufferSize) {
54+
public static <T> ConnectableFlowable<T> create(Flowable<T> source, final int bufferSize) {
5555
// the current connection to source needs to be shared between the operator and its onSubscribe call
5656
final AtomicReference<PublishSubscriber<T>> curr = new AtomicReference<PublishSubscriber<T>>();
5757
Publisher<T> onSubscribe = new Publisher<T>() {
@@ -121,7 +121,7 @@ public void subscribe(Subscriber<? super T> child) {
121121
return new FlowablePublish<T>(onSubscribe, source, curr, bufferSize);
122122
}
123123

124-
public static <T, R> Flowable<R> create(final Flowable<? extends T> source,
124+
public static <T, R> Flowable<R> create(final Flowable<T> source,
125125
final Function<? super Flowable<T>, ? extends Publisher<R>> selector, final int bufferSize) {
126126
return unsafeCreate(new Publisher<R>() {
127127
@Override
@@ -151,14 +151,19 @@ public void accept(Disposable r) {
151151
});
152152
}
153153

154-
private FlowablePublish(Publisher<T> onSubscribe, Publisher<? extends T> source,
154+
private FlowablePublish(Publisher<T> onSubscribe, Publisher<T> source,
155155
final AtomicReference<PublishSubscriber<T>> current, int bufferSize) {
156156
this.onSubscribe = onSubscribe;
157157
this.source = source;
158158
this.current = current;
159159
this.bufferSize = bufferSize;
160160
}
161161

162+
@Override
163+
public Publisher<T> source() {
164+
return source;
165+
}
166+
162167
@Override
163168
protected void subscribeActual(Subscriber<? super T> s) {
164169
onSubscribe.subscribe(s);

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import io.reactivex.plugins.RxJavaPlugins;
3131
import io.reactivex.schedulers.Timed;
3232

33-
public final class FlowableReplay<T> extends ConnectableFlowable<T> {
33+
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements FlowableWithUpstream<T> {
3434
/** The source observable. */
35-
final Publisher<? extends T> source;
35+
final Publisher<T> source;
3636
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
3737
final AtomicReference<ReplaySubscriber<T>> current;
3838
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
@@ -142,7 +142,7 @@ public static <T> ConnectableFlowable<T> createFrom(Flowable<? extends T> source
142142
* @param bufferSize
143143
* @return the new ConnectableObservable instance
144144
*/
145-
public static <T> ConnectableFlowable<T> create(Flowable<? extends T> source,
145+
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
146146
final int bufferSize) {
147147
if (bufferSize == Integer.MAX_VALUE) {
148148
return createFrom(source);
@@ -164,7 +164,7 @@ public ReplayBuffer<T> call() {
164164
* @param scheduler
165165
* @return the new ConnectableObservable instance
166166
*/
167-
public static <T> ConnectableFlowable<T> create(Flowable<? extends T> source,
167+
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
168168
long maxAge, TimeUnit unit, Scheduler scheduler) {
169169
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE);
170170
}
@@ -179,7 +179,7 @@ public static <T> ConnectableFlowable<T> create(Flowable<? extends T> source,
179179
* @param bufferSize
180180
* @return the new NbpConnectableObservable instance
181181
*/
182-
public static <T> ConnectableFlowable<T> create(Flowable<? extends T> source,
182+
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
183183
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
184184
return create(source, new Callable<ReplayBuffer<T>>() {
185185
@Override
@@ -195,7 +195,7 @@ public ReplayBuffer<T> call() {
195195
* @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active
196196
* @return the connectable observable
197197
*/
198-
static <T> ConnectableFlowable<T> create(Flowable<? extends T> source,
198+
static <T> ConnectableFlowable<T> create(Flowable<T> source,
199199
final Callable<? extends ReplayBuffer<T>> bufferFactory) {
200200
// the current connection to source needs to be shared between the operator and its onSubscribe call
201201
final AtomicReference<ReplaySubscriber<T>> curr = new AtomicReference<ReplaySubscriber<T>>();
@@ -248,7 +248,7 @@ public void subscribe(Subscriber<? super T> child) {
248248
return new FlowableReplay<T>(onSubscribe, source, curr, bufferFactory);
249249
}
250250

251-
private FlowableReplay(Publisher<T> onSubscribe, Flowable<? extends T> source,
251+
private FlowableReplay(Publisher<T> onSubscribe, Flowable<T> source,
252252
final AtomicReference<ReplaySubscriber<T>> current,
253253
final Callable<? extends ReplayBuffer<T>> bufferFactory) {
254254
this.onSubscribe = onSubscribe;
@@ -257,6 +257,11 @@ private FlowableReplay(Publisher<T> onSubscribe, Flowable<? extends T> source,
257257
this.bufferFactory = bufferFactory;
258258
}
259259

260+
@Override
261+
public Publisher<T> source() {
262+
return source;
263+
}
264+
260265
@Override
261266
protected void subscribeActual(Subscriber<? super T> s) {
262267
onSubscribe.subscribe(s);

src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
* manner.
3030
* @param <T> the value type
3131
*/
32-
public final class ObservablePublish<T> extends ConnectableObservable<T> {
32+
public final class ObservablePublish<T> extends ConnectableObservable<T> implements ObservableWithUpstream<T> {
3333
/** The source observable. */
34-
final ObservableSource<? extends T> source;
34+
final ObservableSource<T> source;
3535
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
3636
final AtomicReference<PublishSubscriber<T>> current;
3737

@@ -47,7 +47,7 @@ public final class ObservablePublish<T> extends ConnectableObservable<T> {
4747
* @param bufferSize the size of the prefetch buffer
4848
* @return the connectable observable
4949
*/
50-
public static <T> ConnectableObservable<T> create(ObservableSource<? extends T> source, final int bufferSize) {
50+
public static <T> ConnectableObservable<T> create(ObservableSource<T> source, final int bufferSize) {
5151
// the current connection to source needs to be shared between the operator and its onSubscribe call
5252
final AtomicReference<PublishSubscriber<T>> curr = new AtomicReference<PublishSubscriber<T>>();
5353
ObservableSource<T> onSubscribe = new ObservableSource<T>() {
@@ -117,7 +117,7 @@ public void subscribe(Observer<? super T> child) {
117117
return new ObservablePublish<T>(onSubscribe, source, curr, bufferSize);
118118
}
119119

120-
public static <T, R> Observable<R> create(final ObservableSource<? extends T> source,
120+
public static <T, R> Observable<R> create(final ObservableSource<T> source,
121121
final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize) {
122122
return new Observable<R>() {
123123
@Override
@@ -148,14 +148,19 @@ public void accept(Disposable r) {
148148
};
149149
}
150150

151-
private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<? extends T> source,
151+
private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source,
152152
final AtomicReference<PublishSubscriber<T>> current, int bufferSize) {
153153
this.onSubscribe = onSubscribe;
154154
this.source = source;
155155
this.current = current;
156156
this.bufferSize = bufferSize;
157157
}
158-
158+
159+
@Override
160+
public ObservableSource<T> source() {
161+
return source;
162+
}
163+
159164
@Override
160165
protected void subscribeActual(Observer<? super T> observer) {
161166
onSubscribe.subscribe(observer);

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.reactivex.observables.ConnectableObservable;
2929
import io.reactivex.schedulers.Timed;
3030

31-
public final class ObservableReplay<T> extends ConnectableObservable<T> {
31+
public final class ObservableReplay<T> extends ConnectableObservable<T> implements ObservableWithUpstream<T> {
3232
/** The source observable. */
33-
final Observable<? extends T> source;
33+
final Observable<T> source;
3434
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
3535
final AtomicReference<ReplaySubscriber<T>> current;
3636
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
@@ -126,7 +126,7 @@ public static <T> ConnectableObservable<T> createFrom(Observable<? extends T> so
126126
* @param bufferSize
127127
* @return the new NbpConnectableObservable instance
128128
*/
129-
public static <T> ConnectableObservable<T> create(Observable<? extends T> source,
129+
public static <T> ConnectableObservable<T> create(Observable<T> source,
130130
final int bufferSize) {
131131
if (bufferSize == Integer.MAX_VALUE) {
132132
return createFrom(source);
@@ -148,7 +148,7 @@ public ReplayBuffer<T> call() {
148148
* @param scheduler
149149
* @return the new NbpConnectableObservable instance
150150
*/
151-
public static <T> ConnectableObservable<T> create(Observable<? extends T> source,
151+
public static <T> ConnectableObservable<T> create(Observable<T> source,
152152
long maxAge, TimeUnit unit, Scheduler scheduler) {
153153
return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE);
154154
}
@@ -163,7 +163,7 @@ public static <T> ConnectableObservable<T> create(Observable<? extends T> source
163163
* @param bufferSize
164164
* @return the new NbpConnectableObservable instance
165165
*/
166-
public static <T> ConnectableObservable<T> create(Observable<? extends T> source,
166+
public static <T> ConnectableObservable<T> create(Observable<T> source,
167167
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
168168
return create(source, new Callable<ReplayBuffer<T>>() {
169169
@Override
@@ -179,7 +179,7 @@ public ReplayBuffer<T> call() {
179179
* @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active
180180
* @return the connectable observable
181181
*/
182-
static <T> ConnectableObservable<T> create(Observable<? extends T> source,
182+
static <T> ConnectableObservable<T> create(Observable<T> source,
183183
final Callable<? extends ReplayBuffer<T>> bufferFactory) {
184184
// the current connection to source needs to be shared between the operator and its onSubscribe call
185185
final AtomicReference<ReplaySubscriber<T>> curr = new AtomicReference<ReplaySubscriber<T>>();
@@ -236,15 +236,20 @@ public void subscribe(Observer<? super T> child) {
236236
return new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory);
237237
}
238238

239-
private ObservableReplay(ObservableSource<T> onSubscribe, Observable<? extends T> source,
239+
private ObservableReplay(ObservableSource<T> onSubscribe, Observable<T> source,
240240
final AtomicReference<ReplaySubscriber<T>> current,
241241
final Callable<? extends ReplayBuffer<T>> bufferFactory) {
242242
this.onSubscribe = onSubscribe;
243243
this.source = source;
244244
this.current = current;
245245
this.bufferFactory = bufferFactory;
246246
}
247-
247+
248+
@Override
249+
public ObservableSource<T> source() {
250+
return source;
251+
}
252+
248253
@Override
249254
protected void subscribeActual(Observer<? super T> observer) {
250255
onSubscribe.subscribe(observer);

0 commit comments

Comments
 (0)