Skip to content

Commit b5e4933

Browse files
Take: Fix Terminal State Handling
- even though unit tests don’t see it because of SafeObserver, the Take operator should not emit onCompleted more than once
1 parent 60ac55f commit b5e4933

File tree

3 files changed

+15
-12
lines changed

3 files changed

+15
-12
lines changed

rxjava-core/src/main/java/rx/operators/OperatorTake.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,28 @@ public Observer<T> call(final Observer<? super T> o, final OperatorSubscription
4747
return new Observer<T>() {
4848

4949
int count = 0;
50+
boolean completed = false;
5051

5152
@Override
5253
public void onCompleted() {
53-
o.onCompleted();
54+
if (!completed) {
55+
o.onCompleted();
56+
}
5457
}
5558

5659
@Override
5760
public void onError(Throwable e) {
58-
o.onError(e);
61+
if (!completed) {
62+
o.onError(e);
63+
}
5964
}
6065

6166
@Override
6267
public void onNext(T i) {
6368
if (!s.isUnsubscribed()) {
6469
o.onNext(i);
6570
if (++count >= limit) {
71+
completed = true;
6672
o.onCompleted();
6773
s.unsubscribe();
6874
}

rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ public void call() {
2727
/**
2828
* Observable.range(0, 10).take(5);
2929
*
30-
* Run: 10 - 6,660,042 ops/sec
31-
* Run: 11 - 6,721,423 ops/sec
32-
* Run: 12 - 6,556,035 ops/sec
33-
* Run: 13 - 6,692,284 ops/sec
34-
* Run: 14 - 6,731,287 ops/sec
30+
* Run: 10 - 8,780,556 ops/sec
31+
* Run: 11 - 8,822,590 ops/sec
32+
* Run: 12 - 8,842,733 ops/sec
33+
* Run: 13 - 8,825,486 ops/sec
34+
* Run: 14 - 8,771,545 ops/sec
3535
*/
3636
public long timeTake5() {
3737

rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,19 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
21-
import static rx.operators.OperatorTake.*;
2221

2322
import java.util.Arrays;
24-
import java.util.concurrent.TimeUnit;
2523
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
2625

2726
import org.junit.Test;
2827
import org.mockito.InOrder;
2928

3029
import rx.Observable;
3130
import rx.Observer;
3231
import rx.Subscription;
33-
import rx.operators.OperationSkipTest.CustomException;
34-
import rx.schedulers.TestScheduler;
35-
import rx.subjects.PublishSubject;
3632
import rx.subscriptions.Subscriptions;
33+
import rx.util.functions.Action0;
3734
import rx.util.functions.Func1;
3835

3936
public class OperatorTakeTest {

0 commit comments

Comments
 (0)