Skip to content

Commit b8a3cad

Browse files
committed
Reimplement the amb operator
1 parent d46af63 commit b8a3cad

File tree

2 files changed

+54
-53
lines changed

2 files changed

+54
-53
lines changed

rxjava-core/src/main/java/rx/operators/OperationAmb.java

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,31 @@
2020
import java.util.concurrent.atomic.AtomicInteger;
2121

2222
import rx.Observable;
23-
import rx.Observable.OnSubscribeFunc;
23+
import rx.Observable.OnSubscribe;
2424
import rx.Observer;
25-
import rx.Subscription;
26-
import rx.subscriptions.CompositeSubscription;
25+
import rx.Subscriber;
2726

2827
/**
2928
* Propagates the observable sequence that reacts first.
3029
*/
31-
public class OperationAmb {
30+
public final class OperationAmb<T> implements OnSubscribe<T>{
3231

33-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
32+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
3433
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
3534
sources.add(o1);
3635
sources.add(o2);
3736
return amb(sources);
3837
}
3938

40-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
39+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
4140
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
4241
sources.add(o1);
4342
sources.add(o2);
4443
sources.add(o3);
4544
return amb(sources);
4645
}
4746

48-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
47+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
4948
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
5049
sources.add(o1);
5150
sources.add(o2);
@@ -54,7 +53,7 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
5453
return amb(sources);
5554
}
5655

57-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
56+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
5857
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
5958
sources.add(o1);
6059
sources.add(o2);
@@ -64,7 +63,7 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
6463
return amb(sources);
6564
}
6665

67-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
66+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
6867
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
6968
sources.add(o1);
7069
sources.add(o2);
@@ -75,7 +74,7 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
7574
return amb(sources);
7675
}
7776

78-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
77+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
7978
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
8079
sources.add(o1);
8180
sources.add(o2);
@@ -87,7 +86,7 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
8786
return amb(sources);
8887
}
8988

90-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
89+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
9190
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
9291
sources.add(o1);
9392
sources.add(o2);
@@ -100,7 +99,7 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
10099
return amb(sources);
101100
}
102101

103-
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
102+
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
104103
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
105104
sources.add(o1);
106105
sources.add(o2);
@@ -114,40 +113,19 @@ public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<
114113
return amb(sources);
115114
}
116115

117-
public static <T> OnSubscribeFunc<T> amb(
118-
final Iterable<? extends Observable<? extends T>> sources) {
119-
return new OnSubscribeFunc<T>() {
120-
121-
@Override
122-
public Subscription onSubscribe(final Observer<? super T> observer) {
123-
AtomicInteger choice = new AtomicInteger(AmbObserver.NONE);
124-
int index = 0;
125-
CompositeSubscription parentSubscription = new CompositeSubscription();
126-
for (Observable<? extends T> source : sources) {
127-
SafeObservableSubscription subscription = new SafeObservableSubscription();
128-
AmbObserver<T> ambObserver = new AmbObserver<T>(
129-
subscription, observer, index, choice);
130-
parentSubscription.add(subscription.wrap(source
131-
.subscribe(ambObserver)));
132-
index++;
133-
}
134-
return parentSubscription;
135-
}
136-
};
116+
public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
117+
return new OperationAmb<T>(sources);
137118
}
138119

