Skip to content

Commit 0b91c5b

Browse files
Merge pull request #738 from akarnokd/PublishAndPublishLast2
Publish and PublishLast overloads
2 parents 17307e4 + 0ad8980 commit 0b91c5b

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@
108108
import rx.operators.OperationZip;
109109
import rx.operators.SafeObservableSubscription;
110110
import rx.operators.SafeObserver;
111-
import rx.plugins.RxJavaErrorHandler;
112111
import rx.plugins.RxJavaObservableExecutionHook;
113112
import rx.plugins.RxJavaPlugins;
114113
import rx.schedulers.Schedulers;
115114
import rx.subjects.AsyncSubject;
115+
import rx.subjects.BehaviorSubject;
116116
import rx.subjects.PublishSubject;
117117
import rx.subjects.ReplaySubject;
118118
import rx.subjects.Subject;
@@ -5491,6 +5491,59 @@ public ConnectableObservable<T> publish() {
54915491
return OperationMulticast.multicast(this, PublishSubject.<T> create());
54925492
}
54935493

5494+
/**
5495+
* Create a connectable observable sequence that shares a single
5496+
* subscription to the underlying sequence and starts with initialValue.
5497+
* @param initialValue the initial value of the underlying BehaviorSubject
5498+
* @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
5499+
*/
5500+
public ConnectableObservable<T> publish(T initialValue) {
5501+
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
5502+
}
5503+
5504+
/**
5505+
* Create an observable sequence that is the result of invoking the
5506+
* selector on a connectable observable sequence that shares a single
5507+
* subscription to the underlying sequence.
5508+
* @param <R> the result type
5509+
* @param selector function which can use the multicasted source
5510+
* sequence as many times as needed, without causing multiple
5511+
* subscriptions to the source sequence. Subscribers to the given
5512+
* source will receive all notifications of the source from the time
5513+
* of the subscription on.
5514+
* @return an observable sequence that is the result of invoking the
5515+
* selector on a connectable observable sequence that shares a single
5516+
* subscription to the underlying sequence.
5517+
*/
5518+
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
5519+
return multicast(new Func0<Subject<T, T>>() {
5520+
@Override
5521+
public Subject<T, T> call() {
5522+
return PublishSubject.create();
5523+
}
5524+
}, selector);
5525+
}
5526+
5527+
/**
5528+
* Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
5529+
* @param <R> the result type
5530+
* @param selector function which can use the multicasted source
5531+
* sequence as many times as needed, without causing multiple
5532+
* subscriptions to the source sequence. Subscribers to the given
5533+
* source will receive all notifications of the source from the time
5534+
* of the subscription on
5535+
* @param initialValue the initial value of the underlying BehaviorSubject
5536+
* @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
5537+
*/
5538+
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
5539+
return multicast(new Func0<Subject<T, T>>() {
5540+
@Override
5541+
public Subject<T, T> call() {
5542+
return BehaviorSubject.create(initialValue);
5543+
}
5544+
}, selector);
5545+
}
5546+
54945547
/**
54955548
* Returns a {@link ConnectableObservable} that emits only the last item
54965549
* emitted by the source Observable.
@@ -5503,6 +5556,30 @@ public ConnectableObservable<T> publish() {
55035556
public ConnectableObservable<T> publishLast() {
55045557
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
55055558
}
5559+
5560+
/**
5561+
* Create an observable sequence that is the result of invoking the
5562+
* selector on a connectable observable sequence that shares a single
5563+
* subscription to the underlying sequence containing only the last
5564+
* notification.
5565+
* @param <R> the result type
5566+
* @param selector function which can use the multicasted source
5567+
* sequence as many times as needed, without causing multiple
5568+
* subscriptions to the source sequence. Subscribers to the given
5569+
* source will only receive the last notification of the source
5570+
* @return an observable sequence that is the result of invoking the
5571+
* selector on a connectable observable sequence that shares a single
5572+
* subscription to the underlying sequence containing only the last
5573+
* notification.
5574+
*/
5575+
public <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
5576+
return multicast(new Func0<Subject<T, T>>() {
5577+
@Override
5578+
public Subject<T, T> call() {
5579+
return AsyncSubject.create();
5580+
}
5581+
}, selector);
5582+
}
55065583

55075584
/**
55085585
* Synonymous with <code>reduce()</code>.

0 commit comments

Comments
 (0)