1717
1818import java .util .concurrent .ConcurrentLinkedQueue ;
1919import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
20+ import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
21+
2022import rx .Observable ;
2123import rx .Observable .Operator ;
24+ import rx .Producer ;
2225import rx .Subscriber ;
2326import rx .functions .Action0 ;
2427import rx .observers .SerializedSubscriber ;
3033 * <p>
3134 * <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/concat.png" alt="">
3235 *
33- * @param <T> the source and result value type
36+ * @param <T>
37+ * the source and result value type
3438 */
3539public final class OperatorConcat <T > implements Operator <T , Observable <? extends T >> {
3640 @ Override
3741 public Subscriber <? super Observable <? extends T >> call (final Subscriber <? super T > child ) {
3842 final SerializedSubscriber <T > s = new SerializedSubscriber <T >(child );
3943 final SerialSubscription current = new SerialSubscription ();
4044 child .add (current );
41- return new ConcatSubscriber <T >(s , current );
45+ ConcatSubscriber <T > cs = new ConcatSubscriber <T >(s , current );
46+ ConcatProducer <T > cp = new ConcatProducer <T >(cs );
47+ child .setProducer (cp );
48+ return cs ;
4249 }
43-
50+
51+ static final class ConcatProducer <T > implements Producer {
52+ final ConcatSubscriber <T > cs ;
53+
54+ ConcatProducer (ConcatSubscriber <T > cs ) {
55+ this .cs = cs ;
56+ }
57+
58+ @ Override
59+ public void request (long n ) {
60+ cs .requestFromChild (n );
61+ }
62+
63+ }
64+
4465 static final class ConcatSubscriber <T > extends Subscriber <Observable <? extends T >> {
4566 final NotificationLite <Observable <? extends T >> nl = NotificationLite .instance ();
46- private final Subscriber <T > s ;
67+ private final Subscriber <T > child ;
4768 private final SerialSubscription current ;
4869 final ConcurrentLinkedQueue <Object > queue ;
70+
71+ volatile ConcatInnerSubscriber <T > currentSubscriber ;
72+
4973 volatile int wip ;
5074 @ SuppressWarnings ("rawtypes" )
51- static final AtomicIntegerFieldUpdater <ConcatSubscriber > WIP_UPDATER
52- = AtomicIntegerFieldUpdater .newUpdater (ConcatSubscriber .class , "wip" );
53-
75+ static final AtomicIntegerFieldUpdater <ConcatSubscriber > WIP_UPDATER = AtomicIntegerFieldUpdater .newUpdater (ConcatSubscriber .class , "wip" );
76+
77+ // accessed by REQUESTED_UPDATER
78+ private volatile long requested ;
79+ @ SuppressWarnings ("rawtypes" )
80+ private static final AtomicLongFieldUpdater <ConcatSubscriber > REQUESTED_UPDATER = AtomicLongFieldUpdater .newUpdater (ConcatSubscriber .class , "requested" );
81+
5482 public ConcatSubscriber (Subscriber <T > s , SerialSubscription current ) {
5583 super (s );
56- this .s = s ;
84+ this .child = s ;
5785 this .current = current ;
5886 this .queue = new ConcurrentLinkedQueue <Object >();
5987 add (Subscriptions .create (new Action0 () {
@@ -71,20 +99,42 @@ public void onStart() {
7199 request (2 );
72100 }
73101
102+ private void requestFromChild (long n ) {
103+ // we track 'requested' so we know whether we should subscribe the next or not
104+ if (REQUESTED_UPDATER .getAndAdd (this , n ) == 0 ) {
105+ if (currentSubscriber == null && wip > 0 ) {
106+ // this means we may be moving from one subscriber to another after having stopped processing
107+ // so need to kick off the subscribe via this request notification
108+ subscribeNext ();
109+ // return here as we don't want to do the requestMore logic below (which would double request)
110+ return ;
111+ }
112+ }
113+
114+ if (currentSubscriber != null ) {
115+ // otherwise we are just passing it through to the currentSubscriber
116+ currentSubscriber .requestMore (n );
117+ }
118+ }
119+
120+ private void decrementRequested () {
121+ REQUESTED_UPDATER .decrementAndGet (this );
122+ }
123+
74124 @ Override
75125 public void onNext (Observable <? extends T > t ) {
76126 queue .add (nl .next (t ));
77127 if (WIP_UPDATER .getAndIncrement (this ) == 0 ) {
78128 subscribeNext ();
79129 }
80130 }
81-
131+
82132 @ Override
83133 public void onError (Throwable e ) {
84- s .onError (e );
134+ child .onError (e );
85135 unsubscribe ();
86136 }
87-
137+
88138 @ Override
89139 public void onCompleted () {
90140 queue .add (nl .completed ());
@@ -95,39 +145,65 @@ public void onCompleted() {
95145
96146 void completeInner () {
97147 request (1 );
148+ currentSubscriber = null ;
98149 if (WIP_UPDATER .decrementAndGet (this ) > 0 ) {
99150 subscribeNext ();
100151 }
101152 }
102153
103154 void subscribeNext () {
104- Object o = queue .poll ();
105- if (nl .isCompleted (o )) {
106- s .onCompleted ();
107- } else if (o != null ) {
108- Observable <? extends T > obs = nl .getValue (o );
109- Subscriber <T > sourceSub = new Subscriber <T >() {
110-
111- @ Override
112- public void onNext (T t ) {
113- // TODO need to support backpressure here https://github.com/Netflix/RxJava/issues/1480
114- s .onNext (t );
115- }
116-
117- @ Override
118- public void onError (Throwable e ) {
119- ConcatSubscriber .this .onError (e );
120- }
121-
122- @ Override
123- public void onCompleted () {
124- completeInner ();
125- }
126-
127- };
128- current .set (sourceSub );
129- obs .unsafeSubscribe (sourceSub );
155+ if (requested > 0 ) {
156+ Object o = queue .poll ();
157+ if (nl .isCompleted (o )) {
158+ child .onCompleted ();
159+ } else if (o != null ) {
160+ Observable <? extends T > obs = nl .getValue (o );
161+ currentSubscriber = new ConcatInnerSubscriber <T >(this , child , requested );
162+ current .set (currentSubscriber );
163+ obs .unsafeSubscribe (currentSubscriber );
164+ }
165+ } else {
166+ // requested == 0, so we'll peek to see if we are completed, otherwise wait until another request
167+ Object o = queue .peek ();
168+ if (nl .isCompleted (o )) {
169+ child .onCompleted ();
170+ }
130171 }
131172 }
132173 }
174+
175+ static class ConcatInnerSubscriber <T > extends Subscriber <T > {
176+
177+ private final Subscriber <T > child ;
178+ private final ConcatSubscriber <T > parent ;
179+
180+ public ConcatInnerSubscriber (ConcatSubscriber <T > parent , Subscriber <T > child , long initialRequest ) {
181+ this .parent = parent ;
182+ this .child = child ;
183+ request (initialRequest );
184+ }
185+
186+ void requestMore (long n ) {
187+ request (n );
188+ }
189+
190+ @ Override
191+ public void onNext (T t ) {
192+ parent .decrementRequested ();
193+ child .onNext (t );
194+ }
195+
196+ @ Override
197+ public void onError (Throwable e ) {
198+ // terminal error through parent so everything gets cleaned up, including this inner
199+ parent .onError (e );
200+ }
201+
202+ @ Override
203+ public void onCompleted () {
204+ // terminal completion to parent so it continues to the next
205+ parent .completeInner ();
206+ }
207+
208+ };
133209}
0 commit comments