Skip to content

Commit 98ee8bc

Browse files
authored
2.x: fix Completable.concat to use replace (don't dispose old) (#5695)
* 2.x: fix Completable.concat to use replace (don't dispose old) * Remove comments from original issue report
1 parent 283ca57 commit 98ee8bc

File tree

4 files changed

+82
-3
lines changed

4 files changed

+82
-3
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableConcatArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ static final class ConcatInnerObserver extends AtomicInteger implements Completa
5252

5353
@Override
5454
public void onSubscribe(Disposable d) {
55-
sd.update(d);
55+
sd.replace(d);
5656
}
5757

5858
@Override

src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ static final class ConcatInnerObserver extends AtomicInteger implements Completa
6464

6565
@Override
6666
public void onSubscribe(Disposable d) {
67-
sd.update(d);
67+
sd.replace(d);
6868
}
6969

7070
@Override

src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515

1616
import io.reactivex.Completable;
1717
import io.reactivex.Maybe;
18+
import io.reactivex.functions.Action;
19+
import io.reactivex.schedulers.Schedulers;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
1823
import org.junit.Test;
24+
import static org.junit.Assert.*;
1925

2026
public class CompletableAndThenTest {
2127
@Test(expected = NullPointerException.class)
@@ -63,4 +69,39 @@ public void andThenMaybeError() {
6369
.assertError(RuntimeException.class)
6470
.assertErrorMessage("bla");
6571
}
72+
73+
@Test
74+
public void andThenNoInterrupt() throws InterruptedException {
75+
for (int k = 0; k < 100; k++) {
76+
final int count = 10;
77+
final CountDownLatch latch = new CountDownLatch(count);
78+
final boolean[] interrupted = { false };
79+
80+
for (int i = 0; i < count; i++) {
81+
Completable.complete()
82+
.subscribeOn(Schedulers.io())
83+
.observeOn(Schedulers.io())
84+
.andThen(Completable.fromAction(new Action() {
85+
@Override
86+
public void run() throws Exception {
87+
try {
88+
Thread.sleep(30);
89+
} catch (InterruptedException e) {
90+
System.out.println("Interrupted! " + Thread.currentThread());
91+
interrupted[0] = true;
92+
}
93+
}
94+
}))
95+
.subscribe(new Action() {
96+
@Override
97+
public void run() throws Exception {
98+
latch.countDown();
99+
}
100+
});
101+
}
102+
103+
latch.await();
104+
assertFalse("The second Completable was interrupted!", interrupted[0]);
105+
}
106+
}
66107
}

src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.CountDownLatch;
1920

2021
import org.junit.Test;
2122
import org.reactivestreams.*;
2223

2324
import io.reactivex.*;
2425
import io.reactivex.disposables.Disposables;
2526
import io.reactivex.exceptions.*;
26-
import io.reactivex.functions.Function;
27+
import io.reactivex.functions.*;
2728
import io.reactivex.internal.subscriptions.BooleanSubscription;
2829
import io.reactivex.observers.*;
2930
import io.reactivex.plugins.RxJavaPlugins;
@@ -254,4 +255,41 @@ public void run() {
254255
TestHelper.race(r1, r2, Schedulers.single());
255256
}
256257
}
258+
259+
@Test
260+
public void noInterrupt() throws InterruptedException {
261+
for (int k = 0; k < 100; k++) {
262+
final int count = 10;
263+
final CountDownLatch latch = new CountDownLatch(count);
264+
final boolean[] interrupted = { false };
265+
266+
for (int i = 0; i < count; i++) {
267+
Completable c0 = Completable.fromAction(new Action() {
268+
@Override
269+
public void run() throws Exception {
270+
try {
271+
Thread.sleep(30);
272+
} catch (InterruptedException e) {
273+
System.out.println("Interrupted! " + Thread.currentThread());
274+
interrupted[0] = true;
275+
}
276+
}
277+
});
278+
Completable.concat(Arrays.asList(Completable.complete()
279+
.subscribeOn(Schedulers.io())
280+
.observeOn(Schedulers.io()),
281+
c0)
282+
)
283+
.subscribe(new Action() {
284+
@Override
285+
public void run() throws Exception {
286+
latch.countDown();
287+
}
288+
});
289+
}
290+
291+
latch.await();
292+
assertFalse("The second Completable was interrupted!", interrupted[0]);
293+
}
294+
}
257295
}

0 commit comments

Comments
 (0)