Skip to content

Commit c9be48c

Browse files
Merge pull request #638 from akarnokd/OperationTakeLastwBufferDeferSubscr
Operators DelaySubscription, TakeLast w/ time, TakeLastBuffer
2 parents e748b6c + fa1b0eb commit c9be48c

File tree

5 files changed

+539
-1
lines changed

5 files changed

+539
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,31 @@ public Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
22532253
return OperationDelay.delay(this, delay, unit, scheduler);
22542254
}
22552255

2256+
/**
2257+
* Return an Observable which delays the subscription to this Observable sequence
2258+
* by the given amount.
2259+
* @param delay the time to delay the subscription
2260+
* @param unit the time unit
2261+
* @return an Observable which delays the subscription to this Observable sequence
2262+
* by the given amount.
2263+
*/
2264+
public Observable<T> delaySubscription(long delay, TimeUnit unit) {
2265+
return delaySubscription(delay, unit, Schedulers.threadPoolForComputation());
2266+
}
2267+
2268+
/**
2269+
* Return an Observable which delays the subscription to this Observable sequence
2270+
* by the given amount, waiting and subscribing on the given scheduler.
2271+
* @param delay the time to delay the subscription
2272+
* @param unit the time unit
2273+
* @param scheduler the scheduler where the waiting and subscription will happen
2274+
* @return an Observable which delays the subscription to this Observable sequence
2275+
* by the given amount, waiting and subscribing on the given scheduler
2276+
*/
2277+
public Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
2278+
return create(OperationDelay.delaySubscription(this, delay, unit, scheduler));
2279+
}
2280+
22562281
/**
22572282
* Drops items emitted by an Observable that are followed by newer items
22582283
* before a timeout value expires. The timer resets on each emission.
@@ -5489,6 +5514,148 @@ public Observable<T> takeLast(final int count) {
54895514
return create(OperationTakeLast.takeLast(this, count));
54905515
}
54915516

5517+
/**
5518+
* Return an Observable which contains the items from this observable which
5519+
* were emitted not before this completed minus a time window.
5520+
*
5521+
* @param time the length of the time window, relative to the completion of this
5522+
* observable.
5523+
* @param unit the time unit
5524+
* @return an Observable which contains the items from this observable which
5525+
* were emitted not before this completed minus a time window.
5526+
*/
5527+
public Observable<T> takeLast(long time, TimeUnit unit) {
5528+
return takeLast(time, unit, Schedulers.threadPoolForComputation());
5529+
}
5530+
5531+
/**
5532+
* Return an Observable which contains the items from this observable which
5533+
* were emitted not before this completed minus a time window, where the timing
5534+
* information is provided by the given scheduler.
5535+
*
5536+
* @param time the length of the time window, relative to the completion of this
5537+
* observable.
5538+
* @param unit the time unit
5539+
* @param scheduler the scheduler which provides the timestamps for the observed
5540+
* elements
5541+
* @return an Observable which contains the items from this observable which
5542+
* were emitted not before this completed minus a time window, where the timing
5543+
* information is provided by the given scheduler
5544+
*/
5545+
public Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler) {
5546+
return create(OperationTakeLast.takeLast(this, time, unit, scheduler));
5547+
}
5548+
5549+
/**
5550+
* Return an Observable which contains at most count items from this Observable
5551+
* which were emitted not before this completed minus a time window.
5552+
*
5553+
* @param count the maximum number of items to return
5554+
* @param time the length of the time window, relative to the completion of this
5555+
* observable.
5556+
* @param unit the time unit
5557+
* @return Return an Observable which contains at most count items from this Observable
5558+
* which were emitted not before this completed minus a time window.
5559+
*/
5560+
public Observable<T> takeLast(int count, long time, TimeUnit unit) {
5561+
return takeLast(count, time, unit, Schedulers.threadPoolForComputation());
5562+
}
5563+
5564+
/**
5565+
* Return an Observable which contains at most count items from this Observable
5566+
* which were emitted not before this completed minus a time window, where the timing
5567+
* information is provided by the given scheduler.
5568+
*
5569+
* @param count the maximum number of items to return
5570+
* @param time the length of the time window, relative to the completion of this
5571+
* observable.
5572+
* @param unit the time unit
5573+
* @param scheduler the scheduler which provides the timestamps for the observed
5574+
* elements
5575+
* @return Return an Observable which contains at most count items from this Observable
5576+
* which were emitted not before this completed minus a time window, where the timing
5577+
* information is provided by the given scheduler
5578+
*/
5579+
public Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) {
5580+
if (count < 0) {
5581+
throw new IllegalArgumentException("count >= 0 required");
5582+
}
5583+
return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler));
5584+
}
5585+
5586+
/**
5587+
* Return an Observable which emits single List containing the last count
5588+
* elements from this Observable.
5589+
*
5590+
* @param count the number of items to take last
5591+
* @return an Observable which emits single list containing the last count
5592+
* elements from this Observable.
5593+
*/
5594+
public Observable<List<T>> takeLastBuffer(int count) {
5595+
return takeLast(count).toList();
5596+
}
5597+
5598+
/**
5599+
* Return an Observable which emits single List containing items which
5600+
* were emitted not before this completed minus a time window.
5601+
* @param time the length of the time window, relative to the completion of this
5602+
* observable.
5603+
* @param unit the time unit
5604+
* @return an Observable which emits single list containing items which
5605+
* were emitted not before this completed minus a time window
5606+
*/
5607+
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit) {
5608+
return takeLast(time, unit).toList();
5609+
}
5610+
5611+
/**
5612+
* Return an Observable which emits single List containing items which
5613+
* were emitted not before this completed minus a time window, where the timing
5614+
* information is provided by the given scheduler.
5615+
* @param time the length of the time window, relative to the completion of this
5616+
* observable.
5617+
* @param unit the time unit
5618+
* @param scheduler the scheduler which provides the timestamps for the observed
5619+
* elements
5620+
* @return an Observable which emits single list containing items which
5621+
* were emitted not before this completed minus a time window, where the timing
5622+
* information is provided by the given scheduler
5623+
*/
5624+
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) {
5625+
return takeLast(time, unit, scheduler).toList();
5626+
}
5627+
5628+
/**
5629+
* Return an Observable which emits a single List containing at most count items
5630+
* from this Observable which were emitted not before this completed minus a time window.
5631+
* @param count the number of items to take last
5632+
* @param time the length of the time window, relative to the completion of this
5633+
* observable.
5634+
* @param unit the time unit
5635+
* @return an Observable which emits a single List containing at most count items
5636+
* from this Observable which were emitted not before this completed minus a time window.
5637+
*/
5638+
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit) {
5639+
return takeLast(count, time, unit).toList();
5640+
}
5641+
5642+
/**
5643+
* Return an Observable which emits a single List containing at most count items
5644+
* from this Observable which were emitted not before this completed minus a time window.
5645+
* @param count the number of items to take last
5646+
* @param time the length of the time window, relative to the completion of this
5647+
* observable.
5648+
* @param unit the time unit
5649+
* @param scheduler the scheduler which provides the timestamps for the observed
5650+
* elements
5651+
* @return an Observable which emits a single List containing at most count items
5652+
* from this Observable which were emitted not before this completed minus a time window.
5653+
*/
5654+
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) {
5655+
return takeLast(count, time, unit, scheduler).toList();
5656+
}
5657+
5658+
54925659
/**
54935660
* Returns an Observable that emits the items from the source Observable
54945661
* only until the <code>other</code> Observable emits an item.

rxjava-core/src/main/java/rx/operators/OperationDelay.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818
import java.util.concurrent.TimeUnit;
1919

2020
import rx.Observable;
21+
import rx.Observable.OnSubscribeFunc;
22+
import rx.Observer;
2123
import rx.Scheduler;
24+
import rx.Subscription;
2225
import rx.observables.ConnectableObservable;
26+
import rx.subscriptions.SerialSubscription;
27+
import rx.util.functions.Action0;
2328
import rx.util.functions.Func1;
2429

2530
public final class OperationDelay {
@@ -40,4 +45,41 @@ public T call(Long ignored) {
4045
});
4146
return Observable.concat(seqs);
4247
}
48+
49+
/**
50+
* Delays the subscription to the source by the given amount, running on the given scheduler.
51+
*/
52+
public static <T> OnSubscribeFunc<T> delaySubscription(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
53+
return new DelaySubscribeFunc<T>(source, time, unit, scheduler);
54+
}
55+
56+
/** Subscribe function which schedules the actual subscription to source on a scheduler at a later time. */
57+
private static final class DelaySubscribeFunc<T> implements OnSubscribeFunc<T> {
58+
final Observable<? extends T> source;
59+
final Scheduler scheduler;
60+
final long time;
61+
final TimeUnit unit;
62+
63+
public DelaySubscribeFunc(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
64+
this.source = source;
65+
this.scheduler = scheduler;
66+
this.time = time;
67+
this.unit = unit;
68+
}
69+
@Override
70+
public Subscription onSubscribe(final Observer<? super T> t1) {
71+
final SerialSubscription ssub = new SerialSubscription();
72+
73+
ssub.setSubscription(scheduler.schedule(new Action0() {
74+
@Override
75+
public void call() {
76+
if (!ssub.isUnsubscribed()) {
77+
ssub.setSubscription(source.subscribe(t1));
78+
}
79+
}
80+
}, time, unit));
81+
82+
return ssub;
83+
}
84+
}
4385
}

