|
29 | 29 | import rx.util.AtomicObservableSubscription;
|
30 | 30 | import rx.util.functions.Func1;
|
31 | 31 | import rx.util.functions.Func2;
|
32 |
| - |
| 32 | +import rx.subjects.Subject; |
33 | 33 | /**
|
34 | 34 | * Returns a specified number of contiguous values from the start of an observable sequence.
|
35 | 35 | */
|
@@ -178,6 +178,37 @@ public Boolean call(Integer input) {
|
178 | 178 | verify(aObserver, times(1)).onCompleted();
|
179 | 179 | }
|
180 | 180 |
|
| 181 | + @Test |
| 182 | + public void testTakeWhileOnSubject1() { |
| 183 | + Subject<Integer> s = Subject.create(); |
| 184 | + Observable<Integer> w = (Observable<Integer>)s; |
| 185 | + Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() { |
| 186 | + @Override |
| 187 | + public Boolean call(Integer input) { |
| 188 | + return input < 3; |
| 189 | + } |
| 190 | + })); |
| 191 | + |
| 192 | + @SuppressWarnings("unchecked") |
| 193 | + Observer<Integer> aObserver = mock(Observer.class); |
| 194 | + take.subscribe(aObserver); |
| 195 | + |
| 196 | + s.onNext(1); |
| 197 | + s.onNext(2); |
| 198 | + s.onNext(3); |
| 199 | + s.onNext(4); |
| 200 | + s.onNext(5); |
| 201 | + s.onCompleted(); |
| 202 | + |
| 203 | + verify(aObserver, times(1)).onNext(1); |
| 204 | + verify(aObserver, times(1)).onNext(2); |
| 205 | + verify(aObserver, never()).onNext(3); |
| 206 | + verify(aObserver, never()).onNext(4); |
| 207 | + verify(aObserver, never()).onNext(5); |
| 208 | + verify(aObserver, never()).onError(any(Exception.class)); |
| 209 | + verify(aObserver, times(1)).onCompleted(); |
| 210 | + } |
| 211 | + |
181 | 212 | @Test
|
182 | 213 | public void testTakeWhile2() {
|
183 | 214 | Observable<String> w = Observable.toObservable("one", "two", "three");
|
|
0 commit comments