1616package rx .internal .operators ;
1717
1818import java .util .Iterator ;
19- import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
19+ import java .util .concurrent .atomic .AtomicLong ;
2020
21+ import rx .*;
2122import rx .Observable .OnSubscribe ;
22- import rx .Producer ;
23- import rx .Subscriber ;
2423
2524/**
2625 * Converts an {@code Iterable} sequence into an {@code Observable}.
@@ -50,33 +49,54 @@ public void call(final Subscriber<? super T> o) {
5049 o .setProducer (new IterableProducer <T >(o , it ));
5150 }
5251
53- private static final class IterableProducer <T > implements Producer {
52+ private static final class IterableProducer <T > extends AtomicLong implements Producer {
53+ /** */
54+ private static final long serialVersionUID = -8730475647105475802L ;
5455 private final Subscriber <? super T > o ;
5556 private final Iterator <? extends T > it ;
5657
57- private volatile long requested = 0 ;
58- @ SuppressWarnings ("rawtypes" )
59- private static final AtomicLongFieldUpdater <IterableProducer > REQUESTED_UPDATER = AtomicLongFieldUpdater .newUpdater (IterableProducer .class , "requested" );
60-
6158 private IterableProducer (Subscriber <? super T > o , Iterator <? extends T > it ) {
6259 this .o = o ;
6360 this .it = it ;
6461 }
6562
6663 @ Override
6764 public void request (long n ) {
68- if (requested == Long .MAX_VALUE ) {
65+ if (get () == Long .MAX_VALUE ) {
6966 // already started with fast-path
7067 return ;
7168 }
72- if (n == Long .MAX_VALUE && REQUESTED_UPDATER .compareAndSet (this , 0 , Long .MAX_VALUE )) {
73- // fast-path without backpressure
69+ if (n == Long .MAX_VALUE && compareAndSet (0 , Long .MAX_VALUE )) {
70+ fastpath ();
71+ } else
72+ if (n > 0 && BackpressureUtils .getAndAddRequest (this , n ) == 0L ) {
73+ slowpath (n );
74+ }
75+
76+ }
77+
78+ void slowpath (long n ) {
79+ // backpressure is requested
80+ final Subscriber <? super T > o = this .o ;
81+ final Iterator <? extends T > it = this .it ;
7482
83+ long r = n ;
84+ while (true ) {
85+ /*
86+ * This complicated logic is done to avoid touching the
87+ * volatile `requested` value during the loop itself. If
88+ * it is touched during the loop the performance is
89+ * impacted significantly.
90+ */
91+ long numToEmit = r ;
7592 while (true ) {
7693 if (o .isUnsubscribed ()) {
7794 return ;
7895 } else if (it .hasNext ()) {
79- o .onNext (it .next ());
96+ if (--numToEmit >= 0 ) {
97+ o .onNext (it .next ());
98+ } else
99+ break ;
80100 } else if (!o .isUnsubscribed ()) {
81101 o .onCompleted ();
82102 return ;
@@ -85,45 +105,34 @@ public void request(long n) {
85105 return ;
86106 }
87107 }
88- } else if (n > 0 ) {
89- // backpressure is requested
90- long _c = BackpressureUtils .getAndAddRequest (REQUESTED_UPDATER , this , n );
91- if (_c == 0 ) {
92- while (true ) {
93- /*
94- * This complicated logic is done to avoid touching the
95- * volatile `requested` value during the loop itself. If
96- * it is touched during the loop the performance is
97- * impacted significantly.
98- */
99- long r = requested ;
100- long numToEmit = r ;
101- while (true ) {
102- if (o .isUnsubscribed ()) {
103- return ;
104- } else if (it .hasNext ()) {
105- if (--numToEmit >= 0 ) {
106- o .onNext (it .next ());
107- } else
108- break ;
109- } else if (!o .isUnsubscribed ()) {
110- o .onCompleted ();
111- return ;
112- } else {
113- // is unsubscribed
114- return ;
115- }
116- }
117- if (REQUESTED_UPDATER .addAndGet (this , -r ) == 0 ) {
118- // we're done emitting the number requested so
119- // return
120- return ;
121- }
122-
123- }
108+ r = addAndGet (-r );
109+ if (r == 0L ) {
110+ // we're done emitting the number requested so
111+ // return
112+ return ;
124113 }
114+
125115 }
116+ }
126117
118+ void fastpath () {
119+ // fast-path without backpressure
120+ final Subscriber <? super T > o = this .o ;
121+ final Iterator <? extends T > it = this .it ;
122+
123+ while (true ) {
124+ if (o .isUnsubscribed ()) {
125+ return ;
126+ } else if (it .hasNext ()) {
127+ o .onNext (it .next ());
128+ } else if (!o .isUnsubscribed ()) {
129+ o .onCompleted ();
130+ return ;
131+ } else {
132+ // is unsubscribed
133+ return ;
134+ }
135+ }
127136 }
128137 }
129138
0 commit comments