21
21
22
22
import rx .Observable ;
23
23
import rx .Observable .OnSubscribe ;
24
- import rx .Observer ;
25
24
import rx .Subscriber ;
26
25
27
26
/**
@@ -121,12 +120,12 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {
121
120
122
121
private static final int NONE = -1 ;
123
122
124
- private Observer <? super T > observer ;
125
- private int index ;
126
- private AtomicInteger choice ;
123
+ private final Subscriber <? super T > subscriber ;
124
+ private final int index ;
125
+ private final AtomicInteger choice ;
127
126
128
- private AmbSubscriber (Subscriber <? super T > observer , int index , AtomicInteger choice ) {
129
- this .observer = observer ;
127
+ private AmbSubscriber (Subscriber <? super T > subscriber , int index , AtomicInteger choice ) {
128
+ this .subscriber = subscriber ;
130
129
this .choice = choice ;
131
130
this .index = index ;
132
131
}
@@ -137,7 +136,7 @@ public void onNext(T args) {
137
136
unsubscribe ();
138
137
return ;
139
138
}
140
- observer .onNext (args );
139
+ subscriber .onNext (args );
141
140
}
142
141
143
142
@ Override
@@ -146,7 +145,7 @@ public void onCompleted() {
146
145
unsubscribe ();
147
146
return ;
148
147
}
149
- observer .onCompleted ();
148
+ subscriber .onCompleted ();
150
149
}
151
150
152
151
@ Override
@@ -155,7 +154,7 @@ public void onError(Throwable e) {
155
154
unsubscribe ();
156
155
return ;
157
156
}
158
- observer .onError (e );
157
+ subscriber .onError (e );
159
158
}
160
159
161
160
private boolean isSelected () {
@@ -180,6 +179,10 @@ public void call(Subscriber<? super T> subscriber) {
180
179
if (subscriber .isUnsubscribed ()) {
181
180
break ;
182
181
}
182
+ if (choice .get () != AmbSubscriber .NONE ) {
183
+ // Already choose someone, the rest Observables can be skipped.
184
+ break ;
185
+ }
183
186
AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(subscriber , index , choice );
184
187
subscriber .add (ambSubscriber );
185
188
source .subscribe (ambSubscriber );
0 commit comments