Skip to content

Commit fc9dbd5

Browse files
Merge pull request #1457 from benjchristensen/mergeDelayError-backpressure
MergeDelayError & OnErrorFlatMap w/ Merge
2 parents cb20468 + cc11773 commit fc9dbd5

File tree

5 files changed

+148
-149
lines changed

5 files changed

+148
-149
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,18 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.Queue;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
1820
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1921
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2022

2123
import rx.Observable;
2224
import rx.Observable.Operator;
2325
import rx.Producer;
2426
import rx.Subscriber;
27+
import rx.exceptions.CompositeException;
2528
import rx.exceptions.MissingBackpressureException;
29+
import rx.exceptions.OnErrorThrowable;
2630
import rx.functions.Func1;
2731
import rx.internal.util.RxRingBuffer;
2832
import rx.internal.util.ScalarSynchronousObservable;
@@ -33,17 +37,26 @@
3337
* <p>
3438
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/Netflix/RxJava/images/rx-operators/merge.png" alt="">
3539
* <p>
36-
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single
37-
* {@code Observable}, by using the merge operation.
40+
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
3841
*
3942
* @param <T>
4043
* the type of the items emitted by both the source and merged {@code Observable}s
4144
*/
42-
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
45+
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
46+
47+
public OperatorMerge() {
48+
this.delayErrors = false;
49+
}
50+
51+
public OperatorMerge(boolean delayErrors) {
52+
this.delayErrors = delayErrors;
53+
}
54+
55+
private final boolean delayErrors;
4356

4457
@Override
4558
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
46-
return new MergeSubscriber<T>(child);
59+
return new MergeSubscriber<T>(child, delayErrors);
4760

4861
}
4962

@@ -53,6 +66,8 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
5366
private final MergeProducer<T> mergeProducer;
5467
private int wip;
5568
private boolean completed;
69+
private final boolean delayErrors;
70+
private ConcurrentLinkedQueue<Throwable> exceptions;
5671

5772
private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
5873

@@ -77,10 +92,11 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
7792
* } </pre>
7893
*/
7994

