Skip to content

Commit de31844

Browse files
Implemented Skip using Lift in Observable and OperatorSkip
Deleted non-time part from OperationSkip Moved tests to OperatorSkipTest
1 parent cbbf514 commit de31844

File tree

6 files changed

+92
-188
lines changed

6 files changed

+92
-188
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -49,87 +49,7 @@
4949
import rx.observables.ConnectableObservable;
5050
import rx.observables.GroupedObservable;
5151
import rx.observers.SafeSubscriber;
52-
import rx.operators.OnSubscribeFromIterable;
53-
import rx.operators.OnSubscribeRange;
54-
import rx.operators.OperationAll;
55-
import rx.operators.OperationAmb;
56-
import rx.operators.OperationAny;
57-
import rx.operators.OperationAsObservable;
58-
import rx.operators.OperationAverage;
59-
import rx.operators.OperationBuffer;
60-
import rx.operators.OperationCache;
61-
import rx.operators.OperationCombineLatest;
62-
import rx.operators.OperationConcat;
63-
import rx.operators.OperationDebounce;
64-
import rx.operators.OperationDefaultIfEmpty;
65-
import rx.operators.OperationDefer;
66-
import rx.operators.OperationDelay;
67-
import rx.operators.OperationDematerialize;
68-
import rx.operators.OperationDistinct;
69-
import rx.operators.OperationDistinctUntilChanged;
70-
import rx.operators.OperationElementAt;
71-
import rx.operators.OperationFinally;
72-
import rx.operators.OperationFlatMap;
73-
import rx.operators.OperationGroupByUntil;
74-
import rx.operators.OperationGroupJoin;
75-
import rx.operators.OperationInterval;
76-
import rx.operators.OperationJoin;
77-
import rx.operators.OperationJoinPatterns;
78-
import rx.operators.OperationMaterialize;
79-
import rx.operators.OperationMergeDelayError;
80-
import rx.operators.OperationMergeMaxConcurrent;
81-
import rx.operators.OperationMinMax;
82-
import rx.operators.OperationMulticast;
83-
import rx.operators.OperationOnErrorResumeNextViaObservable;
84-
import rx.operators.OperationOnErrorReturn;
85-
import rx.operators.OperationOnExceptionResumeNextViaObservable;
86-
import rx.operators.OperationParallelMerge;
87-
import rx.operators.OperationReplay;
88-
import rx.operators.OperationRetry;
89-
import rx.operators.OperationSample;
90-
import rx.operators.OperationSequenceEqual;
91-
import rx.operators.OperationSingle;
92-
import rx.operators.OperationSkip;
93-
import rx.operators.OperationSkipLast;
94-
import rx.operators.OperationSkipUntil;
95-
import rx.operators.OperationSkipWhile;
96-
import rx.operators.OperationSum;
97-
import rx.operators.OperationSwitch;
98-
import rx.operators.OperationSynchronize;
99-
import rx.operators.OperationTakeLast;
100-
import rx.operators.OperationTakeTimed;
101-
import rx.operators.OperationTakeUntil;
102-
import rx.operators.OperationTakeWhile;
103-
import rx.operators.OperationThrottleFirst;
104-
import rx.operators.OperationTimeInterval;
105-
import rx.operators.OperationTimer;
106-
import rx.operators.OperationToMap;
107-
import rx.operators.OperationToMultimap;
108-
import rx.operators.OperationToObservableFuture;
109-
import rx.operators.OperationUsing;
110-
import rx.operators.OperationWindow;
111-
import rx.operators.OperatorCast;
112-
import rx.operators.OperatorDoOnEach;
113-
import rx.operators.OperatorFilter;
114-
import rx.operators.OperatorGroupBy;
115-
import rx.operators.OperatorMap;
116-
import rx.operators.OperatorMerge;
117-
import rx.operators.OperatorObserveOn;
118-
import rx.operators.OperatorOnErrorResumeNextViaFunction;
119-
import rx.operators.OperatorOnErrorFlatMap;
120-
import rx.operators.OperatorParallel;
121-
import rx.operators.OperatorRepeat;
122-
import rx.operators.OperatorScan;
123-
import rx.operators.OperatorSubscribeOn;
124-
import rx.operators.OperatorTake;
125-
import rx.operators.OperatorTimeout;
126-
import rx.operators.OperatorTimeoutWithSelector;
127-
import rx.operators.OperatorTimestamp;
128-
import rx.operators.OperatorToObservableList;
129-
import rx.operators.OperatorToObservableSortedList;
130-
import rx.operators.OperatorUnsubscribeOn;
131-
import rx.operators.OperatorZip;
132-
import rx.operators.OperatorZipIterable;
52+
import rx.operators.*;
13353
import rx.plugins.RxJavaObservableExecutionHook;
13454
import rx.plugins.RxJavaPlugins;
13555
import rx.schedulers.Schedulers;
@@ -6274,7 +6194,7 @@ public final Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Bool
62746194
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-skip">RxJava Wiki: skip()</a>
62756195
*/
62766196
public final Observable<T> skip(int num) {
6277-
return create(OperationSkip.skip(this, num));
6197+
return lift(new OperatorSkip<T>(num));
62786198
}
62796199

