Skip to content

Commit 53b4659

Browse files
committed
Fix issue #1451
1 parent cb75b97 commit 53b4659

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorDoOnEach.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,45 @@ public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
3333
@Override
3434
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
3535
return new Subscriber<T>(observer) {
36+
37+
private boolean done = false;
38+
3639
@Override
3740
public void onCompleted() {
41+
if (done) {
42+
return;
43+
}
3844
try {
3945
doOnEachObserver.onCompleted();
4046
} catch (Throwable e) {
4147
onError(e);
4248
return;
4349
}
4450
observer.onCompleted();
51+
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
52+
done = true;
4553
}
4654

4755
@Override
4856
public void onError(Throwable e) {
57+
if (done) {
58+
return;
59+
}
4960
try {
5061
doOnEachObserver.onError(e);
5162
} catch (Throwable e2) {
5263
observer.onError(e2);
5364
return;
5465
}
5566
observer.onError(e);
67+
done = true;
5668
}
5769

5870
@Override
5971
public void onNext(T value) {
72+
if (done) {
73+
return;
74+
}
6075
try {
6176
doOnEachObserver.onNext(value);
6277
} catch (Throwable e) {

rxjava-core/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
@@ -30,6 +31,9 @@
3031
import rx.functions.Action1;
3132
import rx.functions.Func1;
3233

34+
import java.util.List;
35+
import java.util.concurrent.atomic.AtomicInteger;
36+
3337
public class OperatorDoOnEachTest {
3438

3539
@Mock
@@ -114,4 +118,55 @@ public void call(String s) {
114118

115119
}
116120

121+
@Test
122+
public void testIssue1451Case1() {
123+
// https://github.com/Netflix/RxJava/issues/1451
124+
int[] nums = {1, 2, 3};
125+
final AtomicInteger count = new AtomicInteger();
126+
for (final int n : nums) {
127+
Observable
128+
.from(Boolean.TRUE, Boolean.FALSE)
129+
.takeWhile(new Func1<Boolean, Boolean>() {
130+
@Override
131+
public Boolean call(Boolean value) {
132+
return value;
133+
}
134+
})
135+
.toList()
136+
.doOnNext(new Action1<List<Boolean>>() {
137+
@Override
138+
public void call(List<Boolean> booleans) {
139+
count.incrementAndGet();
140+
}
141+
})
142+
.subscribe();
143+
}
144+
assertEquals(nums.length, count.get());
145+
}
146+
147+
@Test
148+
public void testIssue1451Case2() {
149+
// https://github.com/Netflix/RxJava/issues/1451
150+
int[] nums = {1, 2, 3};
151+
final AtomicInteger count = new AtomicInteger();
152+
for (final int n : nums) {
153+
Observable
154+
.from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
155+
.takeWhile(new Func1<Boolean, Boolean>() {
156+
@Override
157+
public Boolean call(Boolean value) {
158+
return value;
159+
}
160+
})
161+
.toList()
162+
.doOnNext(new Action1<List<Boolean>>() {
163+
@Override
164+
public void call(List<Boolean> booleans) {
165+
count.incrementAndGet();
166+
}
167+
})
168+
.subscribe();
169+
}
170+
assertEquals(nums.length, count.get());
171+
}
117172
}

0 commit comments

Comments
 (0)