Skip to content

Commit fa1b0eb

Browse files
committed
Operators DelaySubscription, TakeLast w/ time, TakeLastBuffer
1 parent a252dca commit fa1b0eb

File tree

5 files changed

+539
-1
lines changed

5 files changed

+539
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2123,6 +2123,31 @@ public Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
21232123
return OperationDelay.delay(this, delay, unit, scheduler);
21242124
}
21252125

2126+
/**
2127+
* Return an Observable which delays the subscription to this Observable sequence
2128+
* by the given amount.
2129+
* @param delay the time to delay the subscription
2130+
* @param unit the time unit
2131+
* @return an Observable which delays the subscription to this Observable sequence
2132+
* by the given amount.
2133+
*/
2134+
public Observable<T> delaySubscription(long delay, TimeUnit unit) {
2135+
return delaySubscription(delay, unit, Schedulers.threadPoolForComputation());
2136+
}
2137+
2138+
/**
2139+
* Return an Observable which delays the subscription to this Observable sequence
2140+
* by the given amount, waiting and subscribing on the given scheduler.
2141+
* @param delay the time to delay the subscription
2142+
* @param unit the time unit
2143+
* @param scheduler the scheduler where the waiting and subscription will happen
2144+
* @return an Observable which delays the subscription to this Observable sequence
2145+
* by the given amount, waiting and subscribing on the given scheduler
2146+
*/
2147+
public Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
2148+
return create(OperationDelay.delaySubscription(this, delay, unit, scheduler));
2149+
}
2150+
21262151
/**
21272152
* Drops items emitted by an Observable that are followed by newer items
21282153
* before a timeout value expires. The timer resets on each emission.
@@ -5289,6 +5314,148 @@ public Observable<T> takeLast(final int count) {
52895314
return create(OperationTakeLast.takeLast(this, count));
52905315
}
52915316

5317+
/**
5318+
* Return an Observable which contains the items from this observable which
5319+
* were emitted not before this completed minus a time window.
5320+
*
5321+
* @param time the length of the time window, relative to the completion of this
5322+
* observable.
5323+
* @param unit the time unit
5324+
* @return an Observable which contains the items from this observable which
5325+
* were emitted not before this completed minus a time window.
5326+
*/
5327+
public Observable<T> takeLast(long time, TimeUnit unit) {
5328+
return takeLast(time, unit, Schedulers.threadPoolForComputation());
5329+
}
5330+
5331+
/**
5332+
* Return an Observable which contains the items from this observable which
5333+
* were emitted not before this completed minus a time window, where the timing
5334+
* information is provided by the given scheduler.
5335+
*
5336+
* @param time the length of the time window, relative to the completion of this
5337+
* observable.
5338+
* @param unit the time unit
5339+
* @param scheduler the scheduler which provides the timestamps for the observed
5340+
* elements
5341+
* @return an Observable which contains the items from this observable which
5342+
* were emitted not before this completed minus a time window, where the timing
5343+
* information is provided by the given scheduler
5344+
*/
5345+
public Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler) {
5346+
return create(OperationTakeLast.takeLast(this, time, unit, scheduler));
5347+
}
5348+
5349+
/**
5350+
* Return an Observable which contains at most count items from this Observable
5351+
* which were emitted not before this completed minus a time window.
5352+
*
5353+
* @param count the maximum number of items to return
5354+
* @param time the length of the time window, relative to the completion of this
5355+
* observable.
5356+
* @param unit the time unit
5357+
* @return Return an Observable which contains at most count items from this Observable
5358+
* which were emitted not before this completed minus a time window.
5359+
*/
5360+
public Observable<T> takeLast(int count, long time, TimeUnit unit) {
5361+
return takeLast(count, time, unit, Schedulers.threadPoolForComputation());
5362+
}
5363+
5364+
/**
5365+
* Return an Observable which contains at most count items from this Observable
5366+
* which were emitted not before this completed minus a time window, where the timing
5367+
* information is provided by the given scheduler.
5368+
*
5369+
* @param count the maximum number of items to return
5370+
* @param time the length of the time window, relative to the completion of this
5371+
* observable.
5372+
* @param unit the time unit
5373+
* @param scheduler the scheduler which provides the timestamps for the observed
5374+
* elements
5375+
* @return Return an Observable which contains at most count items from this Observable
5376+
* which were emitted not before this completed minus a time window, where the timing
5377+
* information is provided by the given scheduler
5378+
*/
5379+
public Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) {
5380+
if (count < 0) {
5381+
throw new IllegalArgumentException("count >= 0 required");
5382+
}
5383+
return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler));
5384+
}
5385+
5386+
/**
5387+
* Return an Observable which emits single List containing the last count
5388+
* elements from this Observable.
5389+
*
5390+
* @param count the number of items to take last
5391+
* @return an Observable which emits single list containing the last count
5392+
* elements from this Observable.
5393+
*/
5394+
public Observable<List<T>> takeLastBuffer(int count) {
5395+
return takeLast(count).toList();
5396+
}
5397+
5398+
/**
5399+
* Return an Observable which emits single List containing items which
5400+
* were emitted not before this completed minus a time window.
5401+
* @param time the length of the time window, relative to the completion of this
5402+
* observable.
5403+
* @param unit the time unit
5404+
* @return an Observable which emits single list containing items which
5405+
* were emitted not before this completed minus a time window
5406+
*/
5407+
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit) {
5408+
return takeLast(time, unit).toList();
5409+
}
5410+
5411+
/**
5412+
* Return an Observable which emits single List containing items which
5413+
* were emitted not before this completed minus a time window, where the timing
5414+
* information is provided by the given scheduler.
5415+
* @param time the length of the time window, relative to the completion of this
5416+
* observable.
5417+
* @param unit the time unit
5418+
* @param scheduler the scheduler which provides the timestamps for the observed
5419+
* elements
5420+
* @return an Observable which emits single list containing items which
5421+
* were emitted not before this completed minus a time window, where the timing
5422+
* information is provided by the given scheduler
5423+
*/
5424+
public Observable<List<T>> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) {
5425+
return takeLast(time, unit, scheduler).toList();
5426+
}
5427+
5428+
/**
5429+
* Return an Observable which emits a single List containing at most count items
5430+
* from this Observable which were emitted not before this completed minus a time window.
5431+
* @param count the number of items to take last
5432+
* @param time the length of the time window, relative to the completion of this
5433+
* observable.
5434+
* @param unit the time unit
5435+
* @return an Observable which emits a single List containing at most count items
5436+
* from this Observable which were emitted not before this completed minus a time window.
5437+
*/
5438+
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit) {
5439+
return takeLast(count, time, unit).toList();
5440+
}
5441+
5442+
/**
5443+
* Return an Observable which emits a single List containing at most count items
5444+
* from this Observable which were emitted not before this completed minus a time window.
5445+
* @param count the number of items to take last
5446+
* @param time the length of the time window, relative to the completion of this
5447+
* observable.
5448+
* @param unit the time unit
5449+
* @param scheduler the scheduler which provides the timestamps for the observed
5450+
* elements
5451+
* @return an Observable which emits a single List containing at most count items
5452+
* from this Observable which were emitted not before this completed minus a time window.
5453+
*/
5454+
public Observable<List<T>> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) {
5455+
return takeLast(count, time, unit, scheduler).toList();
5456+
}
5457+
5458+
52925459
/**
52935460
* Returns an Observable that emits the items from the source Observable
52945461
* only until the <code>other</code> Observable emits an item.

rxjava-core/src/main/java/rx/operators/OperationDelay.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818
import java.util.concurrent.TimeUnit;
1919

2020
import rx.Observable;
21+
import rx.Observable.OnSubscribeFunc;
22+
import rx.Observer;
2123
import rx.Scheduler;
24+
import rx.Subscription;
2225
import rx.observables.ConnectableObservable;
26+
import rx.subscriptions.SerialSubscription;
27+
import rx.util.functions.Action0;
2328
import rx.util.functions.Func1;
2429

2530
public final class OperationDelay {
@@ -40,4 +45,41 @@ public T call(Long ignored) {
4045
});
4146
return Observable.concat(seqs);
4247
}
48+
49+
/**
50+
* Delays the subscription to the source by the given amount, running on the given scheduler.
51+
*/
52+
public static <T> OnSubscribeFunc<T> delaySubscription(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
53+
return new DelaySubscribeFunc<T>(source, time, unit, scheduler);
54+
}
55+
56+
/** Subscribe function which schedules the actual subscription to source on a scheduler at a later time. */
57+
private static final class DelaySubscribeFunc<T> implements OnSubscribeFunc<T> {
58+
final Observable<? extends T> source;
59+
final Scheduler scheduler;
60+
final long time;
61+
final TimeUnit unit;
62+
63+
public DelaySubscribeFunc(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
64+
this.source = source;
65+
this.scheduler = scheduler;
66+
this.time = time;
67+
this.unit = unit;
68+
}
69+
@Override
70+
public Subscription onSubscribe(final Observer<? super T> t1) {
71+
final SerialSubscription ssub = new SerialSubscription();
72+
73+
ssub.setSubscription(scheduler.schedule(new Action0() {
74+
@Override
75+
public void call() {
76+
if (!ssub.isUnsubscribed()) {
77+
ssub.setSubscription(source.subscribe(t1));
78+
}
79+
}
80+
}, time, unit));
81+
82+
return ssub;
83+
}
84+
}
4385
}

