Skip to content

Commit 52dee7d

Browse files
authored
2.x: Fix concurrent clear in observeOn while output-fused (#6710)
1 parent 3cb610f commit 52dee7d

File tree

4 files changed

+71
-3
lines changed

4 files changed

+71
-3
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public final void cancel() {
154154
upstream.cancel();
155155
worker.dispose();
156156

157-
if (getAndIncrement() == 0) {
157+
if (!outputFused && getAndIncrement() == 0) {
158158
queue.clear();
159159
}
160160
}

src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void dispose() {
145145
disposed = true;
146146
upstream.dispose();
147147
worker.dispose();
148-
if (getAndIncrement() == 0) {
148+
if (!outputFused && getAndIncrement() == 0) {
149149
queue.clear();
150150
}
151151
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -34,6 +35,7 @@
3435
import io.reactivex.internal.operators.flowable.FlowableObserveOn.BaseObserveOnSubscriber;
3536
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3637
import io.reactivex.internal.subscriptions.BooleanSubscription;
38+
import io.reactivex.observers.TestObserver;
3739
import io.reactivex.plugins.RxJavaPlugins;
3840
import io.reactivex.processors.*;
3941
import io.reactivex.schedulers.*;
@@ -1940,4 +1942,37 @@ public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
19401942

19411943
assertEquals(1, s.disposedCount.get());
19421944
}
1945+
1946+
@Test
1947+
public void fusedNoConcurrentCleanDueToCancel() {
1948+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
1949+
List<Throwable> errors = TestHelper.trackPluginErrors();
1950+
try {
1951+
final UnicastProcessor<Integer> up = UnicastProcessor.create();
1952+
1953+
TestObserver<Integer> to = up.hide()
1954+
.observeOn(Schedulers.io())
1955+
.observeOn(Schedulers.single())
1956+
.unsubscribeOn(Schedulers.computation())
1957+
.firstOrError()
1958+
.test();
1959+
1960+
for (int i = 0; up.hasSubscribers() && i < 10000; i++) {
1961+
up.onNext(i);
1962+
}
1963+
1964+
to
1965+
.awaitDone(5, TimeUnit.SECONDS)
1966+
;
1967+
1968+
if (!errors.isEmpty()) {
1969+
throw new CompositeException(errors);
1970+
}
1971+
1972+
to.assertResult(0);
1973+
} finally {
1974+
RxJavaPlugins.reset();
1975+
}
1976+
}
1977+
}
19431978
}

src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -28,7 +29,7 @@
2829
import io.reactivex.Observer;
2930
import io.reactivex.annotations.Nullable;
3031
import io.reactivex.disposables.*;
31-
import io.reactivex.exceptions.TestException;
32+
import io.reactivex.exceptions.*;
3233
import io.reactivex.functions.*;
3334
import io.reactivex.internal.fuseable.*;
3435
import io.reactivex.internal.operators.flowable.FlowableObserveOnTest.DisposeTrackingScheduler;
@@ -813,4 +814,36 @@ public void workerNotDisposedPrematurelyNormalInAsyncOut() {
813814
assertEquals(1, s.disposedCount.get());
814815
}
815816

817+
@Test
818+
public void fusedNoConcurrentCleanDueToCancel() {
819+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
820+
List<Throwable> errors = TestHelper.trackPluginErrors();
821+
try {
822+
final UnicastSubject<Integer> us = UnicastSubject.create();
823+
824+
TestObserver<Integer> to = us.hide()
825+
.observeOn(Schedulers.io())
826+
.observeOn(Schedulers.single())
827+
.unsubscribeOn(Schedulers.computation())
828+
.firstOrError()
829+
.test();
830+
831+
for (int i = 0; us.hasObservers() && i < 10000; i++) {
832+
us.onNext(i);
833+
}
834+
835+
to
836+
.awaitDone(5, TimeUnit.SECONDS)
837+
;
838+
839+
if (!errors.isEmpty()) {
840+
throw new CompositeException(errors);
841+
}
842+
843+
to.assertResult(0);
844+
} finally {
845+
RxJavaPlugins.reset();
846+
}
847+
}
848+
}
816849
}

0 commit comments

Comments
 (0)