1515 */
1616package rx .operators ;
1717
18- import java .util .concurrent .Semaphore ;
18+ import java .util .concurrent .ConcurrentLinkedQueue ;
1919import java .util .concurrent .atomic .AtomicLong ;
2020
2121import rx .Observable .Operator ;
2222import rx .Scheduler ;
2323import rx .Scheduler .Inner ;
2424import rx .Subscriber ;
2525import rx .schedulers .ImmediateScheduler ;
26- import rx .schedulers .TestScheduler ;
2726import rx .schedulers .TrampolineScheduler ;
28- import rx .subscriptions .Subscriptions ;
29- import rx .util .functions .Action0 ;
3027import rx .util .functions .Action1 ;
3128
3229/**
33- * Delivers events on the specified Scheduler.
34- * <p>
35- * This provides backpressure by blocking the incoming onNext when there is already one in the queue.
36- * <p>
37- * This means that at any given time the max number of "onNext" in flight is 3:
38- * -> 1 being delivered on the Scheduler
39- * -> 1 in the queue waiting for the Scheduler
40- * -> 1 blocking on the queue waiting to deliver it
41- *
42- * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler
43- * can loop and have something to do each time around to optimize for avoiding rescheduling when it
44- * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop
45- * thus if the queue is empty it exits and next time something is added it will reschedule.
30+ * Delivers events on the specified Scheduler asynchronously via an unbounded buffer.
4631 *
4732 * <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/observeOn.png">
4833 */
4934public class OperatorObserveOn <T > implements Operator <T , T > {
5035
5136 private final Scheduler scheduler ;
52- private final int bufferSize ;
5337
5438 /**
55- *
5639 * @param scheduler
57- * @param bufferSize
58- * that will be rounded up to the next power of 2
5940 */
60- public OperatorObserveOn (Scheduler scheduler , int bufferSize ) {
61- this .scheduler = scheduler ;
62- this .bufferSize = roundToNextPowerOfTwoIfNecessary (bufferSize );
63- }
64-
6541 public OperatorObserveOn (Scheduler scheduler ) {
66- this (scheduler , 1 );
67- }
68-
69- private static int roundToNextPowerOfTwoIfNecessary (int num ) {
70- if ((num & -num ) == num ) {
71- return num ;
72- } else {
73- int result = 1 ;
74- while (num != 0 )
75- {
76- num >>= 1 ;
77- result <<= 1 ;
78- }
79- return result ;
80- }
42+ this .scheduler = scheduler ;
8143 }
8244
8345 @ Override
@@ -88,19 +50,19 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
8850 } else if (scheduler instanceof TrampolineScheduler ) {
8951 // avoid overhead, execute directly
9052 return child ;
91- } else if (scheduler instanceof TestScheduler ) {
92- // this one will deadlock as it is single-threaded and won't run the scheduled
93- // work until it manually advances, which it won't be able to do as it will block
94- return child ;
9553 } else {
9654 return new ObserveOnSubscriber (child );
9755 }
9856 }
9957
100- private static Object NULL_SENTINEL = new Object ();
101- private static Object COMPLETE_SENTINEL = new Object ();
58+ private static class Sentinel {
10259
103- private static class ErrorSentinel {
60+ }
61+
62+ private static Sentinel NULL_SENTINEL = new Sentinel ();
63+ private static Sentinel COMPLETE_SENTINEL = new Sentinel ();
64+
65+ private static class ErrorSentinel extends Sentinel {
10466 final Throwable e ;
10567
10668 ErrorSentinel (Throwable e ) {
@@ -113,7 +75,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
11375 final Subscriber <? super T > observer ;
11476 private volatile Scheduler .Inner recursiveScheduler ;
11577
116- private final InterruptibleBlockingQueue <Object > queue = new InterruptibleBlockingQueue <Object >(bufferSize );
78+ private final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
11779 final AtomicLong counter = new AtomicLong (0 );
11880
11981 public ObserveOnSubscriber (Subscriber <? super T > observer ) {
@@ -123,62 +85,29 @@ public ObserveOnSubscriber(Subscriber<? super T> observer) {
12385
12486 @ Override
12587 public void onNext (final T t ) {
126- try {
127- // we want to block for natural back-pressure
128- // so that the producer waits for each value to be consumed
129- if (t == null ) {
130- queue .addBlocking (NULL_SENTINEL );
131- } else {
132- queue .addBlocking (t );
133- }
134- schedule ();
135- } catch (InterruptedException e ) {
136- if (!isUnsubscribed ()) {
137- onError (e );
138- }
88+ if (t == null ) {
89+ queue .offer (NULL_SENTINEL );
90+ } else {
91+ queue .offer (t );
13992 }
93+ schedule ();
14094 }
14195
14296 @ Override
14397 public void onCompleted () {
144- try {
145- // we want to block for natural back-pressure
146- // so that the producer waits for each value to be consumed
147- queue .addBlocking (COMPLETE_SENTINEL );
148- schedule ();
149- } catch (InterruptedException e ) {
150- onError (e );
151- }
98+ queue .offer (COMPLETE_SENTINEL );
99+ schedule ();
152100 }
153101
154102 @ Override
155103 public void onError (final Throwable e ) {
156- try {
157- // we want to block for natural back-pressure
158- // so that the producer waits for each value to be consumed
159- queue .addBlocking (new ErrorSentinel (e ));
160- schedule ();
161- } catch (InterruptedException e2 ) {
162- // call directly if we can't schedule
163- observer .onError (e2 );
164- }
104+ queue .offer (new ErrorSentinel (e ));
105+ schedule ();
165106 }
166107
167108 protected void schedule () {
168109 if (counter .getAndIncrement () == 0 ) {
169110 if (recursiveScheduler == null ) {
170- // first time through, register a Subscription
171- // that can interrupt this thread
172- add (Subscriptions .create (new Action0 () {
173-
174- @ Override
175- public void call () {
176- // we have to interrupt the parent thread because
177- // it can be blocked on queue.put
178- queue .interrupt ();
179- }
180-
181- }));
182111 add (scheduler .schedule (new Action1 <Inner >() {
183112
184113 @ Override
@@ -206,12 +135,14 @@ private void pollQueue() {
206135 do {
207136 Object v = queue .poll ();
208137 if (v != null ) {
209- if (v == NULL_SENTINEL ) {
210- observer .onNext (null );
211- } else if (v == COMPLETE_SENTINEL ) {
212- observer .onCompleted ();
213- } else if (v instanceof ErrorSentinel ) {
214- observer .onError (((ErrorSentinel ) v ).e );
138+ if (v instanceof Sentinel ) {
139+ if (v == NULL_SENTINEL ) {
140+ observer .onNext (null );
141+ } else if (v == COMPLETE_SENTINEL ) {
142+ observer .onCompleted ();
143+ } else if (v instanceof ErrorSentinel ) {
144+ observer .onError (((ErrorSentinel ) v ).e );
145+ }
215146 } else {
216147 observer .onNext ((T ) v );
217148 }
@@ -221,110 +152,4 @@ private void pollQueue() {
221152
222153 }
223154
224- /**
225- * Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
226- *
227- * This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
228- * unsubscribe behavior when this queue is being used.
229- *
230- * @param <E>
231- */
232- private static class InterruptibleBlockingQueue <E > {
233-
234- private final Semaphore semaphore ;
235- private volatile boolean interrupted = false ;
236-
237- private final E [] buffer ;
238-
239- private AtomicLong tail = new AtomicLong ();
240- private AtomicLong head = new AtomicLong ();
241- private final int capacity ;
242- private final int mask ;
243-
244- @ SuppressWarnings ("unchecked" )
245- public InterruptibleBlockingQueue (final int size ) {
246- this .semaphore = new Semaphore (size );
247- this .capacity = size ;
248- this .mask = size - 1 ;
249- buffer = (E []) new Object [size ];
250- }
251-
252- /**
253- * Used to unsubscribe and interrupt the producer if blocked in put()
254- */
255- public void interrupt () {
256- interrupted = true ;
257- semaphore .release ();
258- }
259-
260- public void addBlocking (final E e ) throws InterruptedException {
261- if (interrupted ) {
262- throw new InterruptedException ("Interrupted by Unsubscribe" );
263- }
264- semaphore .acquire ();
265- if (interrupted ) {
266- throw new InterruptedException ("Interrupted by Unsubscribe" );
267- }
268- if (e == null ) {
269- throw new IllegalArgumentException ("Can not put null" );
270- }
271-
272- if (offer (e )) {
273- return ;
274- } else {
275- throw new IllegalStateException ("Queue is full" );
276- }
277- }
278-
279- private boolean offer (final E e ) {
280- final long _t = tail .get ();
281- if (_t - head .get () == capacity ) {
282- // queue is full
283- return false ;
284- }
285- int index = (int ) (_t & mask );
286- buffer [index ] = e ;
287- // move the tail forward
288- tail .lazySet (_t + 1 );
289-
290- return true ;
291- }
292-
293- public E poll () {
294- if (interrupted ) {
295- return null ;
296- }
297- final long _h = head .get ();
298- if (tail .get () == _h ) {
299- // nothing available
300- return null ;
301- }
302- int index = (int ) (_h & mask );
303-
304- // fetch the item
305- E v = buffer [index ];
306- // allow GC to happen
307- buffer [index ] = null ;
308- // increment and signal we're done
309- head .lazySet (_h + 1 );
310- if (v != null ) {
311- semaphore .release ();
312- }
313- return v ;
314- }
315-
316- public int size ()
317- {
318- int size ;
319- do
320- {
321- final long currentHead = head .get ();
322- final long currentTail = tail .get ();
323- size = (int ) (currentTail - currentHead );
324- } while (size > buffer .length );
325-
326- return size ;
327- }
328-
329- }
330155}
0 commit comments