@@ -99,7 +99,7 @@ public void call() {
9999 public void connect (Action1 <? super Subscription > connection ) {
100100 // each time we connect we create a new Subscription
101101 boolean shouldSubscribe = false ;
102-
102+
103103 // subscription is the state of whether we are connected or not
104104 OriginSubscriber <T > origin = requestHandler .state .getOrigin ();
105105 if (origin == null ) {
@@ -113,7 +113,7 @@ public void connect(Action1<? super Subscription> connection) {
113113 connection .call (Subscriptions .create (new Action0 () {
114114 @ Override
115115 public void call () {
116- Subscription s = requestHandler .state .getOrigin ();
116+ OriginSubscriber < T > s = requestHandler .state .getOrigin ();
117117 requestHandler .state .setOrigin (null );
118118 if (s != null ) {
119119 s .unsubscribe ();
@@ -135,9 +135,11 @@ private static class OriginSubscriber<T> extends Subscriber<T> {
135135 private final RequestHandler <T > requestHandler ;
136136 private final AtomicLong originOutstanding = new AtomicLong ();
137137 private final long THRESHOLD = RxRingBuffer .SIZE / 4 ;
138+ private final RxRingBuffer buffer = RxRingBuffer .getSpmcInstance ();
138139
139140 OriginSubscriber (RequestHandler <T > requestHandler ) {
140141 this .requestHandler = requestHandler ;
142+ add (buffer );
141143 }
142144
143145 @ Override
@@ -199,6 +201,8 @@ public void onNext(T t) {
199201 * with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the
200202 * lock is small (such as never emitting data).
201203 *
204+ * This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber.
205+ *
202206 * @param <T>
203207 */
204208 private static class State <T > {
@@ -288,7 +292,7 @@ private long resetAfterSubscriberUpdate() {
288292
289293 private static class RequestHandler <T > {
290294 private final NotificationLite <T > notifier = NotificationLite .instance ();
291- private final RxRingBuffer buffer = RxRingBuffer . getSpmcInstance ();
295+
292296 private final State <T > state = new State <T >();
293297 @ SuppressWarnings ("unused" )
294298 volatile long wip ;
@@ -297,16 +301,24 @@ private static class RequestHandler<T> {
297301
298302 public void requestFromChildSubscriber (Subscriber <? super T > subscriber , Long request ) {
299303 state .requestFromSubscriber (subscriber , request );
300- drainQueue ();
304+ OriginSubscriber <T > originSubscriber = state .getOrigin ();
305+ if (originSubscriber != null ) {
306+ drainQueue (originSubscriber );
307+ }
301308 }
302309
303310 public void emit (Object t ) throws MissingBackpressureException {
311+ OriginSubscriber <T > originSubscriber = state .getOrigin ();
312+ if (originSubscriber == null ) {
313+ // unsubscribed so break ... we are done
314+ return ;
315+ }
304316 if (notifier .isCompleted (t )) {
305- buffer .onCompleted ();
317+ originSubscriber . buffer .onCompleted ();
306318 } else {
307- buffer .onNext (notifier .getValue (t ));
319+ originSubscriber . buffer .onNext (notifier .getValue (t ));
308320 }
309- drainQueue ();
321+ drainQueue (originSubscriber );
310322 }
311323
312324 private void requestMoreAfterEmission (int emitted ) {
@@ -319,7 +331,7 @@ private void requestMoreAfterEmission(int emitted) {
319331 }
320332 }
321333
322- public void drainQueue () {
334+ public void drainQueue (OriginSubscriber < T > originSubscriber ) {
323335 if (WIP .getAndIncrement (this ) == 0 ) {
324336 int emitted = 0 ;
325337 do {
@@ -338,7 +350,7 @@ public void drainQueue() {
338350 if (!shouldEmit ) {
339351 break ;
340352 }
341- Object o = buffer .poll ();
353+ Object o = originSubscriber . buffer .poll ();
342354 if (o == null ) {
343355 // nothing in buffer so increment outstanding back again
344356 state .incrementOutstandingAfterFailedEmit ();
0 commit comments