Skip to content

Commit 3dd1140

Browse files
Publish with Backpressure
1 parent a1aca70 commit 3dd1140

File tree

4 files changed

+563
-66
lines changed

4 files changed

+563
-66
lines changed

src/main/java/rx/Observable.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5388,14 +5388,7 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
53885388
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
53895389
*/
53905390
public final ConnectableObservable<T> publish() {
5391-
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5392-
5393-
@Override
5394-
public Subject<? super T, ? extends T> call() {
5395-
return PublishSubject.<T> create();
5396-
}
5397-
5398-
});
5391+
return OperatorPublish.create(this);
53995392
}
54005393

54015394
/**
@@ -5423,12 +5416,13 @@ public final ConnectableObservable<T> publish() {
54235416
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54245417
*/
54255418
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
5426-
return multicast(new Func0<Subject<T, T>>() {
5427-
@Override
5428-
public final Subject<T, T> call() {
5429-
return PublishSubject.create();
5430-
}
5431-
}, selector);
5419+
return OperatorPublish.create(this, selector);
5420+
// return multicast(new Func0<Subject<T, T>>() {
5421+
// @Override
5422+
// public final Subject<T, T> call() {
5423+
// return PublishSubject.create();
5424+
// }
5425+
// }, selector);
54325426
}
54335427

54345428
/**

0 commit comments

Comments
 (0)