1515 */
1616package rx .internal .operators ;
1717
18- import java .util .LinkedList ;
19- import java .util .Queue ;
20- import java . util . concurrent . atomic . AtomicIntegerFieldUpdater ;
21- import rx .Observable ;
18+ import java .util .* ;
19+ import java .util .concurrent . atomic .* ;
20+
21+ import rx .* ;
2222import rx .Observable .Operator ;
23- import rx .Subscriber ;
23+ import rx .Observable ;
24+ import rx .exceptions .MissingBackpressureException ;
25+ import rx .internal .util .RxRingBuffer ;
2426import rx .observers .SerializedSubscriber ;
2527import rx .subscriptions .CompositeSubscription ;
2628
@@ -47,34 +49,75 @@ public Subscriber<? super Observable<? extends T>> call(Subscriber<? super T> ch
4749 final CompositeSubscription csub = new CompositeSubscription ();
4850 child .add (csub );
4951
50- return new SourceSubscriber <T >(maxConcurrency , s , csub );
52+ SourceSubscriber <T > ssub = new SourceSubscriber <T >(maxConcurrency , s , csub );
53+ child .setProducer (new MergeMaxConcurrentProducer <T >(ssub ));
54+
55+ return ssub ;
56+ }
57+ /** Routes the requests from downstream to the sourcesubscriber. */
58+ static final class MergeMaxConcurrentProducer <T > implements Producer {
59+ final SourceSubscriber <T > ssub ;
60+ public MergeMaxConcurrentProducer (SourceSubscriber <T > ssub ) {
61+ this .ssub = ssub ;
62+ }
63+ @ Override
64+ public void request (long n ) {
65+ ssub .downstreamRequest (n );
66+ }
5167 }
5268 static final class SourceSubscriber <T > extends Subscriber <Observable <? extends T >> {
69+ final NotificationLite <T > nl = NotificationLite .instance ();
5370 final int maxConcurrency ;
5471 final Subscriber <T > s ;
5572 final CompositeSubscription csub ;
5673 final Object guard ;
5774
5875 volatile int wip ;
5976 @ SuppressWarnings ("rawtypes" )
60- static final AtomicIntegerFieldUpdater <SourceSubscriber > WIP_UPDATER
77+ static final AtomicIntegerFieldUpdater <SourceSubscriber > WIP
6178 = AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "wip" );
79+ volatile int sourceIndex ;
80+ @ SuppressWarnings ("rawtypes" )
81+ static final AtomicIntegerFieldUpdater <SourceSubscriber > SOURCE_INDEX
82+ = AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "sourceIndex" );
6283
6384 /** Guarded by guard. */
6485 int active ;
6586 /** Guarded by guard. */
6687 final Queue <Observable <? extends T >> queue ;
6788
89+ /** Indicates the emitting phase. Guarded by this. */
90+ boolean emitting ;
91+ /** Counts the missed emitting calls. Guarded by this. */
92+ int missedEmitting ;
93+ /** The last buffer index in the round-robin drain scheme. Accessed while emitting == true. */
94+ int lastIndex ;
95+
96+ /** Guarded by itself. */
97+ final List <MergeItemSubscriber > subscribers ;
98+
99+ volatile long requested ;
100+ @ SuppressWarnings ("rawtypes" )
101+ static final AtomicLongFieldUpdater <SourceSubscriber > REQUESTED
102+ = AtomicLongFieldUpdater .newUpdater (SourceSubscriber .class , "requested" );
103+
104+
68105 public SourceSubscriber (int maxConcurrency , Subscriber <T > s , CompositeSubscription csub ) {
69106 super (s );
70107 this .maxConcurrency = maxConcurrency ;
71108 this .s = s ;
72109 this .csub = csub ;
73110 this .guard = new Object ();
74- this .queue = new LinkedList <Observable <? extends T >>();
111+ this .queue = new ArrayDeque <Observable <? extends T >>(maxConcurrency );
112+ this .subscribers = Collections .synchronizedList (new ArrayList <MergeItemSubscriber >());
75113 this .wip = 1 ;
76114 }
77115
116+ @ Override
117+ public void onStart () {
118+ request (maxConcurrency );
119+ }
120+
78121 @ Override
79122 public void onNext (Observable <? extends T > t ) {
80123 synchronized (guard ) {
@@ -94,50 +137,213 @@ void subscribeNext() {
94137 queue .poll ();
95138 }
96139
97- Subscriber <T > itemSub = new Subscriber <T >() {
98- boolean once = true ;
99- @ Override
100- public void onNext (T t ) {
101- s .onNext (t );
102- }
103-
104- @ Override
105- public void onError (Throwable e ) {
106- SourceSubscriber .this .onError (e );
107- }
108-
109- @ Override
110- public void onCompleted () {
111- if (once ) {
112- once = false ;
113- synchronized (guard ) {
114- active --;
115- }
116- csub .remove (this );
117-
118- subscribeNext ();
119-
120- SourceSubscriber .this .onCompleted ();
121- }
122- }
123-
124- };
140+ MergeItemSubscriber itemSub = new MergeItemSubscriber (SOURCE_INDEX .getAndIncrement (this ));
141+ subscribers .add (itemSub );
142+
125143 csub .add (itemSub );
126- WIP_UPDATER .incrementAndGet (this );
144+
145+ WIP .incrementAndGet (this );
127146
128147 t .unsafeSubscribe (itemSub );
148+
149+ request (1 );
129150 }
130151
131152 @ Override
132153 public void onError (Throwable e ) {
133- s .onError (e );
134- unsubscribe ();
154+ Object [] active ;
155+ synchronized (subscribers ) {
156+ active = subscribers .toArray ();
157+ subscribers .clear ();
158+ }
159+
160+ try {
161+ s .onError (e );
162+
163+ unsubscribe ();
164+ } finally {
165+ for (Object o : active ) {
166+ @ SuppressWarnings ("unchecked" )
167+ MergeItemSubscriber a = (MergeItemSubscriber )o ;
168+ a .release ();
169+ }
170+ }
171+
135172 }
136173
137174 @ Override
138175 public void onCompleted () {
139- if (WIP_UPDATER .decrementAndGet (this ) == 0 ) {
140- s .onCompleted ();
176+ WIP .decrementAndGet (this );
177+ drain ();
178+ }
179+
180+ protected void downstreamRequest (long n ) {
181+ for (;;) {
182+ long r = requested ;
183+ long u ;
184+ if (r != Long .MAX_VALUE && n == Long .MAX_VALUE ) {
185+ u = Long .MAX_VALUE ;
186+ } else
187+ if (r + n < 0 ) {
188+ u = Long .MAX_VALUE ;
189+ } else {
190+ u = r + n ;
191+ }
192+ if (REQUESTED .compareAndSet (this , r , u )) {
193+ break ;
194+ }
195+ }
196+ drain ();
197+ }
198+
199+ protected void drain () {
200+ synchronized (this ) {
201+ if (emitting ) {
202+ missedEmitting ++;
203+ return ;
204+ }
205+ emitting = true ;
206+ missedEmitting = 0 ;
207+ }
208+ final List <SourceSubscriber <T >.MergeItemSubscriber > subs = subscribers ;
209+ final Subscriber <T > child = s ;
210+ Object [] active = new Object [subs .size ()];
211+ do {
212+ long r ;
213+
214+ outer :
215+ while ((r = requested ) > 0 ) {
216+ int idx = lastIndex ;
217+ synchronized (subs ) {
218+ if (subs .size () == active .length ) {
219+ active = subs .toArray (active );
220+ } else {
221+ active = subs .toArray ();
222+ }
223+ }
224+
225+ int resumeIndex = 0 ;
226+ int j = 0 ;
227+ for (Object o : active ) {
228+ @ SuppressWarnings ("unchecked" )
229+ MergeItemSubscriber e = (MergeItemSubscriber )o ;
230+ if (e .index == idx ) {
231+ resumeIndex = j ;
232+ break ;
233+ }
234+ j ++;
235+ }
236+ int sumConsumed = 0 ;
237+ for (int i = 0 ; i < active .length ; i ++) {
238+ j = (i + resumeIndex ) % active .length ;
239+
240+ @ SuppressWarnings ("unchecked" )
241+ final MergeItemSubscriber e = (MergeItemSubscriber )active [j ];
242+ final RxRingBuffer b = e .buffer ;
243+ lastIndex = e .index ;
244+
245+ if (!e .once && b .peek () == null ) {
246+ subs .remove (e );
247+
248+ synchronized (guard ) {
249+ this .active --;
250+ }
251+ csub .remove (e );
252+
253+ e .release ();
254+
255+ subscribeNext ();
256+
257+ WIP .decrementAndGet (this );
258+
259+ continue outer ;
260+ }
261+
262+ int consumed = 0 ;
263+ Object v ;
264+ while (r > 0 && (v = b .poll ()) != null ) {
265+ nl .accept (child , v );
266+ if (child .isUnsubscribed ()) {
267+ return ;
268+ }
269+ r --;
270+ consumed ++;
271+ }
272+ if (consumed > 0 ) {
273+ sumConsumed += consumed ;
274+ REQUESTED .addAndGet (this , -consumed );
275+ e .requestMore (consumed );
276+ }
277+ if (r == 0 ) {
278+ break outer ;
279+ }
280+ }
281+ if (sumConsumed == 0 ) {
282+ break ;
283+ }
284+ }
285+
286+ if (active .length == 0 ) {
287+ if (wip == 0 ) {
288+ child .onCompleted ();
289+ return ;
290+ }
291+ }
292+ synchronized (this ) {
293+ if (missedEmitting == 0 ) {
294+ emitting = false ;
295+ break ;
296+ }
297+ missedEmitting = 0 ;
298+ }
299+ } while (true );
300+ }
301+ final class MergeItemSubscriber extends Subscriber <T > {
302+ volatile boolean once = true ;
303+ final int index ;
304+ final RxRingBuffer buffer ;
305+
306+ public MergeItemSubscriber (int index ) {
307+ buffer = RxRingBuffer .getSpmcInstance ();
308+ this .index = index ;
309+ }
310+
311+ @ Override
312+ public void onStart () {
313+ request (RxRingBuffer .SIZE );
314+ }
315+
316+ @ Override
317+ public void onNext (T t ) {
318+ try {
319+ buffer .onNext (t );
320+ } catch (MissingBackpressureException ex ) {
321+ onError (ex );
322+ return ;
323+ }
324+
325+ drain ();
326+ }
327+
328+ @ Override
329+ public void onError (Throwable e ) {
330+ SourceSubscriber .this .onError (e );
331+ }
332+
333+ @ Override
334+ public void onCompleted () {
335+ if (once ) {
336+ once = false ;
337+ drain ();
338+ }
339+ }
340+ /** Request more from upstream. */
341+ void requestMore (long n ) {
342+ request (n );
343+ }
344+ void release () {
345+ // NO-OP for now
346+ buffer .release ();
141347 }
142348 }
143349 }
0 commit comments