Skip to content

Commit d60f0c0

Browse files
committed
Merge pull request #3285 from stealthcode/FixSyncOnSubscribeTest
Added latch to async SyncOnSubscrbeTest
2 parents 1682f64 + 583222d commit d60f0c0

File tree

2 files changed

+156
-132
lines changed

2 files changed

+156
-132
lines changed

src/main/java/rx/observables/SyncOnSubscribe.java

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package rx.observables;
1818

19-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2019
import java.util.concurrent.atomic.AtomicLong;
2120

2221
import rx.Observable.OnSubscribe;
@@ -321,14 +320,9 @@ private static class SubscriptionProducer<S, T>
321320
private final SyncOnSubscribe<S, T> parent;
322321
private boolean onNextCalled;
323322
private boolean hasTerminated;
324-
323+
325324
private S state;
326325

327-
volatile int isUnsubscribed;
328-
@SuppressWarnings("rawtypes")
329-
static final AtomicIntegerFieldUpdater<SubscriptionProducer> IS_UNSUBSCRIBED =
330-
AtomicIntegerFieldUpdater.newUpdater(SubscriptionProducer.class, "isUnsubscribed");
331-
332326
private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubscribe<S, T> parent, S state) {
333327
this.actualSubscriber = subscriber;
334328
this.parent = parent;
@@ -337,14 +331,39 @@ private SubscriptionProducer(final Subscriber<? super T> subscriber, SyncOnSubsc
337331

338332
@Override
339333
public boolean isUnsubscribed() {
340-
return isUnsubscribed != 0;
334+
return get() < 0L;
341335
}
342336

343337
@Override
344338
public void unsubscribe() {
345-
IS_UNSUBSCRIBED.compareAndSet(this, 0, 1);
346-
if (get() == 0L)
347-
parent.onUnsubscribe(state);
339+
while(true) {
340+
long requestCount = get();
341+
if (compareAndSet(0L, -1L)) {
342+
doUnsubscribe();
343+
return;
344+
}
345+
else if (compareAndSet(requestCount, -2L))
346+
// the loop is iterating concurrently
347+
// need to check if requestCount == -1
348+
// and unsub if so after loop iteration
349+
return;
350+
}
351+
}
352+
353+
private boolean tryUnsubscribe() {
354+
// only one thread at a time can iterate over request count
355+
// therefore the requestCount atomic cannot be decrement concurrently here
356+
// safe to set to -1 atomically (since this check can only be done by 1 thread)
357+
if (hasTerminated || get() < -1) {
358+
set(-1);
359+
doUnsubscribe();
360+
return true;
361+
}
362+
return false;
363+
}
364+
365+
private void doUnsubscribe() {
366+
parent.onUnsubscribe(state);
348367
}
349368

350369
@Override
@@ -358,71 +377,60 @@ public void request(long n) {
358377
}
359378
}
360379

361-
void fastpath() {
380+
private void fastpath() {
362381
final SyncOnSubscribe<S, T> p = parent;
363382
Subscriber<? super T> a = actualSubscriber;
364383

365-
if (isUnsubscribed()) {
366-
p.onUnsubscribe(state);
367-
return;
368-
}
369-
370384
for (;;) {
371385
try {
372386
onNextCalled = false;
373387
nextIteration(p);
374388
} catch (Throwable ex) {
375-
handleThrownError(p, a, state, ex);
389+
handleThrownError(a, ex);
376390
return;
377391
}
378-
if (hasTerminated || isUnsubscribed()) {
379-
p.onUnsubscribe(state);
392+
if (tryUnsubscribe()) {
380393
return;
381394
}
382395
}
383396
}
384397

385-
private void handleThrownError(final SyncOnSubscribe<S, T> p, Subscriber<? super T> a, S st, Throwable ex) {
398+
private void handleThrownError(Subscriber<? super T> a, Throwable ex) {
386399
if (hasTerminated) {
387400
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
388401
} else {
389402
hasTerminated = true;
390403
a.onError(ex);
391-
p.onUnsubscribe(st);
404+
unsubscribe();
392405
}
393406
}
394407

395-
void slowPath(long n) {
408+
private void slowPath(long n) {
396409
final SyncOnSubscribe<S, T> p = parent;
397410
Subscriber<? super T> a = actualSubscriber;
398411
long numRequested = n;
399412
for (;;) {
400-
if (isUnsubscribed()) {
401-
p.onUnsubscribe(state);
402-
return;
403-
}
404413
long numRemaining = numRequested;
405414
do {
406415
try {
407416
onNextCalled = false;
408417
nextIteration(p);
409418
} catch (Throwable ex) {
410-
handleThrownError(p, a, state, ex);
419+
handleThrownError(a, ex);
411420
return;
412421
}
413-
if (hasTerminated || isUnsubscribed()) {
414-
p.onUnsubscribe(state);
422+
if (tryUnsubscribe()) {
415423
return;
416424
}
417425
if (onNextCalled)
418426
numRemaining--;
419427
} while (numRemaining != 0L);
420-
421428
numRequested = addAndGet(-numRequested);
422-
if (numRequested == 0L) {
429+
if (numRequested <= 0L)
423430
break;
424-
}
425431
}
432+
// catches cases where unsubscribe is called before decrementing atomic request count
433+
tryUnsubscribe();
426434
}
427435

428436
private void nextIteration(final SyncOnSubscribe<S, T> parent) {

0 commit comments

Comments
 (0)