Skip to content

Commit 49075d6

Browse files
committed
OperatorTakeUntil
1 parent d62ddb7 commit 49075d6

File tree

3 files changed

+19
-30
lines changed

3 files changed

+19
-30
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6555,7 +6555,7 @@ public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Schedu
65556555
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-takeuntil">RxJava Wiki: takeUntil()</a>
65566556
*/
65576557
public final <E> Observable<T> takeUntil(Observable<? extends E> other) {
6558-
return OperationTakeUntil.takeUntil(this, other);
6558+
return OperatorTakeUntil.takeUntil(this, other);
65596559
}
65606560

65616561
/**

rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java renamed to rxjava-core/src/main/java/rx/operators/OperatorTakeUntil.java

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,18 @@
1616
package rx.operators;
1717

1818
import rx.Observable;
19-
import rx.Observable.OnSubscribeFunc;
20-
import rx.Observer;
2119
import rx.Subscriber;
22-
import rx.Subscription;
2320
import rx.functions.Func1;
2421

22+
import static rx.Observable.Operator;
23+
2524
/**
2625
* Returns an Observable that emits the items from the source Observable until another Observable
2726
* emits an item.
2827
* <p>
2928
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/takeUntil.png">
3029
*/
31-
public class OperationTakeUntil {
30+
public final class OperatorTakeUntil {
3231

3332
/**
3433
* Returns the values from the source observable sequence until the other observable sequence produces a value.
@@ -44,8 +43,8 @@ public class OperationTakeUntil {
4443
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
4544
*/
4645
public static <T, E> Observable<T> takeUntil(final Observable<? extends T> source, final Observable<? extends E> other) {
47-
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
48-
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));
46+
Observable<Notification<T>> s = source.lift(new SourceObservable<T>());
47+
Observable<Notification<T>> o = other.lift(new OtherObservable<T, E>());
4948

5049
Observable<Notification<T>> result = Observable.merge(s, o);
5150

@@ -81,19 +80,14 @@ private Notification(boolean halt, T value) {
8180

8281
}
8382

84-
private static class SourceObservable<T> implements OnSubscribeFunc<Notification<T>> {
85-
private final Observable<? extends T> sequence;
86-
87-
private SourceObservable(Observable<? extends T> sequence) {
88-
this.sequence = sequence;
89-
}
83+
private static class SourceObservable<T> implements Operator<Notification<T>, T> {
9084

9185
@Override
92-
public Subscription onSubscribe(final Observer<? super Notification<T>> notificationObserver) {
93-
return sequence.unsafeSubscribe(new Subscriber<T>() {
86+
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> notificationObserver) {
87+
return new Subscriber<T>(notificationObserver) {
9488
@Override
9589
public void onCompleted() {
96-
notificationObserver.onNext(Notification.<T> halt());
90+
notificationObserver.onNext(Notification.<T>halt());
9791
}
9892

9993
@Override
@@ -105,23 +99,18 @@ public void onError(Throwable e) {
10599
public void onNext(T args) {
106100
notificationObserver.onNext(Notification.value(args));
107101
}
108-
});
102+
};
109103
}
110104
}
111105

112-
private static class OtherObservable<T, E> implements OnSubscribeFunc<Notification<T>> {
113-
private final Observable<? extends E> sequence;
114-
115-
private OtherObservable(Observable<? extends E> sequence) {
116-
this.sequence = sequence;
117-
}
106+
private static class OtherObservable<T, E> implements Operator<Notification<T>, E> {
118107

119108
@Override
120-
public Subscription onSubscribe(final Observer<? super Notification<T>> notificationObserver) {
121-
return sequence.unsafeSubscribe(new Subscriber<E>() {
109+
public Subscriber<? super E> call(final Subscriber<? super Notification<T>> notificationObserver) {
110+
return new Subscriber<E>(notificationObserver) {
122111
@Override
123112
public void onCompleted() {
124-
notificationObserver.onNext(Notification.<T> halt());
113+
notificationObserver.onNext(Notification.<T>halt());
125114
}
126115

127116
@Override
@@ -131,9 +120,9 @@ public void onError(Throwable e) {
131120

132121
@Override
133122
public void onNext(E args) {
134-
notificationObserver.onNext(Notification.<T> halt());
123+
notificationObserver.onNext(Notification.<T>halt());
135124
}
136-
});
125+
};
137126
}
138127
}
139128
}

rxjava-core/src/test/java/rx/operators/OperationTakeUntilTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorTakeUntilTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.times;
2020
import static org.mockito.Mockito.verify;
21-
import static rx.operators.OperationTakeUntil.takeUntil;
21+
import static rx.operators.OperatorTakeUntil.takeUntil;
2222

2323
import org.junit.Test;
2424

2525
import rx.Observable;
2626
import rx.Observer;
2727
import rx.Subscription;
2828

29-
public class OperationTakeUntilTest {
29+
public class OperatorTakeUntilTest {
3030

3131
@Test
3232
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)