Skip to content

Commit 900dca8

Browse files
committed
Publish and PublishLast overloads
1 parent fe438ca commit 900dca8

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,11 @@
107107
import rx.operators.OperationZip;
108108
import rx.operators.SafeObservableSubscription;
109109
import rx.operators.SafeObserver;
110-
import rx.plugins.RxJavaErrorHandler;
111110
import rx.plugins.RxJavaObservableExecutionHook;
112111
import rx.plugins.RxJavaPlugins;
113112
import rx.schedulers.Schedulers;
114113
import rx.subjects.AsyncSubject;
114+
import rx.subjects.BehaviorSubject;
115115
import rx.subjects.PublishSubject;
116116
import rx.subjects.ReplaySubject;
117117
import rx.subjects.Subject;
@@ -5226,6 +5226,59 @@ public ConnectableObservable<T> publish() {
52265226
return OperationMulticast.multicast(this, PublishSubject.<T> create());
52275227
}
52285228

5229+
/**
5230+
* Create a connectable observable sequence that shares a single
5231+
* subscription to the underlying sequence and starts with initialValue.
5232+
* @param initialValue the initial value of the underlying BehaviorSubject
5233+
* @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
5234+
*/
5235+
public ConnectableObservable<T> publish(T initialValue) {
5236+
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
5237+
}
5238+
5239+
/**
5240+
* Create an observable sequence that is the result of invoking the
5241+
* selector on a connectable observable sequence that shares a single
5242+
* subscription to the underlying sequence.
5243+
* @param <R> the result type
5244+
* @param selector function which can use the multicasted source
5245+
* sequence as many times as needed, without causing multiple
5246+
* subscriptions to the source sequence. Subscribers to the given
5247+
* source will receive all notifications of the source from the time
5248+
* of the subscription on.
5249+
* @return an observable sequence that is the result of invoking the
5250+
* selector on a connectable observable sequence that shares a single
5251+
* subscription to the underlying sequence.
5252+
*/
5253+
public <R> Observable<R> publish(Func1<Observable<T>, Observable<R>> selector) {
5254+
return multicast(new Func0<Subject<T, T>>() {
5255+
@Override
5256+
public Subject<T, T> call() {
5257+
return PublishSubject.create();
5258+
}
5259+
}, selector);
5260+
}
5261+
5262+
/**
5263+
* Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
5264+
* @param <R> the result type
5265+
* @param selector function which can use the multicasted source
5266+
* sequence as many times as needed, without causing multiple
5267+
* subscriptions to the source sequence. Subscribers to the given
5268+
* source will receive all notifications of the source from the time
5269+
* of the subscription on
5270+
* @param initialValue the initial value of the underlying BehaviorSubject
5271+
* @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
5272+
*/
5273+
public <R> Observable<R> publish(Func1<Observable<T>, Observable<R>> selector, final T initialValue) {
5274+
return multicast(new Func0<Subject<T, T>>() {
5275+
@Override
5276+
public Subject<T, T> call() {
5277+
return BehaviorSubject.create(initialValue);
5278+
}
5279+
}, selector);
5280+
}
5281+
52295282
/**
52305283
* Returns a {@link ConnectableObservable} that emits only the last item
52315284
* emitted by the source Observable.
@@ -5238,6 +5291,30 @@ public ConnectableObservable<T> publish() {
52385291
public ConnectableObservable<T> publishLast() {
52395292
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
52405293
}
5294+
5295+
/**
5296+
* Create an observable sequence that is the result of invoking the
5297+
* selector on a connectable observable sequence that shares a single
5298+
* subscription to the underlying sequence containing only the last
5299+
* notification.
5300+
* @param <R> the result type
5301+
* @param selector function which can use the multicasted source
5302+
* sequence as many times as needed, without causing multiple
5303+
* subscriptions to the source sequence. Subscribers to the given
5304+
* source will only receive the last notification of the source
5305+
* @return an observable sequence that is the result of invoking the
5306+
* selector on a connectable observable sequence that shares a single
5307+
* subscription to the underlying sequence containing only the last
5308+
* notification.
5309+
*/
5310+
public <R> Observable<R> publishLast(Func1<Observable<T>, Observable<R>> selector) {
5311+
return multicast(new Func0<Subject<T, T>>() {
5312+
@Override
5313+
public Subject<T, T> call() {
5314+
return AsyncSubject.create();
5315+
}
5316+
}, selector);
5317+
}
52415318

52425319
/**
52435320
* Synonymous with <code>reduce()</code>.

0 commit comments

Comments
 (0)