Skip to content

Commit 756e699

Browse files
committed
Only cache the latest items in the time window
1 parent 4e0ce47 commit 756e699

File tree

3 files changed

+25
-28
lines changed

3 files changed

+25
-28
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,7 +1300,7 @@ trait Observable[+T]
13001300
* <p>
13011301
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
13021302
*
1303-
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
1303+
* Note: this action will cache the latest items arriving in the specified time window.
13041304
*
13051305
* @param time the length of the time window
13061306
* @return an Observable that drops those items emitted by the source Observable in a time window before the
@@ -1316,7 +1316,7 @@ trait Observable[+T]
13161316
* <p>
13171317
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
13181318
*
1319-
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
1319+
* Note: this action will cache the latest items arriving in the specified time window.
13201320
*
13211321
* @param time the length of the time window
13221322
* @param scheduler the scheduler used as the time source

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5567,7 +5567,7 @@ public final Observable<T> skipLast(int count) {
55675567
* <p>
55685568
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
55695569
*
5570-
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
5570+
* Note: this action will cache the latest items arriving in the specified time window.
55715571
*
55725572
* @param time
55735573
* the length of the time window
@@ -5588,7 +5588,7 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
55885588
* <p>
55895589
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
55905590
*
5591-
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
5591+
* Note: this action will cache the latest items arriving in the specified time window.
55925592
*
55935593
* @param time
55945594
* the length of the time window

rxjava-core/src/main/java/rx/operators/OperatorSkipLastTimed.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collections;
20-
import java.util.List;
18+
import java.util.ArrayDeque;
19+
import java.util.Deque;
2120
import java.util.concurrent.TimeUnit;
2221

2322
import rx.Observable.Operator;
@@ -42,39 +41,37 @@ public OperatorSkipLastTimed(long time, TimeUnit unit, Scheduler scheduler) {
4241
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
4342
return new Subscriber<T>(subscriber) {
4443

45-
private List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();
44+
private Deque<Timestamped<T>> buffer = new ArrayDeque<Timestamped<T>>();
45+
46+
private void emitItemsOutOfWindow(long now) {
47+
long limit = now - timeInMillis;
48+
while (!buffer.isEmpty()) {
49+
Timestamped<T> v = buffer.getFirst();
50+
if (v.getTimestampMillis() < limit) {
51+
buffer.removeFirst();
52+
subscriber.onNext(v.getValue());
53+
} else {
54+
break;
55+
}
56+
}
57+
}
4658

4759
@Override
4860
public void onNext(T value) {
49-
buffer.add(new Timestamped<T>(scheduler.now(), value));
61+
long now = scheduler.now();
62+
emitItemsOutOfWindow(now);
63+
buffer.offerLast(new Timestamped<T>(now, value));
5064
}
5165

5266
@Override
5367
public void onError(Throwable e) {
54-
buffer = Collections.emptyList();
5568
subscriber.onError(e);
5669
}
5770

5871
@Override
5972
public void onCompleted() {
60-
long limit = scheduler.now() - timeInMillis;
61-
try {
62-
for (Timestamped<T> v : buffer) {
63-
if (v.getTimestampMillis() < limit) {
64-
try {
65-
subscriber.onNext(v.getValue());
66-
} catch (Throwable t) {
67-
subscriber.onError(t);
68-
return;
69-
}
70-
} else {
71-
break;
72-
}
73-
}
74-
subscriber.onCompleted();
75-
} finally {
76-
buffer = Collections.emptyList();
77-
}
73+
emitItemsOutOfWindow(scheduler.now());
74+
subscriber.onCompleted();
7875
}
7976

8077
};

0 commit comments

Comments
 (0)