1212 */
1313package rx ;
1414
15- import static rx .functions .Functions .* ;
15+ import static rx .functions .Functions .alwaysFalse ;
1616
1717import java .util .ArrayList ;
1818import java .util .Arrays ;
115115import rx .operators .OperatorOnErrorFlatMap ;
116116import rx .operators .OperatorOnErrorResumeNextViaFunction ;
117117import rx .operators .OperatorParallel ;
118+ import rx .operators .OperatorPivot ;
118119import rx .operators .OperatorRepeat ;
119120import rx .operators .OperatorRetry ;
120121import rx .operators .OperatorScan ;
@@ -1640,8 +1641,18 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16401641 * @return an Observable that emits {@code value} as a single item and then completes
16411642 * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
16421643 */
1643- public final static <T > Observable <T > just (T value ) {
1644- return from (Arrays .asList (value ));
1644+ public final static <T > Observable <T > just (final T value ) {
1645+ return Observable .create (new OnSubscribe <T >() {
1646+
1647+ @ Override
1648+ public void call (Subscriber <? super T > s ) {
1649+ if (!s .isUnsubscribed ()) {
1650+ s .onNext (value );
1651+ s .onCompleted ();
1652+ }
1653+ }
1654+
1655+ });
16451656 }
16461657
16471658 /**
@@ -1664,7 +1675,7 @@ public final static <T> Observable<T> just(T value) {
16641675 */
16651676 @ Deprecated
16661677 public final static <T > Observable <T > just (T value , Scheduler scheduler ) {
1667- return from ( Arrays . asList (( value )), scheduler );
1678+ return just ( value ). subscribeOn ( scheduler );
16681679 }
16691680
16701681 /**
@@ -2410,7 +2421,7 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
24102421 * @return an Observable that emits a single item: the source Observable
24112422 */
24122423 public final Observable <Observable <T >> nest () {
2413- return from (this );
2424+ return just (this );
24142425 }
24152426
24162427 /**
@@ -2477,6 +2488,16 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24772488 return OperationParallelMerge .parallelMerge (source , parallelObservables , scheduler );
24782489 }
24792490
2491+ /**
2492+ * Pivot GroupedObservable streams without serializing/synchronizing to a single stream first.
2493+ *
2494+ * @param groups
2495+ * @return
2496+ */
2497+ public static final <K1 , K2 , T > Observable <GroupedObservable <K2 , GroupedObservable <K1 , T >>> pivot (Observable <GroupedObservable <K1 , GroupedObservable <K2 , T >>> groups ) {
2498+ return groups .lift (new OperatorPivot <K1 , K2 , T >());
2499+ }
2500+
24802501 /**
24812502 * Returns an Observable that emits a sequence of Integers within a specified range.
24822503 * <p>
0 commit comments