Skip to content

Commit cd717d6

Browse files
committed
OperatorTakeLast
1 parent d62ddb7 commit cd717d6

File tree

6 files changed

+305
-343
lines changed

6 files changed

+305
-343
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6356,7 +6356,7 @@ public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
63566356
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-takelast">RxJava Wiki: takeLast()</a>
63576357
*/
63586358
public final Observable<T> takeLast(final int count) {
6359-
return create(OperationTakeLast.takeLast(this, count));
6359+
return lift(new OperatorTakeLast<T>(count));
63606360
}
63616361

63626362
/**
@@ -6400,10 +6400,7 @@ public final Observable<T> takeLast(int count, long time, TimeUnit unit) {
64006400
* if {@code count} is less than zero
64016401
*/
64026402
public final Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) {
6403-
if (count < 0) {
6404-
throw new IllegalArgumentException("count >= 0 required");
6405-
}
6406-
return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler));
6403+
return lift(new OperatorTakeLastTimed<T>(count, time, unit, scheduler));
64076404
}
64086405

64096406
/**
@@ -6441,7 +6438,7 @@ public final Observable<T> takeLast(long time, TimeUnit unit) {
64416438
* provided by {@code scheduler}
64426439
*/
64436440
public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler) {
6444-
return create(OperationTakeLast.takeLast(this, time, unit, scheduler));
6441+
return lift(new OperatorTakeLastTimed<T>(time, unit, scheduler));
64456442
}
64466443

64476444
/**

rxjava-core/src/main/java/rx/operators/OperationTakeLast.java

Lines changed: 0 additions & 245 deletions
This file was deleted.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
21+
import java.util.ArrayDeque;
22+
import java.util.Deque;
23+
24+
/**
25+
* Returns an Observable that emits the last <code>count</code> items emitted by the source
26+
* Observable.
27+
* <p>
28+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/last.png">
29+
*/
30+
public final class OperatorTakeLast<T> implements Operator<T, T> {
31+
32+
private final int count;
33+
34+
public OperatorTakeLast(int count) {
35+
if (count < 0) {
36+
throw new IndexOutOfBoundsException("count could not be negative");
37+
}
38+
this.count = count;
39+
}
40+
41+
@Override
42+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
43+
return new Subscriber<T>(subscriber) {
44+
45+
private NotificationLite<T> notification = NotificationLite.instance();
46+
/**
47+
* Store the last count elements until now.
48+
*/
49+
private Deque<Object> deque = new ArrayDeque<Object>();
50+
51+
@Override
52+
public void onCompleted() {
53+
try {
54+
for (Object value : deque) {
55+
subscriber.onNext(notification.getValue(value));
56+
}
57+
} catch (Throwable e) {
58+
onError(e);
59+
return;
60+
}
61+
deque.clear();
62+
subscriber.onCompleted();
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
deque.clear();
68+
subscriber.onError(e);
69+
}
70+
71+
@Override
72+
public void onNext(T value) {
73+
if (count == 0) {
74+
// If count == 0, we do not need to put value into deque and
75+
// remove it at once. We can ignore the value directly.
76+
return;
77+
}
78+
if (deque.size() == count) {
79+
deque.removeFirst();
80+
}
81+
deque.offerLast(notification.next(value));
82+
}
83+
};
84+
}
85+
}

0 commit comments

Comments
 (0)