139-
private static class AmbObserver<T> implements Observer<T> {
120+
private static final class AmbSubscriber<T> extends Subscriber<T> {
140121

141122
private static final int NONE = -1;
142123

143-
private Subscription subscription;
144124
private Observer<? super T> observer;
145125
private int index;
146126
private AtomicInteger choice;
147127

148-
private AmbObserver(Subscription subscription,
149-
Observer<? super T> observer, int index, AtomicInteger choice) {
150-
this.subscription = subscription;
128+
private AmbSubscriber(Subscriber<? super T> observer, int index, AtomicInteger choice) {
151129
this.observer = observer;
152130
this.choice = choice;
153131
this.index = index;
@@ -156,7 +134,7 @@ private AmbObserver(Subscription subscription,
156134
@Override
157135
public void onNext(T args) {
158136
if (!isSelected()) {
159-
subscription.unsubscribe();
137+
unsubscribe();
160138
return;
161139
}
162140
observer.onNext(args);
@@ -165,7 +143,7 @@ public void onNext(T args) {
165143
@Override
166144
public void onCompleted() {
167145
if (!isSelected()) {
168-
subscription.unsubscribe();
146+
unsubscribe();
169147
return;
170148
}
171149
observer.onCompleted();
@@ -174,7 +152,7 @@ public void onCompleted() {
174152
@Override
175153
public void onError(Throwable e) {
176154
if (!isSelected()) {
177-
subscription.unsubscribe();
155+
unsubscribe();
178156
return;
179157
}
180158
observer.onError(e);
@@ -188,4 +166,25 @@ private boolean isSelected() {
188166
}
189167
}
190168

169+
private final Iterable<? extends Observable<? extends T>> sources;
170+
171+
private OperationAmb(Iterable<? extends Observable<? extends T>> sources) {
172+
this.sources = sources;
173+
}
174+
175+
@Override
176+
public void call(Subscriber<? super T> subscriber) {
177+
AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE);
178+
int index = 0;
179+
for (Observable<? extends T> source : sources) {
180+
if (subscriber.isUnsubscribed()) {
181+
break;
182+
}
183+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(subscriber, index, choice);
184+
subscriber.add(ambSubscriber);
185+
source.subscribe(ambSubscriber);
186+
index++;
187+
}
188+
}
189+
191190
}

rxjava-core/src/test/java/rx/operators/OperationAmbTest.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Mockito.*;
19-
import static rx.operators.OperationAmb.*;
18+
import static org.mockito.Mockito.inOrder;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.times;
21+
import static rx.operators.OperationAmb.amb;
2022

2123
import java.io.IOException;
2224
import java.util.concurrent.TimeUnit;
@@ -26,10 +28,10 @@
2628
import org.mockito.InOrder;
2729

2830
import rx.Observable;
29-
import rx.Observable.OnSubscribeFunc;
31+
import rx.Observable.OnSubscribe;
3032
import rx.Observer;
3133
import rx.Scheduler.Inner;
32-
import rx.Subscription;
34+
import rx.Subscriber;
3335
import rx.functions.Action1;
3436
import rx.schedulers.TestScheduler;
3537
import rx.subscriptions.CompositeSubscription;
@@ -45,17 +47,18 @@ public void setUp() {
4547

4648
private Observable<String> createObservable(final String[] values,
4749
final long interval, final Throwable e) {
48-
return Observable.create(new OnSubscribeFunc<String>() {
50+
return Observable.create(new OnSubscribe<String>() {
4951

5052
@Override
51-
public Subscription onSubscribe(final Observer<? super String> observer) {
53+
public void call(final Subscriber<? super String> subscriber) {
5254
CompositeSubscription parentSubscription = new CompositeSubscription();
55+
subscriber.add(parentSubscription);
5356
long delay = interval;
5457
for (final String value : values) {
5558
parentSubscription.add(scheduler.schedule(new Action1<Inner>() {
5659
@Override
5760
public void call(Inner inner) {
58-
observer.onNext(value);
61+
subscriber.onNext(value);
5962
}
6063
}, delay, TimeUnit.MILLISECONDS));
6164
delay += interval;
@@ -64,13 +67,12 @@ public void call(Inner inner) {
6467
@Override
6568
public void call(Inner inner) {
6669
if (e == null) {
67-
observer.onCompleted();
70+
subscriber.onCompleted();
6871
} else {
69-
observer.onError(e);
72+
subscriber.onError(e);
7073
}
7174
}
7275
}, delay, TimeUnit.MILLISECONDS));
73-
return parentSubscription;
7476
}
7577
});
7678
}
@@ -104,12 +106,12 @@ public void testAmb() {
104106

105107
@Test
106108
public void testAmb2() {
107-
IOException needHappenedException = new IOException(
109+
IOException expectedException = new IOException(
108110
"fake exception");
109111
Observable<String> observable1 = createObservable(new String[] {},
110112
2000, new IOException("fake exception"));
111113
Observable<String> observable2 = createObservable(new String[] {
112-
"2", "22", "222", "2222" }, 1000, needHappenedException);
114+
"2", "22", "222", "2222" }, 1000, expectedException);
113115
Observable<String> observable3 = createObservable(new String[] {},
114116
3000, new IOException("fake exception"));
115117

@@ -127,7 +129,7 @@ public void testAmb2() {
127129
inOrder.verify(observer, times(1)).onNext("22");
128130
inOrder.verify(observer, times(1)).onNext("222");
129131
inOrder.verify(observer, times(1)).onNext("2222");
130-
inOrder.verify(observer, times(1)).onError(needHappenedException);
132+
inOrder.verify(observer, times(1)).onError(expectedException);
131133
inOrder.verifyNoMoreInteractions();
132134
}
133135

0 commit comments

Comments
 (0)