@@ -5086,68 +5086,6 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
50865086 return merge (this , t1 );
50875087 }
50885088
5089- /**
5090- * Returns an Observable that emits items produced by multicasting the source Observable within a selector
5091- * function.
5092- * <p>
5093- * This is largely a helper function used by RxJava for other forms of multicasting, such as
5094- * {@link #publish} and {@link #publishLast}.
5095- * <dl>
5096- * <dt><b>Backpressure Support:</b></dt>
5097- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5098- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5099- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5100- * <dt><b>Scheduler:</b></dt>
5101- * <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5102- * </dl>
5103- *
5104- * @warn javadocs incomplete: description needs improvement
5105- * @param subjectFactory
5106- * the {@link Subject} factory
5107- * @warn javadocs incomplete: "subjectFactory" parameter described poorly
5108- * @param selector
5109- * the selector function, which can use the multicasted source Observable subject to the policies
5110- * enforced by the created {@code Subject}
5111- * @warn javadocs incomplete: "selector" parameter described poorly
5112- * @return an Observable that emits the items produced by multicasting the source Observable within a
5113- * selector function
5114- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava: Observable.publish() and Observable.multicast</a>
5115- * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5116- */
5117- public final <TIntermediate , TResult > Observable <TResult > multicast (
5118- final Func0 <? extends Subject <? super T , ? extends TIntermediate >> subjectFactory ,
5119- final Func1 <? super Observable <TIntermediate >, ? extends Observable <TResult >> selector ) {
5120- return create (new OnSubscribeMulticastSelector <T , TIntermediate , TResult >(this , subjectFactory , selector ));
5121- }
5122-
5123- /**
5124- * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5125- * into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
5126- * does not begin emitting items when it is subscribed to, but only when its {@code connect} method
5127- * is called.
5128- * <dl>
5129- * <dt><b>Backpressure Support:</b></dt>
5130- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5131- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5132- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5133- * <dt><b>Scheduler:</b></dt>
5134- * <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5135- * </dl>
5136- *
5137- * @param subjectFactory
5138- * a function that creates a new {@link Subject} for the {@link ConnectableObservable} to push
5139- * source items into
5140- * @param <R>
5141- * the type of items emitted by the resulting {@code ConnectableObservable}
5142- * @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5143- * into the specified {@link Subject}
5144- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
5145- * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5146- */
5147- public final <R > ConnectableObservable <R > multicast (Func0 <? extends Subject <? super T , ? extends R >> subjectFactory ) {
5148- return new OperatorMulticast <T , R >(this , subjectFactory );
5149- }
5150-
51515089 /**
51525090 * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
51535091 * asynchronously with an unbounded buffer.
@@ -5372,10 +5310,6 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
53725310 * <p>
53735311 * <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
53745312 * <dl>
5375- * <dt><b>Backpressure Support:</b></dt>
5376- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5377- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5378- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
53795313 * <dt><b>Scheduler:</b></dt>
53805314 * <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
53815315 * </dl>
@@ -5395,10 +5329,6 @@ public final ConnectableObservable<T> publish() {
53955329 * <p>
53965330 * <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
53975331 * <dl>
5398- * <dt><b>Backpressure Support:</b></dt>
5399- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5400- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5401- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54025332 * <dt><b>Scheduler:</b></dt>
54035333 * <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54045334 * </dl>
@@ -5415,12 +5345,6 @@ public final ConnectableObservable<T> publish() {
54155345 */
54165346 public final <R > Observable <R > publish (Func1 <? super Observable <T >, ? extends Observable <R >> selector ) {
54175347 return OperatorPublish .create (this , selector );
5418- // return multicast(new Func0<Subject<T, T>>() {
5419- // @Override
5420- // public final Subject<T, T> call() {
5421- // return PublishSubject.create();
5422- // }
5423- // }, selector);
54245348 }
54255349
54265350 /**
@@ -5430,10 +5354,6 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
54305354 * <p>
54315355 * <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.if.png" alt="">
54325356 * <dl>
5433- * <dt><b>Backpressure Support:</b></dt>
5434- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5435- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5436- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54375357 * <dt><b>Scheduler:</b></dt>
54385358 * <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54395359 * </dl>
@@ -5452,12 +5372,7 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
54525372 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54535373 */
54545374 public final <R > Observable <R > publish (Func1 <? super Observable <T >, ? extends Observable <R >> selector , final T initialValue ) {
5455- return multicast (new Func0 <Subject <T , T >>() {
5456- @ Override
5457- public final Subject <T , T > call () {
5458- return BehaviorSubject .create (initialValue );
5459- }
5460- }, selector );
5375+ return concatWith (just (initialValue )).publish (selector );
54615376 }
54625377
54635378 /**
@@ -5467,10 +5382,6 @@ public final Subject<T, T> call() {
54675382 * <p>
54685383 * <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.i.png" alt="">
54695384 * <dl>
5470- * <dt><b>Backpressure Support:</b></dt>
5471- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5472- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5473- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54745385 * <dt><b>Scheduler:</b></dt>
54755386 * <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54765387 * </dl>
@@ -5483,14 +5394,7 @@ public final Subject<T, T> call() {
54835394 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54845395 */
54855396 public final ConnectableObservable <T > publish (final T initialValue ) {
5486- return new OperatorMulticast <T , T >(this , new Func0 <Subject <? super T , ? extends T >>() {
5487-
5488- @ Override
5489- public Subject <? super T , ? extends T > call () {
5490- return BehaviorSubject .<T > create (initialValue );
5491- }
5492-
5493- });
5397+ return concatWith (just (initialValue )).publish ();
54945398 }
54955399
54965400 /**
@@ -5512,14 +5416,7 @@ public final ConnectableObservable<T> publish(final T initialValue) {
55125416 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
55135417 */
55145418 public final ConnectableObservable <T > publishLast () {
5515- return new OperatorMulticast <T , T >(this , new Func0 <Subject <? super T , ? extends T >>() {
5516-
5517- @ Override
5518- public Subject <? super T , ? extends T > call () {
5519- return AsyncSubject .<T > create ();
5520- }
5521-
5522- });
5419+ return takeLast (1 ).publish ();
55235420 }
55245421
55255422 /**
@@ -5546,12 +5443,7 @@ public final ConnectableObservable<T> publishLast() {
55465443 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
55475444 */
55485445 public final <R > Observable <R > publishLast (Func1 <? super Observable <T >, ? extends Observable <R >> selector ) {
5549- return multicast (new Func0 <Subject <T , T >>() {
5550- @ Override
5551- public final Subject <T , T > call () {
5552- return AsyncSubject .create ();
5553- }
5554- }, selector );
5446+ return takeLast (1 ).publish (selector );
55555447 }
55565448
55575449 /**
0 commit comments