Skip to content

Commit 312cbf1

Browse files
Merge pull request #1135 from zsxwing/take-until
OperatorTakeUntil
2 parents eebcd08 + 129e531 commit 312cbf1

File tree

4 files changed

+118
-142
lines changed

4 files changed

+118
-142
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6552,7 +6552,7 @@ public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Schedu
65526552
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-takeuntil">RxJava Wiki: takeUntil()</a>
65536553
*/
65546554
public final <E> Observable<T> takeUntil(Observable<? extends E> other) {
6555-
return OperationTakeUntil.takeUntil(this, other);
6555+
return OperatorTakeUntil.takeUntil(this, other);
65566556
}
65576557

65586558
/**

rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java

Lines changed: 0 additions & 139 deletions
This file was deleted.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Subscriber;
20+
import rx.functions.Func1;
21+
22+
import static rx.Observable.Operator;
23+
24+
/**
25+
* Returns an Observable that emits the items from the source Observable until another Observable
26+
* emits an item.
27+
* <p>
28+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/takeUntil.png">
29+
*/
30+
public final class OperatorTakeUntil {
31+
32+
/**
33+
* Returns the values from the source observable sequence until the other observable sequence produces a value.
34+
*
35+
* @param source
36+
* the source sequence to propagate elements for.
37+
* @param other
38+
* the observable sequence that terminates propagation of elements of the source sequence.
39+
* @param <T>
40+
* the type of source.
41+
* @param <E>
42+
* the other type.
43+
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
44+
*/
45+
public static <T, E> Observable<T> takeUntil(final Observable<? extends T> source, final Observable<? extends E> other) {
46+
Observable<Object> s = source.lift(new SourceObservable<T>());
47+
Observable<Object> o = other.lift(new OtherObservable<E>());
48+
49+
Observable<Object> result = Observable.merge(s, o);
50+
51+
final NotificationLite<T> notification = NotificationLite.instance();
52+
53+
return result.takeWhile(new Func1<Object, Boolean>() {
54+
@Override
55+
public Boolean call(Object args) {
56+
return !notification.isCompleted(args);
57+
}
58+
}).map(new Func1<Object, T>() {
59+
@Override
60+
public T call(Object args) {
61+
return notification.getValue(args);
62+
}
63+
});
64+
}
65+
66+
private final static class SourceObservable<T> implements Operator<Object, T> {
67+
68+
private final NotificationLite<T> notification = NotificationLite.instance();
69+
70+
@Override
71+
public Subscriber<? super T> call(final Subscriber<? super Object> subscriber) {
72+
return new Subscriber<T>(subscriber) {
73+
@Override
74+
public void onCompleted() {
75+
subscriber.onNext(notification.completed());
76+
}
77+
78+
@Override
79+
public void onError(Throwable e) {
80+
subscriber.onError(e);
81+
}
82+
83+
@Override
84+
public void onNext(T args) {
85+
subscriber.onNext(notification.next(args));
86+
}
87+
};
88+
}
89+
}
90+
91+
private final static class OtherObservable<E> implements Operator<Object, E> {
92+
93+
private final NotificationLite<E> notification = NotificationLite.instance();
94+
95+
@Override
96+
public Subscriber<? super E> call(final Subscriber<? super Object> subscriber) {
97+
return new Subscriber<E>(subscriber) {
98+
@Override
99+
public void onCompleted() {
100+
subscriber.onNext(notification.completed());
101+
}
102+
103+
@Override
104+
public void onError(Throwable e) {
105+
subscriber.onError(e);
106+
}
107+
108+
@Override
109+
public void onNext(E args) {
110+
subscriber.onNext(notification.completed());
111+
}
112+
};
113+
}
114+
}
115+
}

rxjava-core/src/test/java/rx/operators/OperationTakeUntilTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorTakeUntilTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.times;
2020
import static org.mockito.Mockito.verify;
21-
import static rx.operators.OperationTakeUntil.takeUntil;
21+
import static rx.operators.OperatorTakeUntil.takeUntil;
2222

2323
import org.junit.Test;
2424

2525
import rx.Observable;
2626
import rx.Observer;
2727
import rx.Subscription;
2828

29-
public class OperationTakeUntilTest {
29+
public class OperatorTakeUntilTest {
3030

3131
@Test
3232
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)