1616package rx .internal .operators ;
1717
1818import java .util .ArrayList ;
19+ import java .util .Collection ;
1920import java .util .List ;
20- import java .util .concurrent .atomic .AtomicInteger ;
21+ import java .util .concurrent .ConcurrentLinkedQueue ;
22+ import java .util .concurrent .atomic .AtomicReference ;
2123
2224import rx .Observable ;
2325import rx .Observable .OnSubscribe ;
26+ import rx .Producer ;
2427import rx .Subscriber ;
28+ import rx .functions .Action0 ;
29+ import rx .subscriptions .Subscriptions ;
2530
2631/**
2732 * Given multiple {@link Observable}s, propagates the one that first emits an item.
@@ -262,22 +267,23 @@ public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? exten
262267
263268 private static final class AmbSubscriber <T > extends Subscriber <T > {
264269
265- private static final int NONE = -1 ;
266-
267270 private final Subscriber <? super T > subscriber ;
268- private final int index ;
269- private final AtomicInteger choice ;
271+ private final Selection <T > selection ;
270272
271- private AmbSubscriber (Subscriber <? super T > subscriber , int index , AtomicInteger choice ) {
273+ private AmbSubscriber (long requested , Subscriber <? super T > subscriber , Selection < T > selection ) {
272274 this .subscriber = subscriber ;
273- this .choice = choice ;
274- this .index = index ;
275+ this .selection = selection ;
276+ // initial request
277+ request (requested );
278+ }
279+
280+ private final void requestMore (long n ) {
281+ request (n );
275282 }
276283
277284 @ Override
278285 public void onNext (T args ) {
279286 if (!isSelected ()) {
280- unsubscribe ();
281287 return ;
282288 }
283289 subscriber .onNext (args );
@@ -286,7 +292,6 @@ public void onNext(T args) {
286292 @ Override
287293 public void onCompleted () {
288294 if (!isSelected ()) {
289- unsubscribe ();
290295 return ;
291296 }
292297 subscriber .onCompleted ();
@@ -295,44 +300,102 @@ public void onCompleted() {
295300 @ Override
296301 public void onError (Throwable e ) {
297302 if (!isSelected ()) {
298- unsubscribe ();
299303 return ;
300304 }
301305 subscriber .onError (e );
302306 }
303307
304308 private boolean isSelected () {
305- int ch = choice .get ();
306- if (ch == NONE ) {
307- return choice .compareAndSet (NONE , index );
309+ if (selection .choice .get () == this ) {
310+ // fast-path
311+ return true ;
312+ } else {
313+ if (selection .choice .compareAndSet (null , this )) {
314+ selection .unsubscribeOthers (this );
315+ return true ;
316+ } else {
317+ // we lost so unsubscribe ... and force cleanup again due to possible race conditions
318+ selection .unsubscribeLosers ();
319+ return false ;
320+ }
321+ }
322+ }
323+ }
324+
325+ private static class Selection <T > {
326+ final AtomicReference <AmbSubscriber <T >> choice = new AtomicReference <AmbSubscriber <T >>();
327+ final Collection <AmbSubscriber <T >> ambSubscribers = new ConcurrentLinkedQueue <AmbSubscriber <T >>();
328+
329+ public void unsubscribeLosers () {
330+ AmbSubscriber <T > winner = choice .get ();
331+ if (winner != null ) {
332+ unsubscribeOthers (winner );
333+ }
334+ }
335+
336+ public void unsubscribeOthers (AmbSubscriber <T > notThis ) {
337+ for (AmbSubscriber <T > other : ambSubscribers ) {
338+ if (other != notThis ) {
339+ other .unsubscribe ();
340+ }
308341 }
309- return ch == index ;
342+ ambSubscribers . clear () ;
310343 }
344+
311345 }
312346
313347 private final Iterable <? extends Observable <? extends T >> sources ;
348+ private final Selection <T > selection = new Selection <T >();
314349
315350 private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
316351 this .sources = sources ;
317352 }
318353
319354 @ Override
320- public void call (Subscriber <? super T > subscriber ) {
321- AtomicInteger choice = new AtomicInteger (AmbSubscriber .NONE );
322- int index = 0 ;
323- for (Observable <? extends T > source : sources ) {
324- if (subscriber .isUnsubscribed ()) {
325- break ;
355+ public void call (final Subscriber <? super T > subscriber ) {
356+ subscriber .add (Subscriptions .create (new Action0 () {
357+
358+ @ Override
359+ public void call () {
360+ if (selection .choice .get () != null ) {
361+ // there is a single winner so we unsubscribe it
362+ selection .choice .get ().unsubscribe ();
363+ }
364+ // if we are racing with others still existing, we'll also unsubscribe them
365+ if (!selection .ambSubscribers .isEmpty ()) {
366+ for (AmbSubscriber <T > other : selection .ambSubscribers ) {
367+ other .unsubscribe ();
368+ }
369+ selection .ambSubscribers .clear ();
370+ }
326371 }
327- if (choice .get () != AmbSubscriber .NONE ) {
328- // Already choose someone, the rest Observables can be skipped.
329- break ;
372+
373+ }));
374+ subscriber .setProducer (new Producer () {
375+
376+ @ Override
377+ public void request (long n ) {
378+ if (selection .choice .get () != null ) {
379+ // propagate the request to that single Subscriber that won
380+ selection .choice .get ().requestMore (n );
381+ } else {
382+ for (Observable <? extends T > source : sources ) {
383+ if (subscriber .isUnsubscribed ()) {
384+ break ;
385+ }
386+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387+ selection .ambSubscribers .add (ambSubscriber );
388+ // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389+ if (selection .choice .get () != null ) {
390+ // Already chose one, the rest can be skipped and we can clean up
391+ selection .unsubscribeOthers (selection .choice .get ());
392+ break ;
393+ }
394+ source .unsafeSubscribe (ambSubscriber );
395+ }
396+ }
330397 }
331- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(subscriber , index , choice );
332- subscriber .add (ambSubscriber );
333- source .unsafeSubscribe (ambSubscriber );
334- index ++;
335- }
398+ });
336399 }
337400
338401}
0 commit comments