Skip to content

Commit dfd0642

Browse files
Merge branch 'OperatorSkipUntil' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 626980a + 825e4fb commit dfd0642

File tree

4 files changed

+92
-137
lines changed

4 files changed

+92
-137
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5624,7 +5624,7 @@ public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler schedule
56245624
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229358.aspx">MSDN: Observable.SkipUntil</a>
56255625
*/
56265626
public final <U> Observable<T> skipUntil(Observable<U> other) {
5627-
return create(new OperationSkipUntil<T, U>(this, other));
5627+
return lift(new OperatorSkipUntil<T, U>(other));
56285628
}
56295629

56305630
/**

rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java

Lines changed: 0 additions & 135 deletions
This file was deleted.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import rx.Observable;
20+
import rx.Observable.Operator;
21+
import rx.Subscriber;
22+
import rx.observers.SerializedSubscriber;
23+
24+
/**
25+
* Skip elements from the source Observable until the secondary
26+
* observable fires an element.
27+
*
28+
* If the secondary Observable fires no elements, the primary won't fire any elements.
29+
*
30+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229358.aspx'>MSDN: Observable.SkipUntil</a>
31+
*
32+
* @param <T> the source and result value type
33+
* @param <U> element type of the signalling observable
34+
*/
35+
public final class OperatorSkipUntil<T, U> implements Operator<T, T> {
36+
final Observable<U> other;
37+
38+
public OperatorSkipUntil(Observable<U> other) {
39+
this.other = other;
40+
}
41+
42+
@Override
43+
public Subscriber<? super T> call(Subscriber<? super T> child) {
44+
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
45+
final AtomicBoolean gate = new AtomicBoolean();
46+
// u needs to unsubscribe from other independently of child
47+
Subscriber<U> u = new Subscriber<U>() {
48+
49+
@Override
50+
public void onNext(U t) {
51+
gate.set(true);
52+
unsubscribe();
53+
}
54+
55+
@Override
56+
public void onError(Throwable e) {
57+
s.onError(e);
58+
s.unsubscribe();
59+
}
60+
61+
@Override
62+
public void onCompleted() {
63+
unsubscribe();
64+
}
65+
};
66+
child.add(u);
67+
other.unsafeSubscribe(u);
68+
69+
return new Subscriber<T>(child) {
70+
@Override
71+
public void onNext(T t) {
72+
if (gate.get()) {
73+
s.onNext(t);
74+
}
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
s.onError(e);
80+
unsubscribe();
81+
}
82+
83+
@Override
84+
public void onCompleted() {
85+
s.onCompleted();
86+
unsubscribe();
87+
}
88+
};
89+
}
90+
}

rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorSkipUntilTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import rx.Observer;
3030
import rx.subjects.PublishSubject;
3131

32-
public class OperationSkipUntilTest {
32+
public class OperatorSkipUntilTest {
3333
@Mock
3434
Observer<Object> observer;
3535

0 commit comments

Comments
 (0)