Skip to content

Commit 07e2769

Browse files
Pivot Operator
1 parent bcded86 commit 07e2769

File tree

5 files changed

+736
-8
lines changed

5 files changed

+736
-8
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
package rx;
1414

15-
import static rx.functions.Functions.*;
15+
import static rx.functions.Functions.alwaysFalse;
1616

1717
import java.util.ArrayList;
1818
import java.util.Arrays;
@@ -115,6 +115,7 @@
115115
import rx.operators.OperatorOnErrorFlatMap;
116116
import rx.operators.OperatorOnErrorResumeNextViaFunction;
117117
import rx.operators.OperatorParallel;
118+
import rx.operators.OperatorPivot;
118119
import rx.operators.OperatorRepeat;
119120
import rx.operators.OperatorRetry;
120121
import rx.operators.OperatorScan;
@@ -1640,8 +1641,18 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16401641
* @return an Observable that emits {@code value} as a single item and then completes
16411642
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-just">RxJava Wiki: just()</a>
16421643
*/
1643-
public final static <T> Observable<T> just(T value) {
1644-
return from(Arrays.asList(value));
1644+
public final static <T> Observable<T> just(final T value) {
1645+
return Observable.create(new OnSubscribe<T>() {
1646+
1647+
@Override
1648+
public void call(Subscriber<? super T> s) {
1649+
if (!s.isUnsubscribed()) {
1650+
s.onNext(value);
1651+
s.onCompleted();
1652+
}
1653+
}
1654+
1655+
});
16451656
}
16461657

16471658
/**
@@ -1664,7 +1675,7 @@ public final static <T> Observable<T> just(T value) {
16641675
*/
16651676
@Deprecated
16661677
public final static <T> Observable<T> just(T value, Scheduler scheduler) {
1667-
return from(Arrays.asList((value)), scheduler);
1678+
return just(value).subscribeOn(scheduler);
16681679
}
16691680

16701681
/**
@@ -2410,7 +2421,7 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
24102421
* @return an Observable that emits a single item: the source Observable
24112422
*/
24122423
public final Observable<Observable<T>> nest() {
2413-
return from(this);
2424+
return just(this);
24142425
}
24152426

24162427
/**
@@ -2477,6 +2488,16 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24772488
return OperationParallelMerge.parallelMerge(source, parallelObservables, scheduler);
24782489
}
24792490

2491+
/**
2492+
* Pivot GroupedObservable streams without serializing/synchronizing to a single stream first.
2493+
*
2494+
* @param groups
2495+
* @return
2496+
*/
2497+
public static final <K1, K2, T> Observable<GroupedObservable<K2, GroupedObservable<K1, T>>> pivot(Observable<GroupedObservable<K1, GroupedObservable<K2, T>>> groups) {
2498+
return groups.lift(new OperatorPivot<K1, K2, T>());
2499+
}
2500+
24802501
/**
24812502
* Returns an Observable that emits a sequence of Integers within a specified range.
24822503
* <p>

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.observables;
1717

1818
import rx.Observable;
19+
import rx.Subscriber;
1920
import rx.functions.Func1;
2021

2122
/**
@@ -31,6 +32,16 @@
3132
public class GroupedObservable<K, T> extends Observable<T> {
3233
private final K key;
3334

35+
public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o) {
36+
return new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
37+
38+
@Override
39+
public void call(Subscriber<? super T> s) {
40+
o.subscribe(s);
41+
}
42+
});
43+
}
44+
3445
public GroupedObservable(K key, OnSubscribe<T> onSubscribe) {
3546
super(onSubscribe);
3647
this.key = key;

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
import java.util.ArrayList;
44

5-
import javax.management.NotificationListener;
6-
7-
import rx.Notification;
85
import rx.Observer;
96
import rx.operators.NotificationLite;
107

@@ -54,6 +51,7 @@ public void onCompleted() {
5451
queue.add(on.completed());
5552
}
5653
}
54+
5755
if (canEmit) {
5856
// we won the right to emit
5957
try {

0 commit comments

Comments
 (0)