File tree Expand file tree Collapse file tree 4 files changed +18
-6
lines changed
rxjava-core/src/main/java/rx/internal/operators Expand file tree Collapse file tree 4 files changed +18
-6
lines changed Original file line number Diff line number Diff line change @@ -70,7 +70,9 @@ public void request(long n) {
70
70
}
71
71
o .onNext (it .next ());
72
72
}
73
- o .onCompleted ();
73
+ if (!o .isUnsubscribed ()) {
74
+ o .onCompleted ();
75
+ }
74
76
} else if (n > 0 ) {
75
77
// backpressure is requested
76
78
long _c = REQUESTED_UPDATER .getAndAdd (this , n );
Original file line number Diff line number Diff line change @@ -64,7 +64,9 @@ public void request(long n) {
64
64
}
65
65
o .onNext ((int ) i );
66
66
}
67
- o .onCompleted ();
67
+ if (!o .isUnsubscribed ()) {
68
+ o .onCompleted ();
69
+ }
68
70
} else if (n > 0 ) {
69
71
// backpressure is requested
70
72
long _c = REQUESTED_UPDATER .getAndAdd (this , n );
Original file line number Diff line number Diff line change @@ -47,24 +47,24 @@ public void onCompleted() {
47
47
onError (e );
48
48
return ;
49
49
}
50
- observer .onCompleted ();
51
50
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
52
51
done = true ;
52
+ observer .onCompleted ();
53
53
}
54
54
55
55
@ Override
56
56
public void onError (Throwable e ) {
57
57
if (done ) {
58
58
return ;
59
59
}
60
+ done = true ;
60
61
try {
61
62
doOnEachObserver .onError (e );
62
63
} catch (Throwable e2 ) {
63
64
observer .onError (e2 );
64
65
return ;
65
66
}
66
67
observer .onError (e );
67
- done = true ;
68
68
}
69
69
70
70
@ Override
Original file line number Diff line number Diff line change @@ -49,32 +49,40 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
49
49
50
50
private int counter = 0 ;
51
51
52
+ private boolean done = false ;
53
+
52
54
@ Override
53
55
public void onNext (T args ) {
54
56
boolean isSelected ;
55
57
try {
56
58
isSelected = predicate .call (args , counter ++);
57
59
} catch (Throwable e ) {
60
+ done = true ;
58
61
subscriber .onError (e );
59
62
unsubscribe ();
60
63
return ;
61
64
}
62
65
if (isSelected ) {
63
66
subscriber .onNext (args );
64
67
} else {
68
+ done = true ;
65
69
subscriber .onCompleted ();
66
70
unsubscribe ();
67
71
}
68
72
}
69
73
70
74
@ Override
71
75
public void onCompleted () {
72
- subscriber .onCompleted ();
76
+ if (!done ) {
77
+ subscriber .onCompleted ();
78
+ }
73
79
}
74
80
75
81
@ Override
76
82
public void onError (Throwable e ) {
77
- subscriber .onError (e );
83
+ if (!done ) {
84
+ subscriber .onError (e );
85
+ }
78
86
}
79
87
80
88
};
You can’t perform that action at this time.
0 commit comments