rxjava-core/src/main/java/rx/operators/OperationTakeLast.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
import java.util.Deque;
1919
import java.util.LinkedList;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.locks.ReentrantLock;
2122

2223
import rx.Observable;
2324
import rx.Observable.OnSubscribeFunc;
2425
import rx.Observer;
26+
import rx.Scheduler;
2527
import rx.Subscription;
28+
import rx.subscriptions.SingleAssignmentSubscription;
29+
import rx.util.Timestamped;
2630

2731
/**
2832
* Returns an Observable that emits the last <code>count</code> items emitted by the source
@@ -119,4 +123,121 @@ public void onNext(T value) {
119123
}
120124

121125
}
126+
127+
/**
128+
* Returns the items emitted by source whose arrived in the time window
129+
* before the source completed.
130+
*/
131+
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
132+
return new TakeLastTimed<T>(source, -1, time, unit, scheduler);
133+
}
134+
135+
/**
136+
* Returns the items emitted by source whose arrived in the time window
137+
* before the source completed and at most count values.
138+
*/
139+
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
140+
return new TakeLastTimed<T>(source, count, time, unit, scheduler);
141+
}
142+
143+
/** Take only the values which appeared some time before the completion. */
144+
static final class TakeLastTimed<T> implements OnSubscribeFunc<T> {
145+
final Observable<? extends T> source;
146+
final long ageMillis;
147+
final Scheduler scheduler;
148+
final int count;
149+
150+
public TakeLastTimed(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
151+
this.source = source;
152+
this.ageMillis = unit.toMillis(time);
153+
this.scheduler = scheduler;
154+
this.count = count;
155+
}
156+
157+
@Override
158+
public Subscription onSubscribe(Observer<? super T> t1) {
159+
SingleAssignmentSubscription sas = new SingleAssignmentSubscription();
160+
sas.set(source.subscribe(new TakeLastTimedObserver<T>(t1, sas, count, ageMillis, scheduler)));
161+
return sas;
162+
}
163+
}
164+
/** Observes source values and keeps the most recent items. */
165+
static final class TakeLastTimedObserver<T> implements Observer<T> {
166+
final Observer<? super T> observer;
167+
final Subscription cancel;
168+
final long ageMillis;
169+
final Scheduler scheduler;
170+
/** -1 indicates unlimited buffer. */
171+
final int count;
172+
173+
final Deque<Timestamped<T>> buffer = new LinkedList<Timestamped<T>>();
174+
175+
public TakeLastTimedObserver(Observer<? super T> observer, Subscription cancel,
176+
int count, long ageMillis, Scheduler scheduler) {
177+
this.observer = observer;
178+
this.cancel = cancel;
179+
this.ageMillis = ageMillis;
180+
this.scheduler = scheduler;
181+
this.count = count;
182+
}
183+
184+
protected void runEvictionPolicy(long now) {
185+
// trim size
186+
while (count >= 0 && buffer.size() > count) {
187+
buffer.pollFirst();
188+
}
189+
// remove old entries
190+
while (!buffer.isEmpty()) {
191+
Timestamped<T> v = buffer.peekFirst();
192+
if (v.getTimestampMillis() < now - ageMillis) {
193+
buffer.pollFirst();
194+
} else {
195+
break;
196+
}
197+
}
198+
}
199+
200+
@Override
201+
public void onNext(T args) {
202+
long t = scheduler.now();
203+
buffer.add(new Timestamped<T>(t, args));
204+
runEvictionPolicy(t);
205+
}
206+
207+
@Override
208+
public void onError(Throwable e) {
209+
buffer.clear();
210+
observer.onError(e);
211+
cancel.unsubscribe();
212+
}
213+
214+
/**
215+
* Emit the contents of the buffer.
216+
* @return true if no exception was raised in the process
217+
*/
218+
protected boolean emitBuffer() {
219+
for (Timestamped<T> v : buffer) {
220+
try {
221+
observer.onNext(v.getValue());
222+
} catch (Throwable t) {
223+
buffer.clear();
224+
observer.onError(t);
225+
return false;
226+
}
227+
}
228+
buffer.clear();
229+
return true;
230+
}
231+
232+
@Override
233+
public void onCompleted() {
234+
runEvictionPolicy(scheduler.now());
235+
236+
if (emitBuffer()) {
237+
observer.onCompleted();
238+
}
239+
cancel.unsubscribe();
240+
}
241+
242+
}
122243
}

0 commit comments

Comments
 (0)