80-
public MergeSubscriber(Subscriber<? super T> actual) {
95+
public MergeSubscriber(Subscriber<? super T> actual, boolean delayErrors) {
8196
super(actual);
8297
this.actual = actual;
8398
this.mergeProducer = new MergeProducer<T>(this);
99+
this.delayErrors = delayErrors;
84100
// decoupled the subscription chain because we need to decouple and control backpressure
85101
actual.add(this);
86102
actual.setProducer(mergeProducer);
@@ -337,8 +353,26 @@ public Boolean call(InnerSubscriber<T> s) {
337353

338354
@Override
339355
public void onError(Throwable e) {
340-
actual.onError(e);
341-
unsubscribe();
356+
if (delayErrors) {
357+
synchronized (this) {
358+
if (exceptions == null) {
359+
exceptions = new ConcurrentLinkedQueue<Throwable>();
360+
}
361+
}
362+
exceptions.add(e);
363+
boolean sendOnComplete = false;
364+
synchronized (this) {
365+
wip--;
366+
if (wip == 0 && completed) {
367+
sendOnComplete = true;
368+
}
369+
}
370+
if (sendOnComplete) {
371+
drainAndComplete();
372+
}
373+
} else {
374+
actual.onError(e);
375+
}
342376
}
343377

344378
@Override
@@ -372,7 +406,25 @@ void completeInner(InnerSubscriber<T> s) {
372406

373407
private void drainAndComplete() {
374408
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
375-
actual.onCompleted();
409+
if (delayErrors) {
410+
Queue<Throwable> es = null;
411+
synchronized (this) {
412+
es = exceptions;
413+
}
414+
if (es != null) {
415+
if (es.isEmpty()) {
416+
actual.onCompleted();
417+
} else if (es.size() == 1) {
418+
actual.onError(es.poll());
419+
} else {
420+
actual.onError(new CompositeException(es));
421+
}
422+
} else {
423+
actual.onCompleted();
424+
}
425+
} else {
426+
actual.onCompleted();
427+
}
376428
}
377429

378430
}
@@ -493,7 +545,12 @@ private void emit(T t, boolean complete) {
493545
if (complete) {
494546
parentSubscriber.completeInner(this);
495547
} else {
496-
parentSubscriber.actual.onNext(t);
548+
try {
549+
parentSubscriber.actual.onNext(t);
550+
} catch (Throwable e) {
551+
// special error handling due to complexity of merge
552+
onError(OnErrorThrowable.addValueAsLastCause(e, t));
553+
}
497554
emitted++;
498555
}
499556
} else {
@@ -503,7 +560,12 @@ private void emit(T t, boolean complete) {
503560
if (complete) {
504561
parentSubscriber.completeInner(this);
505562
} else {
506-
parentSubscriber.actual.onNext(t);
563+
try {
564+
parentSubscriber.actual.onNext(t);
565+
} catch (Throwable e) {
566+
// special error handling due to complexity of merge
567+
onError(OnErrorThrowable.addValueAsLastCause(e, t));
568+
}
507569
emitted++;
508570
producer.REQUESTED.decrementAndGet(producer);
509571
}
@@ -585,8 +647,13 @@ private int drainRequested() {
585647
} else if (q.isCompleted(o)) {
586648
parentSubscriber.completeInner(this);
587649
} else {
588-
if (!q.accept(o, parentSubscriber.actual)) {
589-
emitted++;
650+
try {
651+
if (!q.accept(o, parentSubscriber.actual)) {
652+
emitted++;
653+
}
654+
} catch (Throwable e) {
655+
// special error handling due to complexity of merge
656+
onError(OnErrorThrowable.addValueAsLastCause(e, o));
590657
}
591658
}
592659
}
@@ -604,8 +671,13 @@ private int drainAll() {
604671
if (q.isCompleted(o)) {
605672
parentSubscriber.completeInner(this);
606673
} else {
607-
if (!q.accept(o, parentSubscriber.actual)) {
608-
emitted++;
674+
try {
675+
if (!q.accept(o, parentSubscriber.actual)) {
676+
emitted++;
677+
}
678+
} catch (Throwable e) {
679+
// special error handling due to complexity of merge
680+
onError(OnErrorThrowable.addValueAsLastCause(e, o));
609681
}
610682
}
611683
}
Lines changed: 22 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,20 @@
1-
/**
2-
* Copyright 2014 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20-
import rx.Observable;
21-
import rx.Observable.Operator;
22-
import rx.Subscriber;
23-
import rx.exceptions.CompositeException;
24-
import rx.observers.SerializedSubscriber;
25-
import rx.subscriptions.CompositeSubscription;
26-
2718
/**
2819
* This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
2920
* an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
@@ -37,113 +28,15 @@
3728
* This operation allows an Observer to receive all successfully emitted items from all of the
3829
* source Observables without being interrupted by an error notification from one of them.
3930
* <p>
40-
* <em>Note:</em> If this is used on an Observable that never completes, it will never call
41-
* {@code onError} and will effectively swallow errors.
31+
* <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
4232
*
43-
* @param <T> the source and result value type
33+
* @param <T>
34+
* the source and result value type
4435
*/
45-
public final class OperatorMergeDelayError<T> implements Operator<T, Observable<? extends T>> {
46-
47-
@Override
48-
public Subscriber<? super Observable<? extends T>> call(Subscriber<? super T> child) {
49-
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
50-
final CompositeSubscription csub = new CompositeSubscription();
51-
child.add(csub);
52-
53-
return new MergeDelayErrorSubscriber<T>(s, csub);
54-
}
55-
56-
static final class MergeDelayErrorSubscriber<T> extends Subscriber<Observable<? extends T>> {
57-
final Subscriber<? super T> s;
58-
final CompositeSubscription csub;
59-
final ConcurrentLinkedQueue<Throwable> exceptions = new ConcurrentLinkedQueue<Throwable>();
60-
61-
volatile int wip;
62-
@SuppressWarnings("rawtypes")
63-
static final AtomicIntegerFieldUpdater<MergeDelayErrorSubscriber> WIP_UPDATER
64-
= AtomicIntegerFieldUpdater.newUpdater(MergeDelayErrorSubscriber.class, "wip");
65-
66-
public MergeDelayErrorSubscriber(Subscriber<? super T> s, CompositeSubscription csub) {
67-
super(s);
68-
this.s = s;
69-
this.csub = csub;
70-
this.wip = 1;
71-
}
72-
73-
@Override
74-
public void onNext(Observable<? extends T> t) {
75-
WIP_UPDATER.incrementAndGet(this);
76-
77-
Subscriber<T> itemSub = new Subscriber<T>() {
78-
/** Make sure terminal events are handled once to avoid wip problems. */
79-
boolean once = true;
80-
@Override
81-
public void onNext(T t) {
82-
// prevent misbehaving source to emit past the error
83-
if (once) {
84-
try {
85-
s.onNext(t);
86-
} catch (Throwable e) {
87-
// in case the source doesn't properly handle exceptions
88-
onError(e);
89-
}
90-
}
91-
}
92-
93-
@Override
94-
public void onError(Throwable e) {
95-
if (once) {
96-
once = false;
97-
error(e);
98-
}
99-
}
100-
101-
@Override
102-
public void onCompleted() {
103-
if (once) {
104-
once = false;
105-
try {
106-
complete();
107-
} finally {
108-
csub.remove(this);
109-
}
110-
}
111-
}
112-
113-
};
114-
csub.add(itemSub);
115-
116-
t.unsafeSubscribe(itemSub);
117-
}
118-
119-
@Override
120-
public void onError(Throwable e) {
121-
error(e);
122-
}
123-
124-
@Override
125-
public void onCompleted() {
126-
complete();
127-
}
36+
public final class OperatorMergeDelayError<T> extends OperatorMerge<T> {
12837

129-
void error(Throwable e) {
130-
exceptions.add(e);
131-
complete();
132-
}
133-
134-
void complete() {
135-
if (WIP_UPDATER.decrementAndGet(this) == 0) {
136-
if (exceptions.isEmpty()) {
137-
s.onCompleted();
138-
} else
139-
if (exceptions.size() > 1) {
140-
s.onError(new CompositeException(exceptions));
141-
} else {
142-
s.onError(exceptions.peek());
143-
}
144-
exceptions.clear();
145-
unsubscribe();
146-
}
147-
}
38+
public OperatorMergeDelayError() {
39+
super(true);
14840
}
41+
14942
}

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ public void onError(final Throwable e) {
107107
if (terminated) {
108108
return;
109109
}
110-
terminated = true;
111110
if (emitting) {
112111
if (queue == null) {
113112
queue = new FastList();
@@ -121,6 +120,9 @@ public void onError(final Throwable e) {
121120
}
122121
drainQueue(list);
123122
actual.onError(e);
123+
synchronized(this) {
124+
emitting = false;
125+
}
124126
}
125127

126128
@Override

0 commit comments

Comments
 (0)