62806200
/**

rxjava-core/src/main/java/rx/operators/OperationSkip.java

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -39,81 +39,6 @@
3939
*/
4040
public final class OperationSkip {
4141

42-
/**
43-
* Skips a specified number of contiguous values from the start of a Observable sequence and then returns the remaining values.
44-
*
45-
* @param items
46-
* @param num
47-
* @return the observable sequence starting after a number of skipped values
48-
*
49-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">Observable.Skip(TSource) Method</a>
50-
*/
51-
public static <T> OnSubscribeFunc<T> skip(final Observable<? extends T> items, final int num) {
52-
// wrap in a Observable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
53-
return new OnSubscribeFunc<T>() {
54-
55-
@Override
56-
public Subscription onSubscribe(Observer<? super T> observer) {
57-
return new Skip<T>(items, num).onSubscribe(observer);
58-
}
59-
60-
};
61-
}
62-
63-
/**
64-
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
65-
* <p>
66-
* It IS thread-safe from within it while receiving onNext events from multiple threads.
67-
*
68-
* @param <T>
69-
*/
70-
private static class Skip<T> implements OnSubscribeFunc<T> {
71-
private final int num;
72-
private final Observable<? extends T> items;
73-
74-
Skip(final Observable<? extends T> items, final int num) {
75-
this.num = num;
76-
this.items = items;
77-
}
78-
79-
public Subscription onSubscribe(Observer<? super T> observer) {
80-
return items.subscribe(new ItemObserver(observer));
81-
}
82-
83-
/**
84-
* Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count.
85-
*/
86-
private class ItemObserver implements Observer<T> {
87-
88-
private AtomicInteger counter = new AtomicInteger();
89-
private final Observer<? super T> observer;
90-
91-
public ItemObserver(Observer<? super T> observer) {
92-
this.observer = observer;
93-
}
94-
95-
@Override
96-
public void onCompleted() {
97-
observer.onCompleted();
98-
}
99-
100-
@Override
101-
public void onError(Throwable e) {
102-
observer.onError(e);
103-
}
104-
105-
@Override
106-
public void onNext(T args) {
107-
// skip them until we reach the 'num' value
108-
if (counter.incrementAndGet() > num) {
109-
observer.onNext(args);
110-
}
111-
}
112-
113-
}
114-
115-
}
116-
11742
/**
11843
* Skip the items after subscription for the given duration.
11944
*

rxjava-core/src/main/java/rx/operators/OperatorMap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,4 @@ public void onNext(T t) {
6161
}
6262

6363
}
64+
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Subscriber;
5+
6+
/**
7+
* Returns an Observable that skips the first <code>num</code> items emitted by the source
8+
* Observable.
9+
* <p>
10+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/skip.png">
11+
* <p>
12+
* You can ignore the first <code>num</code> items emitted by an Observable and attend only to
13+
* those items that come after, by modifying the Observable with the skip operation.
14+
*/
15+
public final class OperatorSkip<T> implements Observable.Operator<T, T> {
16+
17+
int n;
18+
19+
public OperatorSkip(int n) {
20+
this.n = n;
21+
}
22+
23+
@Override
24+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
25+
return new Subscriber<T>(child) {
26+
27+
@Override
28+
public void onCompleted() {
29+
child.onCompleted();
30+
}
31+
32+
@Override
33+
public void onError(Throwable e) {
34+
child.onError(e);
35+
}
36+
37+
@Override
38+
public void onNext(T t) {
39+
if(n <= 0) {
40+
child.onNext(t);
41+
} else {
42+
n -= 1;
43+
}
44+
}
45+
46+
};
47+
}
48+
49+
}

