Skip to content

Commit 162e042

Browse files
Merge pull request #1938 from akarnokd/OperatorAnyFix
Any/All should not unsubscribe downstream.
2 parents e4239b5 + a40a659 commit 162e042

File tree

4 files changed

+44
-18
lines changed

4 files changed

+44
-18
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public OperatorAll(Func1<? super T, Boolean> predicate) {
3434

3535
@Override
3636
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
37-
return new Subscriber<T>(child) {
37+
Subscriber<T> s = new Subscriber<T>() {
3838
boolean done;
3939

4040
@Override
@@ -65,5 +65,7 @@ public void onCompleted() {
6565
}
6666
}
6767
};
68+
child.add(s);
69+
return s;
6870
}
6971
}

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public OperatorAny(Func1<? super T, Boolean> predicate, boolean returnOnEmpty) {
3636

3737
@Override
3838
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
39-
return new Subscriber<T>(child) {
39+
Subscriber<T> s = new Subscriber<T>() {
4040
boolean hasElements;
4141
boolean done;
4242

@@ -74,5 +74,7 @@ public void onCompleted() {
7474
}
7575

7676
};
77+
child.add(s);
78+
return s;
7779
}
7880
}

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertFalse;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.verify;
21-
import static org.mockito.Mockito.verifyNoMoreInteractions;
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.Arrays;
22+
import java.util.concurrent.TimeUnit;
2223

2324
import org.junit.Test;
2425

25-
import rx.Observable;
26-
import rx.Observer;
26+
import rx.*;
2727
import rx.functions.Func1;
2828

29-
import java.util.Arrays;
30-
3129
public class OperatorAllTest {
3230

3331
@Test
@@ -113,4 +111,21 @@ public Boolean call(Integer i) {
113111
});
114112
assertFalse(allOdd.toBlocking().first());
115113
}
114+
@Test(timeout = 5000)
115+
public void testIssue1935NoUnsubscribeDownstream() {
116+
Observable<Integer> source = Observable.just(1)
117+
.all(new Func1<Object, Boolean>() {
118+
@Override
119+
public Boolean call(Object t1) {
120+
return false;
121+
}
122+
})
123+
.flatMap(new Func1<Boolean, Observable<Integer>>() {
124+
@Override
125+
public Observable<Integer> call(Boolean t1) {
126+
return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
127+
}
128+
});
129+
assertEquals((Object)2, source.toBlocking().first());
130+
}
116131
}

src/test/java/rx/internal/operators/OperatorAnyTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,16 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertTrue;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.never;
21-
import static org.mockito.Mockito.times;
22-
import static org.mockito.Mockito.verify;
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
2320

2421
import java.util.Arrays;
22+
import java.util.concurrent.TimeUnit;
2523

2624
import org.junit.Test;
2725

28-
import rx.Observable;
29-
import rx.Observer;
26+
import rx.*;
3027
import rx.functions.Func1;
31-
import rx.functions.Functions;
3228
import rx.internal.util.UtilityFunctions;
3329

3430
public class OperatorAnyTest {
@@ -213,4 +209,15 @@ public Boolean call(Integer i) {
213209
});
214210
assertTrue(anyEven.toBlocking().first());
215211
}
212+
@Test(timeout = 5000)
213+
public void testIssue1935NoUnsubscribeDownstream() {
214+
Observable<Integer> source = Observable.just(1).isEmpty()
215+
.flatMap(new Func1<Boolean, Observable<Integer>>() {
216+
@Override
217+
public Observable<Integer> call(Boolean t1) {
218+
return Observable.just(2).delay(500, TimeUnit.MILLISECONDS);
219+
}
220+
});
221+
assertEquals((Object)2, source.toBlocking().first());
222+
}
216223
}

0 commit comments

Comments
 (0)