Skip to content

Commit 9f84ecb

Browse files
Merge pull request #1050 from zsxwing/skip-last
Reimplement the 'SkipLast' operator
2 parents 50ba150 + a2604a8 commit 9f84ecb

File tree

7 files changed

+421
-422
lines changed

7 files changed

+421
-422
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import rx.operators.OperationSample;
8282
import rx.operators.OperationSequenceEqual;
8383
import rx.operators.OperationSkip;
84-
import rx.operators.OperationSkipLast;
8584
import rx.operators.OperationSkipUntil;
8685
import rx.operators.OperationSum;
8786
import rx.operators.OperationSwitch;
@@ -118,6 +117,8 @@
118117
import rx.operators.OperatorSerialize;
119118
import rx.operators.OperatorSingle;
120119
import rx.operators.OperatorSkip;
120+
import rx.operators.OperatorSkipLast;
121+
import rx.operators.OperatorSkipLastTimed;
121122
import rx.operators.OperatorSkipWhile;
122123
import rx.operators.OperatorSubscribeOn;
123124
import rx.operators.OperatorSynchronize;
@@ -6042,7 +6043,7 @@ public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
60426043
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
60436044
*/
60446045
public final Observable<T> skipLast(int count) {
6045-
return create(OperationSkipLast.skipLast(this, count));
6046+
return lift(new OperatorSkipLast<T>(count));
60466047
}
60476048

60486049
/**
@@ -6082,7 +6083,7 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
60826083
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
60836084
*/
60846085
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler) {
6085-
return create(new OperationSkipLast.SkipLastTimed<T>(this, time, unit, scheduler));
6086+
return lift(new OperatorSkipLastTimed<T>(time, unit, scheduler));
60866087
}
60876088

60886089
/**

rxjava-core/src/main/java/rx/operators/OperationSkipLast.java

Lines changed: 0 additions & 205 deletions
This file was deleted.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.ArrayDeque;
19+
import java.util.Deque;
20+
21+
import rx.Observable.Operator;
22+
import rx.Subscriber;
23+
24+
/**
25+
* Bypasses a specified number of elements at the end of an observable sequence.
26+
*/
27+
public class OperatorSkipLast<T> implements Operator<T, T> {
28+
29+
private final int count;
30+
31+
public OperatorSkipLast(int count) {
32+
if (count < 0) {
33+
throw new IndexOutOfBoundsException("count could not be negative");
34+
}
35+
this.count = count;
36+
}
37+
38+
@Override
39+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
40+
return new Subscriber<T>(subscriber) {
41+
42+
private final NotificationLite<T> on = NotificationLite.instance();
43+
44+
/**
45+
* Store the last count elements until now.
46+
*/
47+
private final Deque<Object> deque = new ArrayDeque<Object>();
48+
49+
@Override
50+
public void onCompleted() {
51+
subscriber.onCompleted();
52+
}
53+
54+
@Override
55+
public void onError(Throwable e) {
56+
subscriber.onError(e);
57+
}
58+
59+
@Override
60+
public void onNext(T value) {
61+
if (count == 0) {
62+
// If count == 0, we do not need to put value into deque
63+
// and remove it at once. We can emit the value
64+
// directly.
65+
subscriber.onNext(value);
66+
return;
67+
}
68+
if (deque.size() == count) {
69+
subscriber.onNext(on.getValue(deque.removeFirst()));
70+
}
71+
deque.offerLast(on.next(value));
72+
}
73+
74+
};
75+
}
76+
77+
}

0 commit comments

Comments
 (0)