Skip to content

Commit 337efeb

Browse files
committed
Modified to conform Rx.NET
1 parent 40e851b commit 337efeb

File tree

3 files changed

+117
-87
lines changed

3 files changed

+117
-87
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2170,10 +2170,7 @@ public static Observable<Long> timer(long initialDelay, long period, TimeUnit un
21702170
/**
21712171
* Create an Observable which delays the events via another Observable on a per item-basis.
21722172
* <p>
2173-
* Note: onError or onCompleted events are immediately propagated.
2174-
* <p>
2175-
* Note: if the Observable returned by the {@code itemDelay} just completes, that
2176-
* particular source item is not emitted.
2173+
* Note: onError event is immediately propagated.
21772174
*
21782175
* @param <U> the item delay value type (ignored)
21792176
* @param itemDelay function that returns an Observable for each source item which is
@@ -2187,13 +2184,7 @@ public <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDel
21872184
/**
21882185
* Create an Observable which delays the subscription and events via another Observables on a per item-basis.
21892186
* <p>
2190-
* Note: onError or onCompleted events are immediately propagated.
2191-
* <p>
2192-
* Note: if the Observable returned by the {@code itemDelay} just completes, that
2193-
* particular source item is not emitted.
2194-
* <p>
2195-
* Note: if the {@code subscriptionDelay}'s Observable just completes, the created
2196-
* observable will just complete as well.
2187+
* Note: onError event is immediately propagated.
21972188
*
21982189
* @param <U> the subscription delay value type (ignored)
21992190
* @param <V> the item delay value type (ignored)

rxjava-core/src/main/java/rx/operators/OperationDelay.java

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ public DelayViaObservable(Observable<? extends T> source,
120120
public Subscription onSubscribe(Observer<? super T> t1) {
121121
CompositeSubscription csub = new CompositeSubscription();
122122

123+
SerialSubscription sosub = new SerialSubscription();
124+
csub.add(sosub);
125+
SourceObserver<T, V> so = new SourceObserver<T, V>(t1, itemDelay, csub, sosub);
123126
if (subscriptionDelay == null) {
124-
csub.add(source.subscribe(new SourceObserver<T, V>(t1, itemDelay, csub)));
127+
sosub.set(source.subscribe(so));
125128
} else {
126129
Observable<U> subscriptionSource;
127130
try {
@@ -132,66 +135,70 @@ public Subscription onSubscribe(Observer<? super T> t1) {
132135
}
133136
SerialSubscription ssub = new SerialSubscription();
134137
csub.add(ssub);
135-
ssub.set(subscriptionSource.subscribe(new SubscribeDelay<T, U, V>(source, t1, itemDelay, csub, ssub)));
138+
ssub.set(subscriptionSource.subscribe(new SubscribeDelay<T, U, V>(source, so, csub, ssub)));
136139
}
137140

138141
return csub;
139142
}
143+
/** Subscribe delay observer. */
140144
private static final class SubscribeDelay<T, U, V> implements Observer<U> {
141145
final Observable<? extends T> source;
142-
final Observer<? super T> observer;
143-
final Func1<? super T, ? extends Observable<V>> itemDelay;
146+
final SourceObserver<T, V> so;
144147
final CompositeSubscription csub;
145148
final Subscription self;
146-
/** Prevent any onError and onCompleted once the first item was delivered. */
149+
/** Prevent any onError once the first item was delivered. */
147150
boolean subscribed;
148151

149-
public SubscribeDelay(Observable<? extends T> source, Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> itemDelay,
152+
public SubscribeDelay(
153+
Observable<? extends T> source,
154+
SourceObserver<T, V> so,
150155
CompositeSubscription csub, Subscription self) {
151156
this.source = source;
152-
this.observer = observer;
153-
this.itemDelay = itemDelay;
157+
this.so = so;
154158
this.csub = csub;
155159
this.self = self;
156160
}
157161

158162
@Override
159163
public void onNext(U args) {
160-
subscribed = true;
161-
csub.remove(self);
162-
csub.add(source.subscribe(new SourceObserver<T, V>(observer, itemDelay, csub)));
164+
onCompleted();
163165
}
164166

165167
@Override
166168
public void onError(Throwable e) {
167169
if (!subscribed) {
168-
observer.onError(e);
170+
so.observer.onError(e);
169171
csub.unsubscribe();
170172
}
171173
}
172174

173175
@Override
174176
public void onCompleted() {
175-
if (!subscribed) {
176-
observer.onCompleted();
177-
csub.unsubscribe();
178-
}
177+
subscribed = true;
178+
csub.remove(self);
179+
so.self.set(source.subscribe(so));
179180
}
180181
}
181182
/** The source observer. */
182183
private static final class SourceObserver<T, U> implements Observer<T> {
183184
final Observer<? super T> observer;
184185
final Func1<? super T, ? extends Observable<U>> itemDelay;
185186
final CompositeSubscription csub;
187+
final SerialSubscription self;
186188
/** Guard to avoid overlapping events from the various sources. */
187189
final Object guard;
188190
boolean done;
191+
int wip;
189192

190-
public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<U>> itemDelay, CompositeSubscription csub) {
193+
public SourceObserver(Observer<? super T> observer,
194+
Func1<? super T, ? extends Observable<U>> itemDelay,
195+
CompositeSubscription csub,
196+
SerialSubscription self) {
191197
this.observer = observer;
192198
this.itemDelay = itemDelay;
193199
this.csub = csub;
194200
this.guard = new Object();
201+
this.self = self;
195202
}
196203

197204
@Override
@@ -203,47 +210,57 @@ public void onNext(T args) {
203210
onError(t);
204211
return;
205212
}
213+
214+
synchronized (guard) {
215+
wip++;
216+
}
217+
206218
SerialSubscription ssub = new SerialSubscription();
207219
csub.add(ssub);
208-
209220
ssub.set(delayer.subscribe(new DelayObserver<T, U>(args, this, ssub)));
210221
}
211222

212223
@Override
213224
public void onError(Throwable e) {
214225
synchronized (guard) {
215-
if (done) {
216-
return;
217-
}
218-
done = true;
219226
observer.onError(e);
220227
}
221228
csub.unsubscribe();
222229
}
223230

224231
@Override
225232
public void onCompleted() {
233+
boolean b;
226234
synchronized (guard) {
227-
if (done) {
228-
return;
229-
}
230235
done = true;
231-
observer.onCompleted();
236+
b = checkDone();
237+
}
238+
if (b) {
239+
csub.unsubscribe();
240+
} else {
241+
self.unsubscribe();
232242
}
233-
csub.unsubscribe();
234243
}
235244

236-
public void emit(T value, Subscription token) {
245+
void emit(T value, Subscription token) {
246+
boolean b;
237247
synchronized (guard) {
238-
if (done) {
239-
return;
240-
}
241248
observer.onNext(value);
249+
wip--;
250+
b = checkDone();
251+
}
252+
if (b) {
253+
csub.unsubscribe();
254+
} else {
255+
csub.remove(token);
242256
}
243-
remove(token);
244257
}
245-
public void remove(Subscription token) {
246-
csub.remove(token);
258+
boolean checkDone() {
259+
if (done && wip == 0) {
260+
observer.onCompleted();
261+
return true;
262+
}
263+
return false;
247264
}
248265
}
249266
/**
@@ -272,7 +289,7 @@ public void onError(Throwable e) {
272289

273290
@Override
274291
public void onCompleted() {
275-
parent.remove(token);
292+
parent.emit(value, token);
276293
}
277294

278295
}

rxjava-core/src/test/java/rx/operators/OperationDelayTest.java

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -276,47 +276,7 @@ public Observable<Integer> call(Integer t1) {
276276

277277
verify(o, never()).onError(any(Throwable.class));
278278
}
279-
@Test
280-
public void testDelayWithObservableSkipper1() {
281-
PublishSubject<Integer> source = PublishSubject.create();
282-
final List<PublishSubject<Integer>> delays = new ArrayList<PublishSubject<Integer>>();
283-
final int n = 10;
284-
for (int i = 0; i < n; i++) {
285-
PublishSubject<Integer> delay = PublishSubject.create();
286-
delays.add(delay);
287-
}
288-
289-
Func1<Integer, Observable<Integer>> delayFunc = new Func1<Integer, Observable<Integer>>() {
290-
@Override
291-
public Observable<Integer> call(Integer t1) {
292-
return delays.get(t1);
293-
}
294-
};
295-
296-
@SuppressWarnings("unchecked")
297-
Observer<Object> o = mock(Observer.class);
298-
InOrder inOrder = inOrder(o);
299-
300-
source.delay(delayFunc).subscribe(o);
301-
302-
303-
for (int i = 0; i < n; i++) {
304-
source.onNext(i);
305-
if (i % 2 == 0) {
306-
delays.get(i).onNext(i);
307-
inOrder.verify(o).onNext(i);
308-
} else {
309-
delays.get(i).onCompleted();
310-
inOrder.verify(o, never()).onNext(i);
311-
}
312-
}
313-
source.onCompleted();
314-
315-
inOrder.verify(o).onCompleted();
316-
inOrder.verifyNoMoreInteractions();
317-
318-
verify(o, never()).onError(any(Throwable.class));
319-
}
279+
320280
@Test
321281
public void testDelayWithObservableSingleSend1() {
322282
PublishSubject<Integer> source = PublishSubject.create();
@@ -521,4 +481,66 @@ public Observable<Integer> call(Integer t1) {
521481
verify(o, never()).onNext(any());
522482
verify(o, never()).onCompleted();
523483
}
484+
@Test
485+
public void testDelayWithObservableEmptyDelayer() {
486+
PublishSubject<Integer> source = PublishSubject.create();
487+
488+
Func1<Integer, Observable<Integer>> delayFunc = new Func1<Integer, Observable<Integer>>() {
489+
490+
@Override
491+
public Observable<Integer> call(Integer t1) {
492+
return Observable.empty();
493+
}
494+
};
495+
@SuppressWarnings("unchecked")
496+
Observer<Object> o = mock(Observer.class);
497+
InOrder inOrder = inOrder(o);
498+
499+
source.delay(delayFunc).subscribe(o);
500+
501+
source.onNext(1);
502+
source.onCompleted();
503+
504+
inOrder.verify(o).onNext(1);
505+
inOrder.verify(o).onCompleted();
506+
inOrder.verifyNoMoreInteractions();
507+
verify(o, never()).onError(any(Throwable.class));
508+
}
509+
510+
@Test
511+
public void testDelayWithObservableSubscriptionRunCompletion() {
512+
PublishSubject<Integer> source = PublishSubject.create();
513+
final PublishSubject<Integer> sdelay = PublishSubject.create();
514+
final PublishSubject<Integer> delay = PublishSubject.create();
515+
Func0<Observable<Integer>> subFunc = new Func0<Observable<Integer>>() {
516+
@Override
517+
public Observable<Integer> call() {
518+
return sdelay;
519+
}
520+
};
521+
Func1<Integer, Observable<Integer>> delayFunc = new Func1<Integer, Observable<Integer>>() {
522+
523+
@Override
524+
public Observable<Integer> call(Integer t1) {
525+
return delay;
526+
}
527+
};
528+
529+
@SuppressWarnings("unchecked")
530+
Observer<Object> o = mock(Observer.class);
531+
InOrder inOrder = inOrder(o);
532+
533+
source.delay(subFunc, delayFunc).subscribe(o);
534+
535+
source.onNext(1);
536+
sdelay.onCompleted();
537+
538+
source.onNext(2);
539+
delay.onNext(2);
540+
541+
inOrder.verify(o).onNext(2);
542+
inOrder.verifyNoMoreInteractions();
543+
verify(o, never()).onError(any(Throwable.class));
544+
verify(o, never()).onCompleted();
545+
}
524546
}

0 commit comments

Comments
 (0)