Skip to content

Commit 201f54d

Browse files
Merge pull request #1446 from benjchristensen/zip-backpressure
Zip with Backpressure Support
2 parents cb75b97 + b068f5d commit 201f54d

File tree

6 files changed

+390
-194
lines changed

6 files changed

+390
-194
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public void onNext(T t) {
4545
child.onNext(false);
4646
child.onCompleted();
4747
unsubscribe();
48+
} else {
49+
// if we drop values we must replace them upstream as downstream won't receive and request more
50+
request(1);
4851
}
4952
}
5053

rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 153 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
18+
import java.util.concurrent.atomic.AtomicLong;
1919
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2020

2121
import rx.Observable;
2222
import rx.Observable.Operator;
2323
import rx.Observer;
24+
import rx.Producer;
2425
import rx.Subscriber;
26+
import rx.exceptions.MissingBackpressureException;
2527
import rx.exceptions.OnErrorThrowable;
2628
import rx.functions.Func2;
2729
import rx.functions.Func3;
@@ -33,6 +35,7 @@
3335
import rx.functions.Func9;
3436
import rx.functions.FuncN;
3537
import rx.functions.Functions;
38+
import rx.internal.util.RxRingBuffer;
3639
import rx.subscriptions.CompositeSubscription;
3740

3841
/**
@@ -48,7 +51,9 @@
4851
* <p>
4952
* The resulting Observable returned from zip will invoke <code>onNext</code> as many times as the
5053
* number of <code>onNext</code> invocations of the source Observable that emits the fewest items.
51-
* @param <R> the result type
54+
*
55+
* @param <R>
56+
* the result type
5257
*/
5358
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
5459
/*
@@ -104,69 +109,106 @@ public OperatorZip(Func9 f) {
104109

105110
@SuppressWarnings("rawtypes")
106111
@Override
107-
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
108-
return new Subscriber<Observable[]>(observer) {
112+
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
113+
final Zip<R> zipper = new Zip<R>(child, zipFunction);
114+
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
115+
child.setProducer(producer);
116+
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);
117+
return subscriber;
118+
}
109119

110-
boolean started = false;
120+
private final class ZipSubscriber extends Subscriber<Observable[]> {
111121

112-
@Override
113-
public void onCompleted() {
114-
if (!started) {
115-
// this means we have not received a valid onNext before termination so we emit the onCompleted
116-
observer.onCompleted();
117-
}
118-
}
122+
final Subscriber<? super R> child;
123+
final Zip<R> zipper;
124+
final ZipProducer<R> producer;
119125

120-
@Override
121-
public void onError(Throwable e) {
122-
observer.onError(e);
126+
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper, ZipProducer<R> producer) {
127+
super(child);
128+
this.child = child;
129+
this.zipper = zipper;
130+
this.producer = producer;
131+
}
132+
133+
boolean started = false;
134+
135+
@Override
136+
public void onCompleted() {
137+
if (!started) {
138+
// this means we have not received a valid onNext before termination so we emit the onCompleted
139+
child.onCompleted();
123140
}
141+
}
124142

125-
@Override
126-
public void onNext(Observable[] observables) {
127-
if (observables == null || observables.length == 0) {
128-
observer.onCompleted();
129-
} else {
130-
started = true;
131-
new Zip<R>(observables, observer, zipFunction).zip();
132-
}
143+
@Override
144+
public void onError(Throwable e) {
145+
child.onError(e);
146+
}
147+
148+
@Override
149+
public void onNext(Observable[] observables) {
150+
if (observables == null || observables.length == 0) {
151+
child.onCompleted();
152+
} else {
153+
started = true;
154+
zipper.start(observables, producer);
133155
}
156+
}
157+
158+
}
159+
160+
private static final class ZipProducer<R> extends AtomicLong implements Producer {
161+
162+
private Zip<R> zipper;
163+
164+
public ZipProducer(Zip<R> zipper) {
165+
this.zipper = zipper;
166+
}
167+
168+
@Override
169+
public void request(long n) {
170+
addAndGet(n);
171+
// try and claim emission if no other threads are doing so
172+
zipper.tick();
173+
}
134174

135-
};
136175
}
137176

138-
static final NotificationLite<Object> on = NotificationLite.instance();
139177
private static final class Zip<R> {
140-
@SuppressWarnings("rawtypes")
141-
final Observable[] os;
142-
final Object[] observers;
143-
final Observer<? super R> observer;
144-
final FuncN<? extends R> zipFunction;
145-
final CompositeSubscription childSubscription = new CompositeSubscription();
178+
private final Observer<? super R> child;
179+
private final FuncN<? extends R> zipFunction;
180+
private final CompositeSubscription childSubscription = new CompositeSubscription();
181+
146182
volatile long counter;
147183
@SuppressWarnings("rawtypes")
148-
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER
149-
= AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");
184+
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");
185+
186+
static final int THRESHOLD = (int) (RxRingBuffer.SIZE * 0.7);
187+
int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block
188+
189+
/* initialized when started in `start` */
190+
private Object[] observers;
191+
private AtomicLong requested;
150192

