Skip to content

Commit af3720e

Browse files
Merge branch 'schedulerf' of git://github.com/mairbek/RxJava into pull-234-merge-ObserveOn
2 parents b66557c + 35f2b2f commit af3720e

File tree

2 files changed

+72
-20
lines changed

2 files changed

+72
-20
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import org.junit.Test;
2222

23+
import org.mockito.InOrder;
2324
import rx.Observable;
2425
import rx.Observer;
2526
import rx.Scheduler;
2627
import rx.Subscription;
2728
import rx.concurrency.Schedulers;
28-
import rx.util.functions.Action0;
2929
import rx.util.functions.Func1;
3030

3131
public class OperationObserveOn {
@@ -60,15 +60,35 @@ public void testObserveOn() {
6060
Observer<Integer> observer = mock(Observer.class);
6161
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);
6262

63-
verify(scheduler, times(4)).schedule(any(Action0.class));
64-
verifyNoMoreInteractions(scheduler);
65-
6663
verify(observer, times(1)).onNext(1);
6764
verify(observer, times(1)).onNext(2);
6865
verify(observer, times(1)).onNext(3);
6966
verify(observer, times(1)).onCompleted();
7067
}
7168

69+
70+
@Test
71+
@SuppressWarnings("unchecked")
72+
public void testOrdering() throws InterruptedException {
73+
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
74+
75+
Observer<String> observer = mock(Observer.class);
76+
77+
InOrder inOrder = inOrder(observer);
78+
79+
obs.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
80+
81+
Thread.sleep(500); // !!! not a true unit test
82+
83+
inOrder.verify(observer, times(1)).onNext("one");
84+
inOrder.verify(observer, times(1)).onNext(null);
85+
inOrder.verify(observer, times(1)).onNext("two");
86+
inOrder.verify(observer, times(1)).onNext("three");
87+
inOrder.verify(observer, times(1)).onNext("four");
88+
inOrder.verify(observer, times(1)).onCompleted();
89+
inOrder.verifyNoMoreInteractions();
90+
}
91+
7292
}
7393

7494
}

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,45 +15,77 @@
1515
*/
1616
package rx.operators;
1717

18+
import rx.Notification;
1819
import rx.Observer;
1920
import rx.Scheduler;
2021
import rx.util.functions.Action0;
2122

23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
2226
/* package */class ScheduledObserver<T> implements Observer<T> {
2327
private final Observer<T> underlying;
2428
private final Scheduler scheduler;
2529

30+
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
31+
private final AtomicInteger counter = new AtomicInteger(0);
32+
2633
public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
2734
this.underlying = underlying;
2835
this.scheduler = scheduler;
2936
}
3037

3138
@Override
3239
public void onCompleted() {
33-
scheduler.schedule(new Action0() {
34-
@Override
35-
public void call() {
36-
underlying.onCompleted();
37-
}
38-
});
40+
enqueue(new Notification<T>());
3941
}
4042

4143
@Override
4244
public void onError(final Exception e) {
43-
scheduler.schedule(new Action0() {
44-
@Override
45-
public void call() {
46-
underlying.onError(e);
47-
}
48-
});
45+
enqueue(new Notification<T>(e));
4946
}
5047

5148
@Override
5249
public void onNext(final T args) {
50+
enqueue(new Notification<T>(args));
51+
}
52+
53+
private void enqueue(Notification<T> notification) {
54+
int count = counter.getAndIncrement();
55+
56+
queue.offer(notification);
57+
58+
if (count == 0) {
59+
processQueue();
60+
}
61+
}
62+
63+
private void processQueue() {
5364
scheduler.schedule(new Action0() {
5465
@Override
5566
public void call() {
56-
underlying.onNext(args);
67+
Notification<T> not = queue.poll();
68+
69+
switch (not.getKind()) {
70+
case OnNext:
71+
underlying.onNext(not.getValue());
72+
break;
73+
case OnError:
74+
underlying.onError(not.getException());
75+
break;
76+
case OnCompleted:
77+
underlying.onCompleted();
78+
break;
79+
default:
80+
throw new IllegalStateException("Unknown kind of notification " + not);
81+
82+
}
83+
84+
int count = counter.decrementAndGet();
85+
if (count > 0) {
86+
scheduler.schedule(this);
87+
}
88+
5789
}
5890
});
5991
}

0 commit comments

Comments
 (0)