@@ -84,8 +84,8 @@ public void onNext(T value) {
84
84
public void onError (Exception ex ) {
85
85
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
86
86
AtomicObservableSubscription currentSubscription = subscriptionRef .get ();
87
- // check that we have not been unsubscribed before we can process the error
88
- if (currentSubscription != null ) {
87
+ // check that we have not been unsubscribed and not already resumed before we can process the error
88
+ if (currentSubscription == subscription ) {
89
89
/* error occurred, so switch subscription to the 'resumeSequence' */
90
90
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription (resumeSequence .subscribe (observer ));
91
91
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
@@ -148,8 +148,8 @@ public void testResumeNext() {
148
148
@ Test
149
149
public void testMapResumeAsyncNext () {
150
150
Subscription sr = mock (Subscription .class );
151
- // Trigger failure on the second event
152
- Observable <String > w = Observable .from ("one" , "fail" , "two" , "three" );
151
+ // Trigger multiple failures
152
+ Observable <String > w = Observable .from ("one" , "fail" , "two" , "three" , "fail" );
153
153
// Resume Observable is async
154
154
TestObservable resume = new TestObservable (sr , "twoResume" , "threeResume" );
155
155
0 commit comments