15
15
*/
16
16
package rx .internal .operators ;
17
17
18
+ import java .util .Queue ;
19
+ import java .util .concurrent .ConcurrentLinkedQueue ;
18
20
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
19
21
import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
20
22
21
23
import rx .Observable ;
22
24
import rx .Observable .Operator ;
23
25
import rx .Producer ;
24
26
import rx .Subscriber ;
27
+ import rx .exceptions .CompositeException ;
25
28
import rx .exceptions .MissingBackpressureException ;
29
+ import rx .exceptions .OnErrorThrowable ;
26
30
import rx .functions .Func1 ;
27
31
import rx .internal .util .RxRingBuffer ;
28
32
import rx .internal .util .ScalarSynchronousObservable ;
33
37
* <p>
34
38
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/Netflix/RxJava/images/rx-operators/merge.png" alt="">
35
39
* <p>
36
- * You can combine the items emitted by multiple {@code Observable}s so that they act like a single
37
- * {@code Observable}, by using the merge operation.
40
+ * You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
38
41
*
39
42
* @param <T>
40
43
* the type of the items emitted by both the source and merged {@code Observable}s
41
44
*/
42
- public final class OperatorMerge <T > implements Operator <T , Observable <? extends T >> {
45
+ public class OperatorMerge <T > implements Operator <T , Observable <? extends T >> {
46
+
47
+ public OperatorMerge () {
48
+ this .delayErrors = false ;
49
+ }
50
+
51
+ public OperatorMerge (boolean delayErrors ) {
52
+ this .delayErrors = delayErrors ;
53
+ }
54
+
55
+ private final boolean delayErrors ;
43
56
44
57
@ Override
45
58
public Subscriber <Observable <? extends T >> call (final Subscriber <? super T > child ) {
46
- return new MergeSubscriber <T >(child );
59
+ return new MergeSubscriber <T >(child , delayErrors );
47
60
48
61
}
49
62
@@ -53,6 +66,8 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
53
66
private final MergeProducer <T > mergeProducer ;
54
67
private int wip ;
55
68
private boolean completed ;
69
+ private final boolean delayErrors ;
70
+ private ConcurrentLinkedQueue <Throwable > exceptions ;
56
71
57
72
private volatile SubscriptionIndexedRingBuffer <InnerSubscriber <T >> childrenSubscribers ;
58
73
@@ -77,10 +92,11 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
77
92
* } </pre>
78
93
*/
79
94
80
- public MergeSubscriber (Subscriber <? super T > actual ) {
95
+ public MergeSubscriber (Subscriber <? super T > actual , boolean delayErrors ) {
81
96
super (actual );
82
97
this .actual = actual ;
83
98
this .mergeProducer = new MergeProducer <T >(this );
99
+ this .delayErrors = delayErrors ;
84
100
// decoupled the subscription chain because we need to decouple and control backpressure
85
101
actual .add (this );
86
102
actual .setProducer (mergeProducer );
@@ -337,8 +353,26 @@ public Boolean call(InnerSubscriber<T> s) {
337
353
338
354
@ Override
339
355
public void onError (Throwable e ) {
340
- actual .onError (e );
341
- unsubscribe ();
356
+ if (delayErrors ) {
357
+ synchronized (this ) {
358
+ if (exceptions == null ) {
359
+ exceptions = new ConcurrentLinkedQueue <Throwable >();
360
+ }
361
+ }
362
+ exceptions .add (e );
363
+ boolean sendOnComplete = false ;
364
+ synchronized (this ) {
365
+ wip --;
366
+ if (wip == 0 && completed ) {
367
+ sendOnComplete = true ;
368
+ }
369
+ }
370
+ if (sendOnComplete ) {
371
+ drainAndComplete ();
372
+ }
373
+ } else {
374
+ actual .onError (e );
375
+ }
342
376
}
343
377
344
378
@ Override
@@ -372,7 +406,25 @@ void completeInner(InnerSubscriber<T> s) {
372
406
373
407
private void drainAndComplete () {
374
408
drainQueuesIfNeeded (); // TODO need to confirm whether this is needed or not
375
- actual .onCompleted ();
409
+ if (delayErrors ) {
410
+ Queue <Throwable > es = null ;
411
+ synchronized (this ) {
412
+ es = exceptions ;
413
+ }
414
+ if (es != null ) {
415
+ if (es .isEmpty ()) {
416
+ actual .onCompleted ();
417
+ } else if (es .size () == 1 ) {
418
+ actual .onError (es .poll ());
419
+ } else {
420
+ actual .onError (new CompositeException (es ));
421
+ }
422
+ } else {
423
+ actual .onCompleted ();
424
+ }
425
+ } else {
426
+ actual .onCompleted ();
427
+ }
376
428
}
377
429
378
430
}
@@ -493,7 +545,12 @@ private void emit(T t, boolean complete) {
493
545
if (complete ) {
494
546
parentSubscriber .completeInner (this );
495
547
} else {
496
- parentSubscriber .actual .onNext (t );
548
+ try {
549
+ parentSubscriber .actual .onNext (t );
550
+ } catch (Throwable e ) {
551
+ // special error handling due to complexity of merge
552
+ onError (OnErrorThrowable .addValueAsLastCause (e , t ));
553
+ }
497
554
emitted ++;
498
555
}
499
556
} else {
@@ -503,7 +560,12 @@ private void emit(T t, boolean complete) {
503
560
if (complete ) {
504
561
parentSubscriber .completeInner (this );
505
562
} else {
506
- parentSubscriber .actual .onNext (t );
563
+ try {
564
+ parentSubscriber .actual .onNext (t );
565
+ } catch (Throwable e ) {
566
+ // special error handling due to complexity of merge
567
+ onError (OnErrorThrowable .addValueAsLastCause (e , t ));
568
+ }
507
569
emitted ++;
508
570
producer .REQUESTED .decrementAndGet (producer );
509
571
}
@@ -585,8 +647,13 @@ private int drainRequested() {
585
647
} else if (q .isCompleted (o )) {
586
648
parentSubscriber .completeInner (this );
587
649
} else {
588
- if (!q .accept (o , parentSubscriber .actual )) {
589
- emitted ++;
650
+ try {
651
+ if (!q .accept (o , parentSubscriber .actual )) {
652
+ emitted ++;
653
+ }
654
+ } catch (Throwable e ) {
655
+ // special error handling due to complexity of merge
656
+ onError (OnErrorThrowable .addValueAsLastCause (e , o ));
590
657
}
591
658
}
592
659
}
@@ -604,8 +671,13 @@ private int drainAll() {
604
671
if (q .isCompleted (o )) {
605
672
parentSubscriber .completeInner (this );
606
673
} else {
607
- if (!q .accept (o , parentSubscriber .actual )) {
608
- emitted ++;
674
+ try {
675
+ if (!q .accept (o , parentSubscriber .actual )) {
676
+ emitted ++;
677
+ }
678
+ } catch (Throwable e ) {
679
+ // special error handling due to complexity of merge
680
+ onError (OnErrorThrowable .addValueAsLastCause (e , o ));
609
681
}
610
682
}
611
683
}
0 commit comments