Skip to content

Commit aeb879a

Browse files
committed
Handle concurrent unsubscription in drain (avoid NPE).
1 parent 77c5643 commit aeb879a

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

src/main/java/rx/internal/operators/OperatorPublish.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,10 @@ public void drainQueue(OriginSubscriber<T> originSubscriber) {
367367

368368
for (Subscriber<? super T> s : localState.getSubscribers()) {
369369
AtomicLong req = localMap.get(s);
370-
nl.accept(s, o);
371-
req.decrementAndGet();
370+
if (req != null) { // null req indicates a concurrent unsubscription happened
371+
nl.accept(s, o);
372+
req.decrementAndGet();
373+
}
372374
}
373375
emitted++;
374376
}

0 commit comments

Comments
 (0)