rxjava-core/src/test/java/rx/operators/OperationSkipTest.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static org.mockito.Matchers.*;
1919
import static org.mockito.Mockito.*;
20-
import static rx.operators.OperationSkip.*;
2120

2221
import java.util.concurrent.TimeUnit;
2322

@@ -31,36 +30,6 @@
3130

3231
public class OperationSkipTest {
3332

34-
@Test
35-
public void testSkip1() {
36-
Observable<String> w = Observable.from("one", "two", "three");
37-
Observable<String> skip = Observable.create(skip(w, 2));
38-
39-
@SuppressWarnings("unchecked")
40-
Observer<String> observer = mock(Observer.class);
41-
skip.subscribe(observer);
42-
verify(observer, never()).onNext("one");
43-
verify(observer, never()).onNext("two");
44-
verify(observer, times(1)).onNext("three");
45-
verify(observer, never()).onError(any(Throwable.class));
46-
verify(observer, times(1)).onCompleted();
47-
}
48-
49-
@Test
50-
public void testSkip2() {
51-
Observable<String> w = Observable.from("one", "two", "three");
52-
Observable<String> skip = Observable.create(skip(w, 1));
53-
54-
@SuppressWarnings("unchecked")
55-
Observer<String> observer = mock(Observer.class);
56-
skip.subscribe(observer);
57-
verify(observer, never()).onNext("one");
58-
verify(observer, times(1)).onNext("two");
59-
verify(observer, times(1)).onNext("three");
60-
verify(observer, never()).onError(any(Throwable.class));
61-
verify(observer, times(1)).onCompleted();
62-
}
63-
6433
@Test
6534
public void testSkipTimed() {
6635
TestScheduler scheduler = new TestScheduler();
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package rx.operators;
2+
3+
import org.junit.Test;
4+
import rx.Observable;
5+
import rx.Observer;
6+
7+
import static org.mockito.Mockito.*;
8+
9+
public class OperatorSkipTest {
10+
11+
@Test
12+
public void testSkip1() {
13+
Observable<String> w = Observable.from("one", "two", "three");
14+
Observable<String> skip = w.lift(new OperatorSkip<String>(2));
15+
16+
@SuppressWarnings("unchecked")
17+
Observer<String> observer = mock(Observer.class);
18+
skip.subscribe(observer);
19+
verify(observer, never()).onNext("one");
20+
verify(observer, never()).onNext("two");
21+
verify(observer, times(1)).onNext("three");
22+
verify(observer, never()).onError(any(Throwable.class));
23+
verify(observer, times(1)).onCompleted();
24+
}
25+
26+
@Test
27+
public void testSkip2() {
28+
Observable<String> w = Observable.from("one", "two", "three");
29+
Observable<String> skip = w.lift(new OperatorSkip<String>(1));
30+
31+
@SuppressWarnings("unchecked")
32+
Observer<String> observer = mock(Observer.class);
33+
skip.subscribe(observer);
34+
verify(observer, never()).onNext("one");
35+
verify(observer, times(1)).onNext("two");
36+
verify(observer, times(1)).onNext("three");
37+
verify(observer, never()).onError(any(Throwable.class));
38+
verify(observer, times(1)).onCompleted();
39+
}
40+
}

0 commit comments

Comments
 (0)