Skip to content

Commit 2a92d7e

Browse files
Merge pull request #541 from akarnokd/SkipUntil
Operation SkipUntil
2 parents fd7982a + a17b592 commit 2a92d7e

File tree

3 files changed

+293
-0
lines changed

3 files changed

+293
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import rx.operators.OperationScan;
7777
import rx.operators.OperationSkip;
7878
import rx.operators.OperationSkipLast;
79+
import rx.operators.OperationSkipUntil;
7980
import rx.operators.OperationSkipWhile;
8081
import rx.operators.OperationSubscribeOn;
8182
import rx.operators.OperationSum;
@@ -6125,4 +6126,18 @@ public <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? ex
61256126
public <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func0<? extends Map<K, Collection<V>>> mapFactory, Func1<? super K, ? extends Collection<V>> collectionFactory) {
61266127
return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory, collectionFactory));
61276128
}
6129+
6130+
/**
6131+
* Return an Observable that skips elements from the source Observable until the secondary
6132+
* observable emits an element.
6133+
*
6134+
* @param other the other Observable that has to emit an element before this
6135+
* Observable's elements are relayed
6136+
* @return an Observable that skips elements from the source Observable until the secondary
6137+
* observable emits an element.
6138+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229358.aspx'>MSDN: Observable.SkipUntil</a>
6139+
*/
6140+
public <U> Observable<T> skipUntil(Observable<U> other) {
6141+
return create(new OperationSkipUntil<T, U>(this, other));
6142+
}
61286143
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import rx.Observable;
20+
import rx.Observable.OnSubscribeFunc;
21+
import rx.Observer;
22+
import rx.Subscription;
23+
import rx.subscriptions.CompositeSubscription;
24+
import rx.subscriptions.SerialSubscription;
25+
26+
/**
27+
* Skip elements from the source Observable until the secondary
28+
* observable fires an element.
29+
*
30+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229358.aspx'>MSDN: Observable.SkipUntil</a>
31+
*/
32+
public class OperationSkipUntil<T, U> implements OnSubscribeFunc<T> {
33+
protected final Observable<T> source;
34+
protected final Observable<U> other;
35+
public OperationSkipUntil(Observable<T> source, Observable<U> other) {
36+
this.source = source;
37+
this.other = other;
38+
}
39+
40+
@Override
41+
public Subscription onSubscribe(Observer<? super T> t1) {
42+
return new ResultManager(t1).init();
43+
}
44+
/** Manage the source and other observers. */
45+
private class ResultManager implements Subscription, Observer<T> {
46+
final Observer<? super T> observer;
47+
final CompositeSubscription cancel;
48+
final Object guard = new Object();
49+
final AtomicBoolean running = new AtomicBoolean();
50+
public ResultManager(Observer<? super T> observer) {
51+
this.observer = observer;
52+
this.cancel = new CompositeSubscription();
53+
}
54+
public ResultManager init() {
55+
56+
SerialSubscription toSource = new SerialSubscription();
57+
SerialSubscription toOther = new SerialSubscription();
58+
59+
cancel.add(toSource);
60+
cancel.add(toOther);
61+
62+
toSource.setSubscription(source.subscribe(this));
63+
toOther.setSubscription(other.subscribe(new OtherObserver(toOther)));
64+
65+
return this;
66+
}
67+
68+
@Override
69+
public void unsubscribe() {
70+
cancel.unsubscribe();
71+
}
72+
73+
@Override
74+
public void onNext(T args) {
75+
if (running.get()) {
76+
observer.onNext(args);
77+
}
78+
}
79+
80+
@Override
81+
public void onError(Throwable e) {
82+
synchronized (guard) {
83+
observer.onError(e);
84+
unsubscribe();
85+
}
86+
}
87+
88+
@Override
89+
public void onCompleted() {
90+
synchronized (guard) {
91+
observer.onCompleted();
92+
unsubscribe();
93+
}
94+
}
95+
96+
/** Observe the other stream. */
97+
private class OtherObserver implements Observer<U> {
98+
final Subscription self;
99+
public OtherObserver(Subscription self) {
100+
this.self = self;
101+
}
102+
103+
@Override
104+
public void onNext(U args) {
105+
running.set(true);
106+
self.unsubscribe();
107+
}
108+
109+
@Override
110+
public void onError(Throwable e) {
111+
ResultManager.this.onError(e);
112+
}
113+
114+
@Override
115+
public void onCompleted() {
116+
self.unsubscribe();
117+
}
118+
119+
}
120+
}
121+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import static org.mockito.Matchers.any;
21+
import org.mockito.Mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import org.mockito.MockitoAnnotations;
26+
import rx.Observable;
27+
import rx.Observer;
28+
import rx.subjects.PublishSubject;
29+
30+
public class OperationSkipUntilTest {
31+
@Mock
32+
Observer<Object> observer;
33+
34+
@Before
35+
public void before() {
36+
MockitoAnnotations.initMocks(this);
37+
}
38+
39+
@Test
40+
public void normal1() {
41+
PublishSubject<Integer> source = PublishSubject.create();
42+
PublishSubject<Integer> other = PublishSubject.create();
43+
44+
Observable<Integer> m = source.skipUntil(other);
45+
m.subscribe(observer);
46+
47+
source.onNext(0);
48+
source.onNext(1);
49+
50+
other.onNext(100);
51+
52+
source.onNext(2);
53+
source.onNext(3);
54+
source.onNext(4);
55+
source.onCompleted();
56+
57+
verify(observer, never()).onError(any(Throwable.class));
58+
verify(observer, times(1)).onNext(2);
59+
verify(observer, times(1)).onNext(3);
60+
verify(observer, times(1)).onNext(4);
61+
verify(observer, times(1)).onCompleted();
62+
}
63+
@Test
64+
public void otherNeverFires() {
65+
PublishSubject<Integer> source = PublishSubject.create();
66+
67+
Observable<Integer> m = source.skipUntil(Observable.never());
68+
69+
m.subscribe(observer);
70+
71+
source.onNext(0);
72+
source.onNext(1);
73+
source.onNext(2);
74+
source.onNext(3);
75+
source.onNext(4);
76+
source.onCompleted();
77+
78+
verify(observer, never()).onError(any(Throwable.class));
79+
verify(observer, never()).onNext(any());
80+
verify(observer, times(1)).onCompleted();
81+
}
82+
@Test
83+
public void otherEmpty() {
84+
PublishSubject<Integer> source = PublishSubject.create();
85+
86+
Observable<Integer> m = source.skipUntil(Observable.empty());
87+
88+
m.subscribe(observer);
89+
90+
verify(observer, never()).onError(any(Throwable.class));
91+
verify(observer, never()).onNext(any());
92+
verify(observer, never()).onCompleted();
93+
}
94+
@Test
95+
public void otherFiresAndCompletes() {
96+
PublishSubject<Integer> source = PublishSubject.create();
97+
PublishSubject<Integer> other = PublishSubject.create();
98+
99+
Observable<Integer> m = source.skipUntil(other);
100+
m.subscribe(observer);
101+
102+
source.onNext(0);
103+
source.onNext(1);
104+
105+
other.onNext(100);
106+
other.onCompleted();
107+
108+
source.onNext(2);
109+
source.onNext(3);
110+
source.onNext(4);
111+
source.onCompleted();
112+
113+
verify(observer, never()).onError(any(Throwable.class));
114+
verify(observer, times(1)).onNext(2);
115+
verify(observer, times(1)).onNext(3);
116+
verify(observer, times(1)).onNext(4);
117+
verify(observer, times(1)).onCompleted();
118+
}
119+
@Test
120+
public void sourceThrows() {
121+
PublishSubject<Integer> source = PublishSubject.create();
122+
PublishSubject<Integer> other = PublishSubject.create();
123+
124+
Observable<Integer> m = source.skipUntil(other);
125+
m.subscribe(observer);
126+
127+
source.onNext(0);
128+
source.onNext(1);
129+
130+
other.onNext(100);
131+
other.onCompleted();
132+
133+
source.onNext(2);
134+
source.onError(new RuntimeException("Forced failure"));
135+
136+
verify(observer, times(1)).onNext(2);
137+
verify(observer, times(1)).onError(any(Throwable.class));
138+
verify(observer, never()).onCompleted();
139+
}
140+
@Test
141+
public void otherThrowsImmediately() {
142+
PublishSubject<Integer> source = PublishSubject.create();
143+
PublishSubject<Integer> other = PublishSubject.create();
144+
145+
Observable<Integer> m = source.skipUntil(other);
146+
m.subscribe(observer);
147+
148+
source.onNext(0);
149+
source.onNext(1);
150+
151+
other.onError(new RuntimeException("Forced failure"));
152+
153+
verify(observer, never()).onNext(any());
154+
verify(observer, times(1)).onError(any(Throwable.class));
155+
verify(observer, never()).onCompleted();
156+
}
157+
}

0 commit comments

Comments
 (0)