Skip to content

Commit 877ee89

Browse files
Merge pull request #1454 from zsxwing/issue1451
Fix issue #1451
2 parents 201f54d + 6bbd921 commit 877ee89

File tree

5 files changed

+86
-4
lines changed

5 files changed

+86
-4
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ public void request(long n) {
7070
}
7171
o.onNext(it.next());
7272
}
73-
o.onCompleted();
73+
if (!o.isUnsubscribed()) {
74+
o.onCompleted();
75+
}
7476
} else if(n > 0) {
7577
// backpressure is requested
7678
long _c = REQUESTED_UPDATER.getAndAdd(this, n);

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ public void request(long n) {
6464
}
6565
o.onNext((int) i);
6666
}
67-
o.onCompleted();
67+
if (!o.isUnsubscribed()) {
68+
o.onCompleted();
69+
}
6870
} else if (n > 0) {
6971
// backpressure is requested
7072
long _c = REQUESTED_UPDATER.getAndAdd(this, n);

rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,31 @@ public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
3333
@Override
3434
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
3535
return new Subscriber<T>(observer) {
36+
37+
private boolean done = false;
38+
3639
@Override
3740
public void onCompleted() {
41+
if (done) {
42+
return;
43+
}
3844
try {
3945
doOnEachObserver.onCompleted();
4046
} catch (Throwable e) {
4147
onError(e);
4248
return;
4349
}
50+
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
51+
done = true;
4452
observer.onCompleted();
4553
}
4654

4755
@Override
4856
public void onError(Throwable e) {
57+
if (done) {
58+
return;
59+
}
60+
done = true;
4961
try {
5062
doOnEachObserver.onError(e);
5163
} catch (Throwable e2) {
@@ -57,6 +69,9 @@ public void onError(Throwable e) {
5769

5870
@Override
5971
public void onNext(T value) {
72+
if (done) {
73+
return;
74+
}
6075
try {
6176
doOnEachObserver.onNext(value);
6277
} catch (Throwable e) {

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeWhile.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,40 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
4949

5050
private int counter = 0;
5151

52+
private boolean done = false;
53+
5254
@Override
5355
public void onNext(T args) {
5456
boolean isSelected;
5557
try {
5658
isSelected = predicate.call(args, counter++);
5759
} catch (Throwable e) {
60+
done = true;
5861
subscriber.onError(e);
5962
unsubscribe();
6063
return;
6164
}
6265
if (isSelected) {
6366
subscriber.onNext(args);
6467
} else {
68+
done = true;
6569
subscriber.onCompleted();
6670
unsubscribe();
6771
}
6872
}
6973

7074
@Override
7175
public void onCompleted() {
72-
subscriber.onCompleted();
76+
if (!done) {
77+
subscriber.onCompleted();
78+
}
7379
}
7480

7581
@Override
7682
public void onError(Throwable e) {
77-
subscriber.onError(e);
83+
if (!done) {
84+
subscriber.onError(e);
85+
}
7886
}
7987

8088
};

rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
@@ -30,6 +31,9 @@
3031
import rx.functions.Action1;
3132
import rx.functions.Func1;
3233

34+
import java.util.List;
35+
import java.util.concurrent.atomic.AtomicInteger;
36+
3337
public class OperatorDoOnEachTest {
3438

3539
@Mock
@@ -114,4 +118,55 @@ public void call(String s) {
114118

115119
}
116120

121+
@Test
122+
public void testIssue1451Case1() {
123+
// https://github.com/Netflix/RxJava/issues/1451
124+
int[] nums = {1, 2, 3};
125+
final AtomicInteger count = new AtomicInteger();
126+
for (final int n : nums) {
127+
Observable
128+
.from(Boolean.TRUE, Boolean.FALSE)
129+
.takeWhile(new Func1<Boolean, Boolean>() {
130+
@Override
131+
public Boolean call(Boolean value) {
132+
return value;
133+
}
134+
})
135+
.toList()
136+
.doOnNext(new Action1<List<Boolean>>() {
137+
@Override
138+
public void call(List<Boolean> booleans) {
139+
count.incrementAndGet();
140+
}
141+
})
142+
.subscribe();
143+
}
144+
assertEquals(nums.length, count.get());
145+
}
146+
147+
@Test
148+
public void testIssue1451Case2() {
149+
// https://github.com/Netflix/RxJava/issues/1451
150+
int[] nums = {1, 2, 3};
151+
final AtomicInteger count = new AtomicInteger();
152+
for (final int n : nums) {
153+
Observable
154+
.from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
155+
.takeWhile(new Func1<Boolean, Boolean>() {
156+
@Override
157+
public Boolean call(Boolean value) {
158+
return value;
159+
}
160+
})
161+
.toList()
162+
.doOnNext(new Action1<List<Boolean>>() {
163+
@Override
164+
public void call(List<Boolean> booleans) {
165+
count.incrementAndGet();
166+
}
167+
})
168+
.subscribe();
169+
}
170+
assertEquals(nums.length, count.get());
171+
}
117172
}

0 commit comments

Comments
 (0)