Skip to content

Commit a40a659

Browse files
committed
Fixed OperatorAll unsubscribing downstream.
1 parent ea26109 commit a40a659

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public OperatorAll(Func1<? super T, Boolean> predicate) {
3434

3535
@Override
3636
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
37-
return new Subscriber<T>(child) {
37+
Subscriber<T> s = new Subscriber<T>() {
3838
boolean done;
3939

4040
@Override
@@ -65,5 +65,7 @@ public void onCompleted() {
6565
}
6666
}
6767
};
68+
child.add(s);
69+
return s;
6870
}
6971
}

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertFalse;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.verify;
21-
import static org.mockito.Mockito.verifyNoMoreInteractions;
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.Arrays;
22+
import java.util.concurrent.TimeUnit;
2223

2324
import org.junit.Test;
2425

25-
import rx.Observable;
26-
import rx.Observer;
26+
import rx.*;
2727
import rx.functions.Func1;
2828

29-
import java.util.Arrays;
30-
3129
public class OperatorAllTest {
3230

3331
@Test
@@ -113,4 +111,21 @@ public Boolean call(Integer i) {
113111
});
114112
assertFalse(allOdd.toBlocking().first());
115113
}
114+
@Test(timeout = 5000)
115+
public void testIssue1935NoUnsubscribeDownstream() {
116+
Observable<Integer> source = Observable.just(1)
117+
.all(new Func1<Object, Boolean>() {
118+
@Override
119+
public Boolean call(Object t1) {
120+
return false;
121+
}
122+
})
123+
.flatMap(new Func1<Boolean, Observable<Integer>>() {
124+
@Override
125+
public Observable<Integer> call(Boolean t1) {
126+
return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
127+
}
128+
});
129+
assertEquals((Object)2, source.toBlocking().first());
130+
}
116131
}

0 commit comments

Comments
 (0)