rxjava-core/src/main/java/rx/operators/OperationTakeLast.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
import java.util.Deque;
1919
import java.util.LinkedList;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.locks.ReentrantLock;
2122

2223
import rx.Observable;
2324
import rx.Observable.OnSubscribeFunc;
2425
import rx.Observer;
26+
import rx.Scheduler;
2527
import rx.Subscription;
28+
import rx.subscriptions.SingleAssignmentSubscription;
29+
import rx.util.Timestamped;
2630

2731
/**
2832
* Returns an Observable that emits the last <code>count</code> items emitted by the source
@@ -119,4 +123,121 @@ public void onNext(T value) {
119123
}
120124

121125
}
126+
127+
/**
128+
* Returns the items emitted by source whose arrived in the time window
129+
* before the source completed.
130+
*/
131+
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
132+
return new TakeLastTimed<T>(source, -1, time, unit, scheduler);
133+
}
134+
135+
/**
136+
* Returns the items emitted by source whose arrived in the time window
137+
* before the source completed and at most count values.
138+
*/
139+
public static <T> OnSubscribeFunc<T> takeLast(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
140+
return new TakeLastTimed<T>(source, count, time, unit, scheduler);
141+
}
142+
143+
/** Take only the values which appeared some time before the completion. */
144+
static final class TakeLastTimed<T> implements OnSubscribeFunc<T> {
145+
final Observable<? extends T> source;
146+
final long ageMillis;
147+
final Scheduler scheduler;
148+
final int count;
149+
150+
public TakeLastTimed(Observable<? extends T> source, int count, long time, TimeUnit unit, Scheduler scheduler) {
151+
this.source = source;
152+
this.ageMillis = unit.toMillis(time);
153+
this.scheduler = scheduler;
154+
this.count = count;
155+
}
156+
157+
@Override
158+
public Subscription onSubscribe(Observer<? super T> t1) {
159+
SingleAssignmentSubscription sas = new SingleAssignmentSubscription();
160+
sas.set(source.subscribe(new TakeLastTimedObserver<T>(t1, sas, count, ageMillis, scheduler)));
161+
return sas;
162+
}
163+
}
164+
/** Observes source values and keeps the most recent items. */
165+
static final class TakeLastTimedObserver<T> implements Observer<T> {
166+
final Observer<? super T> observer;
167+
final Subscription cancel;
168+
final long ageMillis;
169+
final Scheduler scheduler;
170+
/** -1 indicates unlimited buffer. */
171+
final int count;
172+
173+
final Deque<Timestamped<T>> buffer = new LinkedList<Timestamped<T>>();
174+
175+
public TakeLastTimedObserver(Observer<? super T> observer, Subscription cancel,
176+
int count, long ageMillis, Scheduler scheduler) {
177+
this.observer = observer;
178+
this.cancel = cancel;
179+
this.ageMillis = ageMillis;
180+
this.scheduler = scheduler;
181+
this.count = count;
182+
}
183+
184+
protected void runEvictionPolicy(long now) {
185+
// trim size
186+
while (count >= 0 && buffer.size() > count) {
187+
buffer.pollFirst();
188+
}
189+
// remove old entries
190+
while (!buffer.isEmpty()) {
191+
Timestamped<T> v = buffer.peekFirst();
192+
if (v.getTimestampMillis() < now - ageMillis) {
193+
buffer.pollFirst();
194+
} else {
195+
break;
196+
}
197+
}
198+
}
199+
200+
@Override
201+
public void onNext(T args) {
202+
long t = scheduler.now();
203+
buffer.add(new Timestamped<T>(t, args));
204+
runEvictionPolicy(t);
205+
}
206+
207+
@Override
208+
public void onError(Throwable e) {
209+
buffer.clear();
210+
observer.onError(e);
211+
cancel.unsubscribe();
212+
}
213+
214+
/**
215+
* Emit the contents of the buffer.
216+
* @return true if no exception was raised in the process
217+
*/
218+
protected boolean emitBuffer() {
219+
for (Timestamped<T> v : buffer) {
220+
try {
221+
observer.onNext(v.getValue());
222+
} catch (Throwable t) {
223+
buffer.clear();
224+
observer.onError(t);
225+
return false;
226+
}
227+
}
228+
buffer.clear();
229+
return true;
230+
}
231+
232+
@Override
233+
public void onCompleted() {
234+
runEvictionPolicy(scheduler.now());
235+
236+
if (emitBuffer()) {
237+
observer.onCompleted();
238+
}
239+
cancel.unsubscribe();
240+
}
241+
242+
}
122243
}

0 commit comments

Comments
 (0)