Skip to content

Commit 90df5da

Browse files
committed
Merge pull request #3630 from akarnokd/ConcatMapEagerInnerNPEFix1x
1.x: ConcatMapEager allow nulls from inner Observables.
2 parents ef1c509 + cef0b91 commit 90df5da

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

src/main/java/rx/internal/operators/OperatorEagerConcatMap.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ void drain() {
166166

167167
final AtomicLong requested = sharedProducer;
168168
final Subscriber<? super R> actualSubscriber = this.actual;
169+
final NotificationLite<R> nl = NotificationLite.instance();
169170

170171
for (;;) {
171172

@@ -200,13 +201,13 @@ void drain() {
200201
long emittedAmount = 0L;
201202
boolean unbounded = requestedAmount == Long.MAX_VALUE;
202203

203-
Queue<R> innerQueue = innerSubscriber.queue;
204+
Queue<Object> innerQueue = innerSubscriber.queue;
204205
boolean innerDone = false;
205206

206207

207208
for (;;) {
208209
outerDone = innerSubscriber.done;
209-
R v = innerQueue.peek();
210+
Object v = innerQueue.peek();
210211
empty = v == null;
211212

212213
if (outerDone) {
@@ -237,7 +238,7 @@ void drain() {
237238
innerQueue.poll();
238239

239240
try {
240-
actualSubscriber.onNext(v);
241+
actualSubscriber.onNext(nl.getValue(v));
241242
} catch (Throwable ex) {
242243
Exceptions.throwOrReport(ex, actualSubscriber, v);
243244
return;
@@ -271,27 +272,29 @@ void drain() {
271272

272273
static final class EagerInnerSubscriber<T> extends Subscriber<T> {
273274
final EagerOuterSubscriber<?, T> parent;
274-
final Queue<T> queue;
275+
final Queue<Object> queue;
276+
final NotificationLite<T> nl;
275277

276278
volatile boolean done;
277279
Throwable error;
278280

279281
public EagerInnerSubscriber(EagerOuterSubscriber<?, T> parent, int bufferSize) {
280282
super();
281283
this.parent = parent;
282-
Queue<T> q;
284+
Queue<Object> q;
283285
if (UnsafeAccess.isUnsafeAvailable()) {
284-
q = new SpscArrayQueue<T>(bufferSize);
286+
q = new SpscArrayQueue<Object>(bufferSize);
285287
} else {
286-
q = new SpscAtomicArrayQueue<T>(bufferSize);
288+
q = new SpscAtomicArrayQueue<Object>(bufferSize);
287289
}
288290
this.queue = q;
291+
this.nl = NotificationLite.instance();
289292
request(bufferSize);
290293
}
291294

292295
@Override
293296
public void onNext(T t) {
294-
queue.offer(t);
297+
queue.offer(nl.next(t));
295298
parent.drain();
296299
}
297300

src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,20 @@ public void call(Integer t) {
394394
ts.assertNotCompleted();
395395
Assert.assertEquals(RxRingBuffer.SIZE, count.get());
396396
}
397+
398+
@Test
399+
public void testInnerNull() {
400+
TestSubscriber<Object> ts = TestSubscriber.create();
401+
402+
Observable.just(1).concatMapEager(new Func1<Integer, Observable<Integer>>() {
403+
@Override
404+
public Observable<Integer> call(Integer t) {
405+
return Observable.just(null);
406+
}
407+
}).subscribe(ts);
408+
409+
ts.assertNoErrors();
410+
ts.assertCompleted();
411+
ts.assertValue(null);
412+
}
397413
}

0 commit comments

Comments
 (0)