Skip to content

Commit fe9d383

Browse files
committed
Reimplement "SkipLast" operator
1 parent 83407c9 commit fe9d383

File tree

3 files changed

+53
-112
lines changed

3 files changed

+53
-112
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6043,7 +6043,7 @@ public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
60436043
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
60446044
*/
60456045
public final Observable<T> skipLast(int count) {
6046-
return create(OperationSkipLast.skipLast(this, count));
6046+
return lift(new OperationSkipLast<T>(count));
60476047
}
60486048

60496049
/**

rxjava-core/src/main/java/rx/operators/OperationSkipLast.java

Lines changed: 44 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2014 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,10 +21,10 @@
2121
import java.util.LinkedList;
2222
import java.util.List;
2323
import java.util.concurrent.TimeUnit;
24-
import java.util.concurrent.locks.ReentrantLock;
2524

2625
import rx.Observable;
2726
import rx.Observable.OnSubscribeFunc;
27+
import rx.Observable.Operator;
2828
import rx.Observer;
2929
import rx.Scheduler;
3030
import rx.Subscriber;
@@ -34,106 +34,60 @@
3434
/**
3535
* Bypasses a specified number of elements at the end of an observable sequence.
3636
*/
37-
public class OperationSkipLast {
37+
public class OperationSkipLast<T> implements Operator<T, T> {
3838

39-
/**
40-
* Bypasses a specified number of elements at the end of an observable
41-
* sequence.
42-
* <p>
43-
* This operator accumulates a queue with a length enough to store the first
44-
* count elements. As more elements are received, elements are taken from
45-
* the front of the queue and produced on the result sequence. This causes
46-
* elements to be delayed.
47-
*
48-
* @param source
49-
* the source sequence.
50-
* @param count
51-
* number of elements to bypass at the end of the source
52-
* sequence.
53-
* @return An observable sequence containing the source sequence elements
54-
* except for the bypassed ones at the end.
55-
*
56-
* @throws IndexOutOfBoundsException
57-
* count is less than zero.
58-
*/
59-
public static <T> OnSubscribeFunc<T> skipLast(
60-
Observable<? extends T> source, int count) {
61-
return new SkipLast<T>(source, count);
62-
}
39+
private final int count;
6340

64-
private static class SkipLast<T> implements OnSubscribeFunc<T> {
65-
private final int count;
66-
private final Observable<? extends T> source;
67-
68-
private SkipLast(Observable<? extends T> source, int count) {
69-
this.count = count;
70-
this.source = source;
41+
public OperationSkipLast(int count) {
42+
if (count < 0) {
43+
throw new IndexOutOfBoundsException("count could not be negative");
7144
}
45+
this.count = count;
46+
}
7247

73-
public Subscription onSubscribe(final Observer<? super T> observer) {
74-
if (count < 0) {
75-
throw new IndexOutOfBoundsException(
76-
"count could not be negative");
77-
}
78-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
79-
return subscription.wrap(source.unsafeSubscribe(new Subscriber<T>() {
80-
81-
private final ReentrantLock lock = new ReentrantLock();
48+
@Override
49+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
50+
return new Subscriber<T>(subscriber) {
51+
/**
52+
* Store the last count elements until now.
53+
*/
54+
private final Deque<T> deque = new LinkedList<T>();
8255

83-
/**
84-
* Store the last count elements until now.
85-
*/
86-
private final Deque<T> deque = new LinkedList<T>();
56+
@Override
57+
public void onCompleted() {
58+
subscriber.onCompleted();
59+
}
8760

88-
@Override
89-
public void onCompleted() {
90-
observer.onCompleted();
91-
}
61+
@Override
62+
public void onError(Throwable e) {
63+
subscriber.onError(e);
64+
}
9265

93-
@Override
94-
public void onError(Throwable e) {
95-
observer.onError(e);
66+
@Override
67+
public void onNext(T value) {
68+
if (count == 0) {
69+
// If count == 0, we do not need to put value into deque
70+
// and remove it at once. We can emit the value
71+
// directly.
72+
subscriber.onNext(value);
73+
return;
9674
}
97-
98-
@Override
99-
public void onNext(T value) {
100-
if (count == 0) {
101-
// If count == 0, we do not need to put value into deque
102-
// and remove it at once. We can emit the value
103-
// directly.
104-
try {
105-
observer.onNext(value);
106-
} catch (Throwable ex) {
107-
observer.onError(ex);
108-
subscription.unsubscribe();
109-
}
110-
return;
111-
}
112-
lock.lock();
113-
try {
114-
deque.offerLast(value);
115-
if (deque.size() > count) {
116-
// Now deque has count + 1 elements, so the first
117-
// element in the deque definitely does not belong
118-
// to the last count elements of the source
119-
// sequence. We can emit it now.
120-
observer.onNext(deque.removeFirst());
121-
}
122-
} catch (Throwable ex) {
123-
observer.onError(ex);
124-
subscription.unsubscribe();
125-
} finally {
126-
lock.unlock();
127-
}
75+
deque.offerLast(value);
76+
if (deque.size() > count) {
77+
// Now deque has count + 1 elements, so the first
78+
// element in the deque definitely does not belong
79+
// to the last count elements of the source
80+
// sequence. We can emit it now.
81+
subscriber.onNext(deque.removeFirst());
12882
}
83+
}
12984

130-
}));
131-
}
85+
};
13286
}
13387

13488
/**
13589
* Skip delivering values in the time window before the values.
136-
*
90+
*
13791
* @param <T>
13892
* the result value type
13993
*/

rxjava-core/src/test/java/rx/operators/OperationSkipLastTest.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import static org.mockito.Mockito.never;
2222
import static org.mockito.Mockito.times;
2323
import static org.mockito.Mockito.verify;
24-
import static rx.operators.OperationSkipLast.skipLast;
2524

25+
import java.util.Arrays;
2626
import java.util.concurrent.TimeUnit;
2727

2828
import org.junit.Test;
@@ -38,8 +38,7 @@ public class OperationSkipLastTest {
3838

3939
@Test
4040
public void testSkipLastEmpty() {
41-
Observable<String> w = Observable.empty();
42-
Observable<String> observable = Observable.create(skipLast(w, 2));
41+
Observable<String> observable = Observable.<String>empty().skipLast(2);
4342

4443
@SuppressWarnings("unchecked")
4544
Observer<String> observer = mock(Observer.class);
@@ -51,8 +50,7 @@ public void testSkipLastEmpty() {
5150

5251
@Test
5352
public void testSkipLast1() {
54-
Observable<String> w = Observable.from("one", "two", "three");
55-
Observable<String> observable = Observable.create(skipLast(w, 2));
53+
Observable<String> observable = Observable.from(Arrays.asList("one", "two", "three")).skipLast(2);
5654

5755
@SuppressWarnings("unchecked")
5856
Observer<String> observer = mock(Observer.class);
@@ -67,8 +65,7 @@ public void testSkipLast1() {
6765

6866
@Test
6967
public void testSkipLast2() {
70-
Observable<String> w = Observable.from("one", "two");
71-
Observable<String> observable = Observable.create(skipLast(w, 2));
68+
Observable<String> observable = Observable.from(Arrays.asList("one", "two")).skipLast(2);
7269

7370
@SuppressWarnings("unchecked")
7471
Observer<String> observer = mock(Observer.class);
@@ -81,7 +78,7 @@ public void testSkipLast2() {
8178
@Test
8279
public void testSkipLastWithZeroCount() {
8380
Observable<String> w = Observable.from("one", "two");
84-
Observable<String> observable = Observable.create(skipLast(w, 0));
81+
Observable<String> observable = w.skipLast(0);
8582

8683
@SuppressWarnings("unchecked")
8784
Observer<String> observer = mock(Observer.class);
@@ -94,8 +91,7 @@ public void testSkipLastWithZeroCount() {
9491

9592
@Test
9693
public void testSkipLastWithNull() {
97-
Observable<String> w = Observable.from("one", null, "two");
98-
Observable<String> observable = Observable.create(skipLast(w, 1));
94+
Observable<String> observable = Observable.from(Arrays.asList("one", null, "two")).skipLast(1);
9995

10096
@SuppressWarnings("unchecked")
10197
Observer<String> observer = mock(Observer.class);
@@ -107,18 +103,9 @@ public void testSkipLastWithNull() {
107103
verify(observer, times(1)).onCompleted();
108104
}
109105

110-
@Test
106+
@Test(expected = IndexOutOfBoundsException.class)
111107
public void testSkipLastWithNegativeCount() {
112-
Observable<String> w = Observable.from("one");
113-
Observable<String> observable = Observable.create(skipLast(w, -1));
114-
115-
@SuppressWarnings("unchecked")
116-
Observer<String> observer = mock(Observer.class);
117-
observable.subscribe(observer);
118-
verify(observer, never()).onNext(any(String.class));
119-
verify(observer, times(1)).onError(
120-
any(IndexOutOfBoundsException.class));
121-
verify(observer, never()).onCompleted();
108+
Observable.from("one").skipLast(-1);
122109
}
123110

124111
@Test

0 commit comments

Comments
 (0)