1616package rx .internal .operators ;
1717
1818import java .util .ArrayList ;
19+ import java .util .Collection ;
1920import java .util .List ;
20- import java .util .concurrent .atomic .AtomicInteger ;
21+ import java .util .concurrent .ConcurrentLinkedQueue ;
22+ import java .util .concurrent .atomic .AtomicReference ;
2123
2224import rx .Observable ;
2325import rx .Observable .OnSubscribe ;
2426import rx .Producer ;
2527import rx .Subscriber ;
28+ import rx .functions .Action0 ;
29+ import rx .subscriptions .Subscriptions ;
2630
2731/**
2832 * Given multiple {@link Observable}s, propagates the one that first emits an item.
@@ -263,26 +267,23 @@ public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? exten
263267
264268 private static final class AmbSubscriber <T > extends Subscriber <T > {
265269
266- private static final int NONE = -1 ;
267-
268270 private final Subscriber <? super T > subscriber ;
269- private final int index ;
270- private final AtomicInteger choice ;
271+ private final Selection <T > selection ;
271272
272- private AmbSubscriber (Subscriber <? super T > subscriber , int index , AtomicInteger choice ) {
273+ private AmbSubscriber (long requested , Subscriber <? super T > subscriber , Selection < T > selection ) {
273274 this .subscriber = subscriber ;
274- this .choice = choice ;
275- this .index = index ;
275+ this .selection = selection ;
276+ // initial request
277+ request (requested );
276278 }
277279
278- public void requestMore (long n ) {
280+ private final void requestMore (long n ) {
279281 request (n );
280282 }
281283
282284 @ Override
283285 public void onNext (T args ) {
284286 if (!isSelected ()) {
285- unsubscribe ();
286287 return ;
287288 }
288289 subscriber .onNext (args );
@@ -291,7 +292,6 @@ public void onNext(T args) {
291292 @ Override
292293 public void onCompleted () {
293294 if (!isSelected ()) {
294- unsubscribe ();
295295 return ;
296296 }
297297 subscriber .onCompleted ();
@@ -300,66 +300,99 @@ public void onCompleted() {
300300 @ Override
301301 public void onError (Throwable e ) {
302302 if (!isSelected ()) {
303- unsubscribe ();
304303 return ;
305304 }
306305 subscriber .onError (e );
307306 }
308307
309308 private boolean isSelected () {
310- int ch = choice .get ();
311- if (ch == NONE ) {
312- return choice .compareAndSet (NONE , index );
309+ if (selection .choice .get () == this ) {
310+ // fast-path
311+ return true ;
312+ } else {
313+ if (selection .choice .compareAndSet (null , this )) {
314+ selection .unsubscribeOthers (this );
315+ return true ;
316+ } else {
317+ // we lost so unsubscribe ... and force cleanup again due to possible race conditions
318+ selection .unsubscribeLosers ();
319+ return false ;
320+ }
321+ }
322+ }
323+ }
324+
325+ private static class Selection <T > {
326+ final AtomicReference <AmbSubscriber <T >> choice = new AtomicReference <AmbSubscriber <T >>();
327+ final Collection <AmbSubscriber <T >> ambSubscribers = new ConcurrentLinkedQueue <AmbSubscriber <T >>();
328+
329+ public void unsubscribeLosers () {
330+ AmbSubscriber <T > winner = choice .get ();
331+ if (winner != null ) {
332+ unsubscribeOthers (winner );
333+ }
334+ }
335+
336+ public void unsubscribeOthers (AmbSubscriber <T > notThis ) {
337+ for (AmbSubscriber <T > other : ambSubscribers ) {
338+ if (other != notThis ) {
339+ other .unsubscribe ();
340+ }
313341 }
314- return ch == index ;
342+ ambSubscribers . clear () ;
315343 }
344+
316345 }
317346
318347 private final Iterable <? extends Observable <? extends T >> sources ;
348+ private final Selection <T > selection = new Selection <T >();
319349
320350 private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
321351 this .sources = sources ;
322352 }
323353
324354 @ Override
325- public void call (Subscriber <? super T > subscriber ) {
326- final AtomicInteger choice = new AtomicInteger (AmbSubscriber .NONE );
327- final List <AmbSubscriber <T >> ambSubscribers = new ArrayList <AmbSubscriber <T >>();
328- int index = 0 ;
329- for (Observable <? extends T > source : sources ) {
330- if (subscriber .isUnsubscribed ()) {
331- break ;
332- }
333- if (choice .get () != AmbSubscriber .NONE ) {
334- // Already choose someone, the rest Observables can be skipped.
335- break ;
355+ public void call (final Subscriber <? super T > subscriber ) {
356+ subscriber .add (Subscriptions .create (new Action0 () {
357+
358+ @ Override
359+ public void call () {
360+ if (selection .choice .get () != null ) {
361+ // there is a single winner so we unsubscribe it
362+ selection .choice .get ().unsubscribe ();
363+ }
364+ // if we are racing with others still existing, we'll also unsubscribe them
365+ if (!selection .ambSubscribers .isEmpty ()) {
366+ for (AmbSubscriber <T > other : selection .ambSubscribers ) {
367+ other .unsubscribe ();
368+ }
369+ selection .ambSubscribers .clear ();
370+ }
336371 }
337- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(subscriber , index , choice );
338- ambSubscribers .add (ambSubscriber );
339- subscriber .add (ambSubscriber );
340- source .unsafeSubscribe (ambSubscriber );
341- index ++;
342- }
343- // setProducer at the end so that `ambSubscribers` can be finalized before `subscriber` calls `request`
372+
373+ }));
344374 subscriber .setProducer (new Producer () {
345375
346- private volatile AmbSubscriber <T > selectedAmbSubscriber ;
347-
348376 @ Override
349377 public void request (long n ) {
350- if (choice .get () == AmbSubscriber .NONE ) {
351- for (AmbSubscriber <T > ambSubscriber : ambSubscribers ) {
352- // Once one Observable emits a message, `unsubscribe` of other Observables will be called
353- // and further messages will be dropped. Therefore, requesting all sources won't cause
354- // the backpressure issue.
355- ambSubscriber .requestMore (n );
356- }
357- }
358- else {
359- if (selectedAmbSubscriber == null ) {
360- selectedAmbSubscriber = ambSubscribers .get (choice .get ());
378+ if (selection .choice .get () != null ) {
379+ // propagate the request to that single Subscriber that won
380+ selection .choice .get ().requestMore (n );
381+ } else {
382+ for (Observable <? extends T > source : sources ) {
383+ if (subscriber .isUnsubscribed ()) {
384+ break ;
385+ }
386+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387+ selection .ambSubscribers .add (ambSubscriber );
388+ // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389+ if (selection .choice .get () != null ) {
390+ // Already chose one, the rest can be skipped and we can clean up
391+ selection .unsubscribeOthers (selection .choice .get ());
392+ break ;
393+ }
394+ source .unsafeSubscribe (ambSubscriber );
361395 }
362- selectedAmbSubscriber .requestMore (n );
363396 }
364397 }
365398 });
0 commit comments