1515 */
1616package rx .operators ;
1717
18+ import java .util .concurrent .ConcurrentLinkedQueue ;
19+ import java .util .concurrent .atomic .AtomicInteger ;
20+
21+ import rx .Notification ;
1822import rx .Observable ;
1923import rx .Observable .OnSubscribeFunc ;
2024import rx .Observer ;
2125import rx .Scheduler ;
2226import rx .Subscription ;
27+ import rx .concurrency .CurrentThreadScheduler ;
2328import rx .concurrency .ImmediateScheduler ;
2429import rx .subscriptions .CompositeSubscription ;
30+ import rx .util .functions .Action0 ;
31+ import rx .util .functions .Action1 ;
2532
2633/**
2734 * Asynchronously notify Observers on the specified Scheduler.
@@ -38,6 +45,9 @@ private static class ObserveOn<T> implements OnSubscribeFunc<T> {
3845 private final Observable <? extends T > source ;
3946 private final Scheduler scheduler ;
4047
48+ final ConcurrentLinkedQueue <Notification <? extends T >> queue = new ConcurrentLinkedQueue <Notification <? extends T >>();
49+ final AtomicInteger counter = new AtomicInteger (0 );
50+
4151 public ObserveOn (Observable <? extends T > source , Scheduler scheduler ) {
4252 this .source = source ;
4353 this .scheduler = scheduler ;
@@ -48,11 +58,55 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
4858 if (scheduler instanceof ImmediateScheduler ) {
4959 // do nothing if we request ImmediateScheduler so we don't invoke overhead
5060 return source .subscribe (observer );
61+ } else if (scheduler instanceof CurrentThreadScheduler ) {
62+ // do nothing if we request CurrentThreadScheduler so we don't invoke overhead
63+ return source .subscribe (observer );
5164 } else {
52- CompositeSubscription s = new CompositeSubscription ();
53- s .add (source .subscribe (new ScheduledObserver <T >(s , observer , scheduler )));
54- return s ;
65+ return observeOn (observer , scheduler );
5566 }
5667 }
68+
69+ public Subscription observeOn (final Observer <? super T > observer , Scheduler scheduler ) {
70+ final CompositeSubscription s = new CompositeSubscription ();
71+
72+ s .add (source .materialize ().subscribe (new Action1 <Notification <? extends T >>() {
73+
74+ @ Override
75+ public void call (Notification <? extends T > e ) {
76+ // this must happen before 'counter' is used to provide synchronization between threads
77+ queue .offer (e );
78+
79+ // we now use counter to atomically determine if we need to start processing or not
80+ // it will be 0 if it's the first notification or the scheduler has finished processing work
81+ // and we need to start doing it again
82+ if (counter .getAndIncrement () == 0 ) {
83+ processQueue (s , observer );
84+ }
85+
86+ }
87+ }));
88+
89+ return s ;
90+ }
91+
92+ private void processQueue (CompositeSubscription s , final Observer <? super T > observer ) {
93+ s .add (scheduler .schedule (new Action1 <Action0 >() {
94+ @ Override
95+ public void call (Action0 self ) {
96+ Notification <? extends T > not = queue .poll ();
97+ if (not != null ) {
98+ not .accept (observer );
99+ }
100+
101+ // decrement count and if we still have work to do
102+ // recursively schedule ourselves to process again
103+ if (counter .decrementAndGet () > 0 ) {
104+ self .call ();
105+ }
106+
107+ }
108+ }));
109+ }
57110 }
111+
58112}
0 commit comments