Skip to content

Commit cd4d7ec

Browse files
Merge pull request #833 from benjchristensen/take-issue-830
Take operator was breaking the unsubscribe chain
2 parents d93dc37 + ff394f7 commit cd4d7ec

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

rxjava-core/src/main/java/rx/operators/OperatorTake.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,22 @@ public OperatorTake(int limit) {
3838
}
3939

4040
@Override
41-
public Subscriber<? super T> call(final Subscriber<? super T> o) {
42-
CompositeSubscription parent = new CompositeSubscription();
41+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
42+
final CompositeSubscription parent = new CompositeSubscription();
4343
if (limit == 0) {
44-
o.onCompleted();
44+
child.onCompleted();
4545
parent.unsubscribe();
4646
}
47+
4748
/*
4849
* We decouple the parent and child subscription so there can be multiple take() in a chain
4950
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
5051
*
5152
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
53+
*
54+
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
5255
*/
56+
child.add(parent);
5357
return new Subscriber<T>(parent) {
5458

5559
int count = 0;
@@ -58,24 +62,24 @@ public Subscriber<? super T> call(final Subscriber<? super T> o) {
5862
@Override
5963
public void onCompleted() {
6064
if (!completed) {
61-
o.onCompleted();
65+
child.onCompleted();
6266
}
6367
}
6468

6569
@Override
6670
public void onError(Throwable e) {
6771
if (!completed) {
68-
o.onError(e);
72+
child.onError(e);
6973
}
7074
}
7175

7276
@Override
7377
public void onNext(T i) {
7478
if (!isUnsubscribed()) {
75-
o.onNext(i);
79+
child.onNext(i);
7680
if (++count >= limit) {
7781
completed = true;
78-
o.onCompleted();
82+
child.onCompleted();
7983
unsubscribe();
8084
}
8185
}

rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.Arrays;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.concurrent.atomic.AtomicLong;
2526

2627
import org.junit.Test;
@@ -210,6 +211,33 @@ public void call(Long l) {
210211
assertEquals(10, count.get());
211212
}
212213

214+
@Test(timeout = 2000)
215+
public void testMultiTake() {
216+
final AtomicInteger count = new AtomicInteger();
217+
Observable.create(new OnSubscribe<Integer>() {
218+
219+
@Override
220+
public void call(Subscriber<? super Integer> s) {
221+
for (int i = 0; !s.isUnsubscribed(); i++) {
222+
System.out.println("Emit: " + i);
223+
count.incrementAndGet();
224+
s.onNext(i);
225+
}
226+
}
227+
228+
}).take(100).take(1).toBlockingObservable().forEach(new Action1<Integer>() {
229+
230+
@Override
231+
public void call(Integer t1) {
232+
System.out.println("Receive: " + t1);
233+
234+
}
235+
236+
});
237+
238+
assertEquals(1, count.get());
239+
}
240+
213241
private static class TestObservableFunc implements Observable.OnSubscribeFunc<String> {
214242

215243
final Subscription s;

0 commit comments

Comments
 (0)