151193
@SuppressWarnings("rawtypes")
152-
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
153-
this.os = os;
154-
this.observer = observer;
194+
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
195+
this.child = child;
155196
this.zipFunction = zipFunction;
197+
child.add(childSubscription);
198+
}
199+
200+
@SuppressWarnings("unchecked")
201+
public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) {
156202
observers = new Object[os.length];
203+
this.requested = requested;
157204
for (int i = 0; i < os.length; i++) {
158-
InnerObserver io = new InnerObserver();
205+
InnerSubscriber io = new InnerSubscriber();
159206
observers[i] = io;
160207
childSubscription.add(io);
161208
}
162209

163-
observer.add(childSubscription);
164-
}
165-
166-
@SuppressWarnings("unchecked")
167-
public void zip() {
168210
for (int i = 0; i < os.length; i++) {
169-
os[i].unsafeSubscribe((InnerObserver) observers[i]);
211+
os[i].unsafeSubscribe((InnerSubscriber) observers[i]);
170212
}
171213
}
172214

@@ -179,51 +221,64 @@ public void zip() {
179221
*/
180222
@SuppressWarnings("unchecked")
181223
void tick() {
224+
if (observers == null) {
225+
// nothing yet to do (initial request from Producer)
226+
return;
227+
}
182228
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
183229
do {
184-
final Object[] vs = new Object[observers.length];
185-
boolean allHaveValues = true;
186-
for (int i = 0; i < observers.length; i++) {
187-
Object n = ((InnerObserver) observers[i]).items.peek();
188-
189-
if (n == null) {
190-
allHaveValues = false;
191-
continue;
192-
}
230+
// we only emit if requested > 0
231+
if (requested.get() > 0) {
232+
final Object[] vs = new Object[observers.length];
233+
boolean allHaveValues = true;
234+
for (int i = 0; i < observers.length; i++) {
235+
RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items;
236+
Object n = buffer.peek();
237+
238+
if (n == null) {
239+
allHaveValues = false;
240+
continue;
241+
}
193242

194-
switch (on.kind(n)) {
195-
case OnNext:
196-
vs[i] = on.getValue(n);
197-
break;
198-
case OnCompleted:
199-
observer.onCompleted();
200-
// we need to unsubscribe from all children since children are
201-
// independently subscribed
202-
childSubscription.unsubscribe();
203-
return;
204-
default:
205-
// shouldn't get here
206-
}
207-
}
208-
if (allHaveValues) {
209-
try {
210-
// all have something so emit
211-
observer.onNext(zipFunction.call(vs));
212-
} catch (Throwable e) {
213-
observer.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
214-
return;
215-
}
216-
// now remove them
217-
for (Object obj : observers) {
218-
InnerObserver io = (InnerObserver)obj;
219-
io.items.poll();
220-
// eagerly check if the next item on this queue is an onComplete
221-
if (on.isCompleted(io.items.peek())) {
222-
// it is an onComplete so shut down
223-
observer.onCompleted();
224-
// we need to unsubscribe from all children since children are independently subscribed
243+
if (buffer.isCompleted(n)) {
244+
child.onCompleted();
245+
// we need to unsubscribe from all children since children are
246+
// independently subscribed
225247
childSubscription.unsubscribe();
226248
return;
249+
} else {
250+
vs[i] = buffer.getValue(n);
251+
}
252+
}
253+
if (allHaveValues) {
254+
try {
255+
// all have something so emit
256+
child.onNext(zipFunction.call(vs));
257+
// we emitted so decrement the requested counter
258+
requested.decrementAndGet();
259+
emitted++;
260+
} catch (Throwable e) {
261+
child.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
262+
return;
263+
}
264+
// now remove them
265+
for (Object obj : observers) {
266+
RxRingBuffer buffer = ((InnerSubscriber) obj).items;
267+
buffer.poll();
268+
// eagerly check if the next item on this queue is an onComplete
269+
if (buffer.isCompleted(buffer.peek())) {
270+
// it is an onComplete so shut down
271+
child.onCompleted();
272+
// we need to unsubscribe from all children since children are independently subscribed
273+
childSubscription.unsubscribe();
274+
return;
275+
}
276+
}
277+
if (emitted > THRESHOLD) {
278+
for (Object obj : observers) {
279+
((InnerSubscriber) obj).request(emitted);
280+
}
281+
emitted = 0;
227282
}
228283
}
229284
}
@@ -235,27 +290,36 @@ void tick() {
235290
// used to observe each Observable we are zipping together
236291
// it collects all items in an internal queue
237292
@SuppressWarnings("rawtypes")
238-
final class InnerObserver extends Subscriber {
293+
final class InnerSubscriber extends Subscriber {
239294
// Concurrent* since we need to read it from across threads
240-
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
295+
final RxRingBuffer items = RxRingBuffer.getSpmcInstance();
296+
297+
@Override
298+
public void onStart() {
299+
request(RxRingBuffer.SIZE);
300+
}
241301

242302
@SuppressWarnings("unchecked")
243303
@Override
244304
public void onCompleted() {
245-
items.add(on.completed());
305+
items.onCompleted();
246306
tick();
247307
}
248308

249309
@Override
250310
public void onError(Throwable e) {
251-
// emit error and shut down
252-
observer.onError(e);
311+
// emit error immediately and shut down
312+
child.onError(e);
253313
}
254314

255315
@SuppressWarnings("unchecked")
256316
@Override
257317
public void onNext(Object t) {
258-
items.add(on.next(t));
318+
try {
319+
items.onNext(t);
320+
} catch (MissingBackpressureException e) {
321+
onError(e);
322+
}
259323
tick();
260324
}
261325
};

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,27 +276,37 @@ public Object poll() {
276276
Object o;
277277
o = queue.poll();
278278
/*
279-
* benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll()
279+
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
280280
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
281-
* "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState.
281+
* "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState.
282282
*
283283
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
284284
* or needing to enqueue terminalState.
285285
*
286286
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
287287
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
288288
* is currently the way it is.
289-
*
290-
* This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list,
291-
* such as ConcurrentLinkedQueue.
292289
*/
293-
if (o == null && terminalState != null && queue.size() == 0) {
290+
if (o == null && terminalState != null && queue.isEmpty()) {
294291
o = terminalState;
295292
// once emitted we clear so a poll loop will finish
296293
terminalState = null;
297294
}
298295
return o;
299296
}
297+
298+
public Object peek() {
299+
if (queue == null) {
300+
// we are unsubscribed and have released the undelrying queue
301+
return null;
302+
}
303+
Object o;
304+
o = queue.peek();
305+
if (o == null && terminalState != null && queue.isEmpty()) {
306+
o = terminalState;
307+
}
308+
return o;
309+
}
300310

301311
public boolean isCompleted(Object o) {
302312
return on.isCompleted(o);
@@ -306,6 +316,10 @@ public boolean isError(Object o) {
306316
return on.isError(o);
307317
}
308318

319+
public Object getValue(Object o) {
320+
return on.getValue(o);
321+
}
322+
309323
public boolean accept(Object o, Observer child) {
310324
return on.accept(child, o);
311325
}

0 commit comments

Comments
 (0)