Skip to content

Commit 5962c76

Browse files
Merge pull request #1784 from benjchristensen/publish-backpressure
Publish with Backpressure
2 parents 9126954 + 3dd1140 commit 5962c76

File tree

4 files changed

+563
-66
lines changed

4 files changed

+563
-66
lines changed

src/main/java/rx/Observable.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5386,14 +5386,7 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
53865386
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
53875387
*/
53885388
public final ConnectableObservable<T> publish() {
5389-
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5390-
5391-
@Override
5392-
public Subject<? super T, ? extends T> call() {
5393-
return PublishSubject.<T> create();
5394-
}
5395-
5396-
});
5389+
return OperatorPublish.create(this);
53975390
}
53985391

53995392
/**
@@ -5421,12 +5414,13 @@ public final ConnectableObservable<T> publish() {
54215414
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54225415
*/
54235416
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
5424-
return multicast(new Func0<Subject<T, T>>() {
5425-
@Override
5426-
public final Subject<T, T> call() {
5427-
return PublishSubject.create();
5428-
}
5429-
}, selector);
5417+
return OperatorPublish.create(this, selector);
5418+
// return multicast(new Func0<Subject<T, T>>() {
5419+
// @Override
5420+
// public final Subject<T, T> call() {
5421+
// return PublishSubject.create();
5422+
// }
5423+
// }, selector);
54305424
}
54315425

54325426
/**

0 commit comments

Comments
 (0)