Skip to content

Commit 5a93133

Browse files
committed
Scan backpressure and first emission fix
1 parent adfabec commit 5a93133

File tree

2 files changed

+321
-88
lines changed

2 files changed

+321
-88
lines changed

src/main/java/rx/internal/operators/OperatorScan.java

Lines changed: 271 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.Queue;
1919

20+
import rx.*;
2021
import rx.Observable.Operator;
21-
import rx.Producer;
22-
import rx.Subscriber;
23-
import rx.exceptions.Exceptions;
24-
import rx.exceptions.OnErrorThrowable;
25-
import rx.functions.Func0;
26-
import rx.functions.Func2;
22+
import rx.exceptions.*;
23+
import rx.functions.*;
24+
import rx.internal.util.atomic.SpscLinkedAtomicQueue;
25+
import rx.internal.util.unsafe.*;
2726

2827
/**
2928
* Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds
@@ -87,87 +86,290 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
8786

8887
@Override
8988
public Subscriber<? super T> call(final Subscriber<? super R> child) {
90-
return new Subscriber<T>(child) {
91-
private final R initialValue = initialValueFactory.call();
89+
final R initialValue = initialValueFactory.call();
90+
91+
if (initialValue == NO_INITIAL_VALUE) {
92+
return new Subscriber<T>(child) {
93+
boolean once;
94+
R value;
95+
@SuppressWarnings("unchecked")
96+
@Override
97+
public void onNext(T t) {
98+
R v;
99+
if (!once) {
100+
once = true;
101+
v = (R)t;
102+
} else {
103+
v = value;
104+
try {
105+
v = accumulator.call(v, t);
106+
} catch (Throwable e) {
107+
Exceptions.throwIfFatal(e);
108+
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
109+
return;
110+
}
111+
}
112+
value = v;
113+
child.onNext(v);
114+
}
115+
@Override
116+
public void onError(Throwable e) {
117+
child.onError(e);
118+
}
119+
@Override
120+
public void onCompleted() {
121+
child.onCompleted();
122+
}
123+
};
124+
}
125+
126+
final InitialProducer<R> ip = new InitialProducer<R>(initialValue, child);
127+
128+
Subscriber<T> parent = new Subscriber<T>() {
92129
private R value = initialValue;
93-
boolean initialized = false;
94130

95-
@SuppressWarnings("unchecked")
96131
@Override
97132
public void onNext(T currentValue) {
98-
emitInitialValueIfNeeded(child);
99-
100-
if (this.value == NO_INITIAL_VALUE) {
101-
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
102-
this.value = (R) currentValue;
103-
} else {
104-
try {
105-
this.value = accumulator.call(this.value, currentValue);
106-
} catch (Throwable e) {
107-
Exceptions.throwIfFatal(e);
108-
child.onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
109-
return;
110-
}
133+
R v = value;
134+
try {
135+
v = accumulator.call(v, currentValue);
136+
} catch (Throwable e) {
137+
Exceptions.throwIfFatal(e);
138+
onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
139+
return;
111140
}
112-
child.onNext(this.value);
141+
value = v;
142+
ip.onNext(v);
113143
}
114144

115145
@Override
116146
public void onError(Throwable e) {
117-
child.onError(e);
147+
ip.onError(e);
118148
}
119149

120150
@Override
121151
public void onCompleted() {
122-
emitInitialValueIfNeeded(child);
123-
child.onCompleted();
124-
}
125-
126-
private void emitInitialValueIfNeeded(final Subscriber<? super R> child) {
127-
if (!initialized) {
128-
initialized = true;
129-
// we emit first time through if we have an initial value
130-
if (initialValue != NO_INITIAL_VALUE) {
131-
child.onNext(initialValue);
132-
}
133-
}
152+
ip.onCompleted();
134153
}
135154

136-
/**
137-
* We want to adjust the requested value by subtracting 1 if we have an initial value
138-
*/
139155
@Override
140156
public void setProducer(final Producer producer) {
141-
child.setProducer(new Producer() {
142-
143-
final AtomicBoolean once = new AtomicBoolean();
144-
145-
final AtomicBoolean excessive = new AtomicBoolean();
146-
147-
@Override
148-
public void request(long n) {
149-
if (once.compareAndSet(false, true)) {
150-
if (initialValue == NO_INITIAL_VALUE || n == Long.MAX_VALUE) {
151-
producer.request(n);
152-
} else if (n == 1) {
153-
excessive.set(true);
154-
producer.request(1); // request at least 1
155-
} else {
156-
// n != Long.MAX_VALUE && n != 1
157-
producer.request(n - 1);
158-
}
159-
} else {
160-
// pass-thru after first time
161-
if (n > 1 // avoid to request 0
162-
&& excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) {
163-
producer.request(n - 1);
164-
} else {
165-
producer.request(n);
166-
}
157+
ip.setProducer(producer);
158+
}
159+
};
160+
161+
child.add(parent);
162+
child.setProducer(ip);
163+
return parent;
164+
}
165+
166+
static final class InitialProducer<R> implements Producer, Observer<R> {
167+
final Subscriber<? super R> child;
168+
final Queue<Object> queue;
169+
170+
boolean emitting;
171+
/** Missed a terminal event. */
172+
boolean missed;
173+
/** Missed a request. */
174+
long missedRequested;
175+
/** Missed a producer. */
176+
Producer missedProducer;
177+
/** The current requested amount. */
178+
long requested;
179+
/** The current producer. */
180+
Producer producer;
181+
182+
volatile boolean done;
183+
Throwable error;
184+
185+
public InitialProducer(R initialValue, Subscriber<? super R> child) {
186+
this.child = child;
187+
Queue<Object> q;
188+
// TODO switch to the linked-array based queue once available
189+
if (UnsafeAccess.isUnsafeAvailable()) {
190+
q = new SpscLinkedQueue<Object>(); // new SpscUnboundedArrayQueue<R>(8);
191+
} else {
192+
q = new SpscLinkedAtomicQueue<Object>(); // new SpscUnboundedAtomicArrayQueue<R>(8);
193+
}
194+
this.queue = q;
195+
q.offer(initialValue);
196+
}
197+
198+
@Override
199+
public void request(long n) {
200+
if (n < 0L) {
201+
throw new IllegalArgumentException("n >= required but it was " + n);
202+
} else
203+
if (n != 0L) {
204+
synchronized (this) {
205+
if (emitting) {
206+
long mr = missedRequested;
207+
long mu = mr + n;
208+
if (mu < 0L) {
209+
mu = Long.MAX_VALUE;
167210
}
211+
missedRequested = mu;
212+
return;
168213
}
169-
});
214+
emitting = true;
215+
}
216+
217+
long r = requested;
218+
long u = r + n;
219+
if (u < 0L) {
220+
u = Long.MAX_VALUE;
221+
}
222+
requested = u;
223+
224+
Producer p = producer;
225+
if (p != null) {
226+
p.request(n);
227+
}
228+
229+
emitLoop();
170230
}
171-
};
231+
}
232+
233+
@Override
234+
public void onNext(R t) {
235+
queue.offer(NotificationLite.instance().next(t));
236+
emit();
237+
}
238+
239+
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super R> child) {
240+
if (child.isUnsubscribed()) {
241+
return true;
242+
}
243+
if (d) {
244+
Throwable err = error;
245+
if (err != null) {
246+
child.onError(err);
247+
return true;
248+
} else
249+
if (empty) {
250+
child.onCompleted();
251+
return true;
252+
}
253+
}
254+
return false;
255+
}
256+
257+
@Override
258+
public void onError(Throwable e) {
259+
error = e;
260+
done = true;
261+
emit();
262+
}
263+
264+
@Override
265+
public void onCompleted() {
266+
done = true;
267+
emit();
268+
}
269+
270+
public void setProducer(Producer p) {
271+
if (p == null) {
272+
throw new NullPointerException();
273+
}
274+
synchronized (this) {
275+
if (emitting) {
276+
missedProducer = p;
277+
return;
278+
}
279+
emitting = true;
280+
}
281+
producer = p;
282+
long r = requested;
283+
if (r != 0L) {
284+
p.request(r);
285+
}
286+
emitLoop();
287+
}
288+
289+
void emit() {
290+
synchronized (this) {
291+
if (emitting) {
292+
missed = true;
293+
return;
294+
}
295+
emitting = true;
296+
}
297+
emitLoop();
298+
}
299+
300+
void emitLoop() {
301+
final Subscriber<? super R> child = this.child;
302+
final Queue<Object> queue = this.queue;
303+
final NotificationLite<R> nl = NotificationLite.instance();
304+
long r = requested;
305+
for (;;) {
306+
boolean max = r == Long.MAX_VALUE;
307+
boolean d = done;
308+
boolean empty = queue.isEmpty();
309+
if (checkTerminated(d, empty, child)) {
310+
return;
311+
}
312+
while (r != 0L) {
313+
d = done;
314+
Object o = queue.poll();
315+
empty = o == null;
316+
if (checkTerminated(d, empty, child)) {
317+
return;
318+
}
319+
if (empty) {
320+
break;
321+
}
322+
R v = nl.getValue(o);
323+
try {
324+
child.onNext(v);
325+
} catch (Throwable e) {
326+
Exceptions.throwIfFatal(e);
327+
child.onError(OnErrorThrowable.addValueAsLastCause(e, v));
328+
return;
329+
}
330+
if (!max) {
331+
r--;
332+
}
333+
}
334+
if (!max) {
335+
requested = r;
336+
}
337+
338+
Producer p;
339+
long mr;
340+
synchronized (this) {
341+
p = missedProducer;
342+
mr = missedRequested;
343+
if (!missed && p == null && mr == 0L) {
344+
emitting = false;
345+
return;
346+
}
347+
missed = false;
348+
missedProducer = null;
349+
missedRequested = 0L;
350+
}
351+
352+
if (mr != 0L && !max) {
353+
long u = r + mr;
354+
if (u < 0L) {
355+
u = Long.MAX_VALUE;
356+
}
357+
requested = u;
358+
r = u;
359+
}
360+
361+
if (p != null) {
362+
producer = p;
363+
if (r != 0L) {
364+
p.request(r);
365+
}
366+
} else {
367+
p = producer;
368+
if (p != null && mr != 0L) {
369+
p.request(mr);
370+
}
371+
}
372+
}
373+
}
172374
}
173375
}

0 commit comments

Comments
 (0)