Skip to content

Commit a7f80c3

Browse files
TakeUntil & NotificationLite
A rewrite of TakeUntil that doesn't leak NotificationLite.COMPLETE outside of itself. It causes problems to leak the sentinels as other operators also using NotificationLite get confused.
1 parent be7cd75 commit a7f80c3

File tree

5 files changed

+100
-87
lines changed

5 files changed

+100
-87
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7261,7 +7261,7 @@ public final Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Schedu
72617261
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#takeuntil">RxJava Wiki: takeUntil()</a>
72627262
*/
72637263
public final <E> Observable<T> takeUntil(Observable<? extends E> other) {
7264-
return OperatorTakeUntil.takeUntil(this, other);
7264+
return lift(new OperatorTakeUntil<T, E>(other));
72657265
}
72667266

72677267
/**

rxjava-core/src/main/java/rx/internal/operators/NotificationLite.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,18 @@ public static <T> NotificationLite<T> instance() {
5353

5454
private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
5555
private static final long serialVersionUID = 1;
56+
57+
public String toString() {
58+
return "Notification=>Completed";
59+
}
5660
};
5761

5862
private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() {
5963
private static final long serialVersionUID = 2;
64+
65+
public String toString() {
66+
return "Notification=>NULL";
67+
}
6068
};
6169

6270
private static class OnErrorSentinel implements Serializable {
@@ -66,6 +74,10 @@ private static class OnErrorSentinel implements Serializable {
6674
public OnErrorSentinel(Throwable e) {
6775
this.e = e;
6876
}
77+
78+
public String toString() {
79+
return "Notification=>Error:" + e.getMessage();
80+
}
6981
}
7082

7183
/**

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeUntil.java

Lines changed: 44 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -16,100 +16,64 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable;
19+
import rx.Observable.Operator;
1920
import rx.Subscriber;
20-
import rx.functions.Func1;
21-
22-
import static rx.Observable.Operator;
2321

2422
/**
2523
* Returns an Observable that emits the items from the source Observable until another Observable
2624
* emits an item.
2725
* <p>
2826
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/takeUntil.png">
2927
*/
30-
public final class OperatorTakeUntil {
31-
32-
/**
33-
* Returns the values from the source observable sequence until the other observable sequence produces a value.
34-
*
35-
* @param source
36-
* the source sequence to propagate elements for.
37-
* @param other
38-
* the observable sequence that terminates propagation of elements of the source sequence.
39-
* @param <T>
40-
* the type of source.
41-
* @param <E>
42-
* the other type.
43-
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
44-
*/
45-
public static <T, E> Observable<T> takeUntil(final Observable<? extends T> source, final Observable<? extends E> other) {
46-
Observable<Object> s = source.lift(new SourceObservable<T>());
47-
Observable<Object> o = other.lift(new OtherObservable<E>());
48-
49-
Observable<Object> result = Observable.merge(s, o);
50-
51-
final NotificationLite<T> notification = NotificationLite.instance();
52-
53-
return result.takeWhile(new Func1<Object, Boolean>() {
28+
public final class OperatorTakeUntil<T, E> implements Operator<T, T> {
29+
30+
private final Observable<? extends E> other;
31+
32+
public OperatorTakeUntil(final Observable<? extends E> other) {
33+
this.other = other;
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38+
final Subscriber<T> parent = new Subscriber<T>(child) {
39+
40+
@Override
41+
public void onCompleted() {
42+
child.onCompleted();
43+
}
44+
45+
@Override
46+
public void onError(Throwable e) {
47+
child.onError(e);
48+
}
49+
5450
@Override
55-
public Boolean call(Object args) {
56-
return !notification.isCompleted(args);
51+
public void onNext(T t) {
52+
child.onNext(t);
5753
}
58-
}).map(new Func1<Object, T>() {
54+
55+
};
56+
57+
other.unsafeSubscribe(new Subscriber<E>(child) {
58+
59+
@Override
60+
public void onCompleted() {
61+
parent.onCompleted();
62+
}
63+
5964
@Override
60-
public T call(Object args) {
61-
return notification.getValue(args);
65+
public void onError(Throwable e) {
66+
parent.onError(e);
6267
}
68+
69+
@Override
70+
public void onNext(E t) {
71+
parent.onCompleted();
72+
}
73+
6374
});
64-
}
6575

66-
private final static class SourceObservable<T> implements Operator<Object, T> {
67-
68-
private final NotificationLite<T> notification = NotificationLite.instance();
69-
70-
@Override
71-
public Subscriber<? super T> call(final Subscriber<? super Object> subscriber) {
72-
return new Subscriber<T>(subscriber) {
73-
@Override
74-
public void onCompleted() {
75-
subscriber.onNext(notification.completed());
76-
}
77-
78-
@Override
79-
public void onError(Throwable e) {
80-
subscriber.onError(e);
81-
}
82-
83-
@Override
84-
public void onNext(T args) {
85-
subscriber.onNext(notification.next(args));
86-
}
87-
};
88-
}
76+
return parent;
8977
}
9078

91-
private final static class OtherObservable<E> implements Operator<Object, E> {
92-
93-
private final NotificationLite<E> notification = NotificationLite.instance();
94-
95-
@Override
96-
public Subscriber<? super E> call(final Subscriber<? super Object> subscriber) {
97-
return new Subscriber<E>(subscriber) {
98-
@Override
99-
public void onCompleted() {
100-
subscriber.onNext(notification.completed());
101-
}
102-
103-
@Override
104-
public void onError(Throwable e) {
105-
subscriber.onError(e);
106-
}
107-
108-
@Override
109-
public void onNext(E args) {
110-
subscriber.onNext(notification.completed());
111-
}
112-
};
113-
}
114-
}
11579
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import org.junit.Test;
21+
22+
23+
public class NotificationLiteTest {
24+
25+
@Test
26+
public void testComplete() {
27+
NotificationLite<Object> on = NotificationLite.instance();
28+
Object n = on.next("Hello");
29+
Object c = on.completed();
30+
31+
assertTrue(on.isCompleted(c));
32+
assertFalse(on.isCompleted(n));
33+
34+
assertEquals("Hello", on.getValue(n));
35+
}
36+
37+
38+
}

rxjava-core/src/test/java/rx/internal/operators/OperatorTakeUntilTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.times;
2020
import static org.mockito.Mockito.verify;
21-
import static rx.internal.operators.OperatorTakeUntil.takeUntil;
2221

2322
import org.junit.Test;
2423

@@ -38,7 +37,7 @@ public void testTakeUntil() {
3837
TestObservable other = new TestObservable(sOther);
3938

4039
Observer<String> result = mock(Observer.class);
41-
Observable<String> stringObservable = takeUntil(Observable.create(source), Observable.create(other));
40+
Observable<String> stringObservable = Observable.create(source).takeUntil(Observable.create(other));
4241
stringObservable.subscribe(result);
4342
source.sendOnNext("one");
4443
source.sendOnNext("two");
@@ -65,7 +64,7 @@ public void testTakeUntilSourceCompleted() {
6564
TestObservable other = new TestObservable(sOther);
6665

6766
Observer<String> result = mock(Observer.class);
68-
Observable<String> stringObservable = takeUntil(Observable.create(source), Observable.create(other));
67+
Observable<String> stringObservable = Observable.create(source).takeUntil(Observable.create(other));
6968
stringObservable.subscribe(result);
7069
source.sendOnNext("one");
7170
source.sendOnNext("two");
@@ -88,7 +87,7 @@ public void testTakeUntilSourceError() {
8887
Throwable error = new Throwable();
8988

9089
Observer<String> result = mock(Observer.class);
91-
Observable<String> stringObservable = takeUntil(Observable.create(source), Observable.create(other));
90+
Observable<String> stringObservable = Observable.create(source).takeUntil(Observable.create(other));
9291
stringObservable.subscribe(result);
9392
source.sendOnNext("one");
9493
source.sendOnNext("two");
@@ -114,7 +113,7 @@ public void testTakeUntilOtherError() {
114113
Throwable error = new Throwable();
115114

116115
Observer<String> result = mock(Observer.class);
117-
Observable<String> stringObservable = takeUntil(Observable.create(source), Observable.create(other));
116+
Observable<String> stringObservable = Observable.create(source).takeUntil(Observable.create(other));
118117
stringObservable.subscribe(result);
119118
source.sendOnNext("one");
120119
source.sendOnNext("two");
@@ -143,7 +142,7 @@ public void testTakeUntilOtherCompleted() {
143142
TestObservable other = new TestObservable(sOther);
144143

145144
Observer<String> result = mock(Observer.class);
146-
Observable<String> stringObservable = takeUntil(Observable.create(source), Observable.create(other));
145+
Observable<String> stringObservable = Observable.create(source).takeUntil(Observable.create(other));
147146
stringObservable.subscribe(result);
148147
source.sendOnNext("one");
149148
source.sendOnNext("two");

0 commit comments

Comments
 (0)