Skip to content

Commit 40e851b

Browse files
committed
Delay with subscription and item delaying observables.
1 parent fe438ca commit 40e851b

File tree

3 files changed

+525
-0
lines changed

3 files changed

+525
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2167,6 +2167,50 @@ public static Observable<Long> timer(long initialDelay, long period, TimeUnit un
21672167
return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler));
21682168
}
21692169

2170+
/**
2171+
* Create an Observable which delays the events via another Observable on a per item-basis.
2172+
* <p>
2173+
* Note: onError or onCompleted events are immediately propagated.
2174+
* <p>
2175+
* Note: if the Observable returned by the {@code itemDelay} just completes, that
2176+
* particular source item is not emitted.
2177+
*
2178+
* @param <U> the item delay value type (ignored)
2179+
* @param itemDelay function that returns an Observable for each source item which is
2180+
* then used for delaying that particular item until the Observable
2181+
* fires its first onNext event.
2182+
* @return an Observable which delays the events via another Observable on a per item-basis.
2183+
*/
2184+
public <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) {
2185+
return create(OperationDelay.delay(this, itemDelay));
2186+
}
2187+
/**
2188+
* Create an Observable which delays the subscription and events via another Observables on a per item-basis.
2189+
* <p>
2190+
* Note: onError or onCompleted events are immediately propagated.
2191+
* <p>
2192+
* Note: if the Observable returned by the {@code itemDelay} just completes, that
2193+
* particular source item is not emitted.
2194+
* <p>
2195+
* Note: if the {@code subscriptionDelay}'s Observable just completes, the created
2196+
* observable will just complete as well.
2197+
*
2198+
* @param <U> the subscription delay value type (ignored)
2199+
* @param <V> the item delay value type (ignored)
2200+
* @param subscriptionDelay function that returns an Observable which will trigger
2201+
* the subscription to the source observable once it fires an
2202+
* onNext event.
2203+
* @param itemDelay function that returns an Observable for each source item which is
2204+
* then used for delaying that particular item until the Observable
2205+
* fires its first onNext event.
2206+
* @return an Observable which delays the events via another Observable on a per item-basis.
2207+
*/
2208+
public <U, V> Observable<T> delay(
2209+
Func0<? extends Observable<U>> subscriptionDelay,
2210+
Func1<? super T, ? extends Observable<V>> itemDelay) {
2211+
return create(OperationDelay.delay(this, subscriptionDelay, itemDelay));
2212+
}
2213+
21702214
/**
21712215
* Returns an Observable that emits the items emitted by the source
21722216
* Observable shifted forward in time by a specified delay. Error

rxjava-core/src/main/java/rx/operators/OperationDelay.java

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import rx.Scheduler;
2424
import rx.Subscription;
2525
import rx.observables.ConnectableObservable;
26+
import rx.subscriptions.CompositeSubscription;
2627
import rx.subscriptions.SerialSubscription;
28+
import rx.subscriptions.Subscriptions;
2729
import rx.util.functions.Action0;
30+
import rx.util.functions.Func0;
2831
import rx.util.functions.Func1;
2932

3033
public final class OperationDelay {
@@ -82,4 +85,196 @@ public void call() {
8285
return ssub;
8386
}
8487
}
88+
/**
89+
* Delay the emission of the source items by a per-item observable that fires its first element.
90+
*/
91+
public static <T, U> OnSubscribeFunc<T> delay(Observable<? extends T> source,
92+
Func1<? super T, ? extends Observable<U>> itemDelay) {
93+
return new DelayViaObservable<T, Object, U>(source, null, itemDelay);
94+
}
95+
/**
96+
* Delay the subscription and emission of the source items by a per-item observable that fires its first element.
97+
*/
98+
public static <T, U, V> OnSubscribeFunc<T> delay(Observable<? extends T> source,
99+
Func0<? extends Observable<U>> subscriptionDelay,
100+
Func1<? super T, ? extends Observable<V>> itemDelay) {
101+
return new DelayViaObservable<T, U, V>(source, subscriptionDelay, itemDelay);
102+
}
103+
/**
104+
* Delay the emission of the source items by a per-item observable that fires its first element.
105+
*/
106+
private static final class DelayViaObservable<T, U, V> implements OnSubscribeFunc<T> {
107+
final Observable<? extends T> source;
108+
final Func0<? extends Observable<U>> subscriptionDelay;
109+
final Func1<? super T, ? extends Observable<V>> itemDelay;
110+
111+
public DelayViaObservable(Observable<? extends T> source,
112+
Func0<? extends Observable<U>> subscriptionDelay,
113+
Func1<? super T, ? extends Observable<V>> itemDelay) {
114+
this.source = source;
115+
this.subscriptionDelay = subscriptionDelay;
116+
this.itemDelay = itemDelay;
117+
}
118+
119+
@Override
120+
public Subscription onSubscribe(Observer<? super T> t1) {
121+
CompositeSubscription csub = new CompositeSubscription();
122+
123+
if (subscriptionDelay == null) {
124+
csub.add(source.subscribe(new SourceObserver<T, V>(t1, itemDelay, csub)));
125+
} else {
126+
Observable<U> subscriptionSource;
127+
try {
128+
subscriptionSource = subscriptionDelay.call();
129+
} catch (Throwable t) {
130+
t1.onError(t);
131+
return Subscriptions.empty();
132+
}
133+
SerialSubscription ssub = new SerialSubscription();
134+
csub.add(ssub);
135+
ssub.set(subscriptionSource.subscribe(new SubscribeDelay<T, U, V>(source, t1, itemDelay, csub, ssub)));
136+
}
137+
138+
return csub;
139+
}
140+
private static final class SubscribeDelay<T, U, V> implements Observer<U> {
141+
final Observable<? extends T> source;
142+
final Observer<? super T> observer;
143+
final Func1<? super T, ? extends Observable<V>> itemDelay;
144+
final CompositeSubscription csub;
145+
final Subscription self;
146+
/** Prevent any onError and onCompleted once the first item was delivered. */
147+
boolean subscribed;
148+
149+
public SubscribeDelay(Observable<? extends T> source, Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> itemDelay,
150+
CompositeSubscription csub, Subscription self) {
151+
this.source = source;
152+
this.observer = observer;
153+
this.itemDelay = itemDelay;
154+
this.csub = csub;
155+
this.self = self;
156+
}
157+
158+
@Override
159+
public void onNext(U args) {
160+
subscribed = true;
161+
csub.remove(self);
162+
csub.add(source.subscribe(new SourceObserver<T, V>(observer, itemDelay, csub)));
163+
}
164+
165+
@Override
166+
public void onError(Throwable e) {
167+
if (!subscribed) {
168+
observer.onError(e);
169+
csub.unsubscribe();
170+
}
171+
}
172+
173+
@Override
174+
public void onCompleted() {
175+
if (!subscribed) {
176+
observer.onCompleted();
177+
csub.unsubscribe();
178+
}
179+
}
180+
}
181+
/** The source observer. */
182+
private static final class SourceObserver<T, U> implements Observer<T> {
183+
final Observer<? super T> observer;
184+
final Func1<? super T, ? extends Observable<U>> itemDelay;
185+
final CompositeSubscription csub;
186+
/** Guard to avoid overlapping events from the various sources. */
187+
final Object guard;
188+
boolean done;
189+
190+
public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<U>> itemDelay, CompositeSubscription csub) {
191+
this.observer = observer;
192+
this.itemDelay = itemDelay;
193+
this.csub = csub;
194+
this.guard = new Object();
195+
}
196+
197+
@Override
198+
public void onNext(T args) {
199+
Observable<U> delayer;
200+
try {
201+
delayer = itemDelay.call(args);
202+
} catch (Throwable t) {
203+
onError(t);
204+
return;
205+
}
206+
SerialSubscription ssub = new SerialSubscription();
207+
csub.add(ssub);
208+
209+
ssub.set(delayer.subscribe(new DelayObserver<T, U>(args, this, ssub)));
210+
}
211+
212+
@Override
213+
public void onError(Throwable e) {
214+
synchronized (guard) {
215+
if (done) {
216+
return;
217+
}
218+
done = true;
219+
observer.onError(e);
220+
}
221+
csub.unsubscribe();
222+
}
223+
224+
@Override
225+
public void onCompleted() {
226+
synchronized (guard) {
227+
if (done) {
228+
return;
229+
}
230+
done = true;
231+
observer.onCompleted();
232+
}
233+
csub.unsubscribe();
234+
}
235+
236+
public void emit(T value, Subscription token) {
237+
synchronized (guard) {
238+
if (done) {
239+
return;
240+
}
241+
observer.onNext(value);
242+
}
243+
remove(token);
244+
}
245+
public void remove(Subscription token) {
246+
csub.remove(token);
247+
}
248+
}
249+
/**
250+
* Delay observer.
251+
*/
252+
private static final class DelayObserver<T, U> implements Observer<U> {
253+
final T value;
254+
final SourceObserver<T, U> parent;
255+
final Subscription token;
256+
257+
public DelayObserver(T value, SourceObserver<T, U> parent, Subscription token) {
258+
this.value = value;
259+
this.parent = parent;
260+
this.token = token;
261+
}
262+
263+
@Override
264+
public void onNext(U args) {
265+
parent.emit(value, token);
266+
}
267+
268+
@Override
269+
public void onError(Throwable e) {
270+
parent.onError(e);
271+
}
272+
273+
@Override
274+
public void onCompleted() {
275+
parent.remove(token);
276+
}
277+
278+
}
279+
}
85280
}

0 commit comments

Comments
 (0)