16
16
17
17
package rx .observables ;
18
18
19
- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
20
19
import java .util .concurrent .atomic .AtomicLong ;
21
20
22
21
import rx .Observable .OnSubscribe ;
@@ -321,14 +320,9 @@ private static class SubscriptionProducer<S, T>
321
320
private final SyncOnSubscribe <S , T > parent ;
322
321
private boolean onNextCalled ;
323
322
private boolean hasTerminated ;
324
-
323
+
325
324
private S state ;
326
325
327
- volatile int isUnsubscribed ;
328
- @ SuppressWarnings ("rawtypes" )
329
- static final AtomicIntegerFieldUpdater <SubscriptionProducer > IS_UNSUBSCRIBED =
330
- AtomicIntegerFieldUpdater .newUpdater (SubscriptionProducer .class , "isUnsubscribed" );
331
-
332
326
private SubscriptionProducer (final Subscriber <? super T > subscriber , SyncOnSubscribe <S , T > parent , S state ) {
333
327
this .actualSubscriber = subscriber ;
334
328
this .parent = parent ;
@@ -337,14 +331,39 @@ private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubsc
337
331
338
332
@ Override
339
333
public boolean isUnsubscribed () {
340
- return isUnsubscribed != 0 ;
334
+ return get () < 0L ;
341
335
}
342
336
343
337
@ Override
344
338
public void unsubscribe () {
345
- IS_UNSUBSCRIBED .compareAndSet (this , 0 , 1 );
346
- if (get () == 0L )
347
- parent .onUnsubscribe (state );
339
+ while (true ) {
340
+ long requestCount = get ();
341
+ if (compareAndSet (0L , -1L )) {
342
+ doUnsubscribe ();
343
+ return ;
344
+ }
345
+ else if (compareAndSet (requestCount , -2L ))
346
+ // the loop is iterating concurrently
347
+ // need to check if requestCount == -1
348
+ // and unsub if so after loop iteration
349
+ return ;
350
+ }
351
+ }
352
+
353
+ private boolean tryUnsubscribe () {
354
+ // only one thread at a time can iterate over request count
355
+ // therefore the requestCount atomic cannot be decrement concurrently here
356
+ // safe to set to -1 atomically (since this check can only be done by 1 thread)
357
+ if (hasTerminated || get () < -1 ) {
358
+ set (-1 );
359
+ doUnsubscribe ();
360
+ return true ;
361
+ }
362
+ return false ;
363
+ }
364
+
365
+ private void doUnsubscribe () {
366
+ parent .onUnsubscribe (state );
348
367
}
349
368
350
369
@ Override
@@ -358,71 +377,60 @@ public void request(long n) {
358
377
}
359
378
}
360
379
361
- void fastpath () {
380
+ private void fastpath () {
362
381
final SyncOnSubscribe <S , T > p = parent ;
363
382
Subscriber <? super T > a = actualSubscriber ;
364
383
365
- if (isUnsubscribed ()) {
366
- p .onUnsubscribe (state );
367
- return ;
368
- }
369
-
370
384
for (;;) {
371
385
try {
372
386
onNextCalled = false ;
373
387
nextIteration (p );
374
388
} catch (Throwable ex ) {
375
- handleThrownError (p , a , state , ex );
389
+ handleThrownError (a , ex );
376
390
return ;
377
391
}
378
- if (hasTerminated || isUnsubscribed ()) {
379
- p .onUnsubscribe (state );
392
+ if (tryUnsubscribe ()) {
380
393
return ;
381
394
}
382
395
}
383
396
}
384
397
385
- private void handleThrownError (final SyncOnSubscribe < S , T > p , Subscriber <? super T > a , S st , Throwable ex ) {
398
+ private void handleThrownError (Subscriber <? super T > a , Throwable ex ) {
386
399
if (hasTerminated ) {
387
400
RxJavaPlugins .getInstance ().getErrorHandler ().handleError (ex );
388
401
} else {
389
402
hasTerminated = true ;
390
403
a .onError (ex );
391
- p . onUnsubscribe ( st );
404
+ unsubscribe ( );
392
405
}
393
406
}
394
407
395
- void slowPath (long n ) {
408
+ private void slowPath (long n ) {
396
409
final SyncOnSubscribe <S , T > p = parent ;
397
410
Subscriber <? super T > a = actualSubscriber ;
398
411
long numRequested = n ;
399
412
for (;;) {
400
- if (isUnsubscribed ()) {
401
- p .onUnsubscribe (state );
402
- return ;
403
- }
404
413
long numRemaining = numRequested ;
405
414
do {
406
415
try {
407
416
onNextCalled = false ;
408
417
nextIteration (p );
409
418
} catch (Throwable ex ) {
410
- handleThrownError (p , a , state , ex );
419
+ handleThrownError (a , ex );
411
420
return ;
412
421
}
413
- if (hasTerminated || isUnsubscribed ()) {
414
- p .onUnsubscribe (state );
422
+ if (tryUnsubscribe ()) {
415
423
return ;
416
424
}
417
425
if (onNextCalled )
418
426
numRemaining --;
419
427
} while (numRemaining != 0L );
420
-
421
428
numRequested = addAndGet (-numRequested );
422
- if (numRequested == 0L ) {
429
+ if (numRequested <= 0L )
423
430
break ;
424
- }
425
431
}
432
+ // catches cases where unsubscribe is called before decrementing atomic request count
433
+ tryUnsubscribe ();
426
434
}
427
435
428
436
private void nextIteration (final SyncOnSubscribe <S , T > parent ) {
0 commit comments