Skip to content

Commit 5b2394c

Browse files
authored
1.x: fix Completable.concat to use replace (don't dispose old) (#5696)
* 1.x: fix Completable.concat to use replace (don't dispose old) * Remove original issue comments
1 parent 396b610 commit 5b2394c

File tree

3 files changed

+87
-9
lines changed

3 files changed

+87
-9
lines changed

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import rx.*;
2222
import rx.Completable.OnSubscribe;
23-
import rx.subscriptions.SerialSubscription;
23+
import rx.internal.subscriptions.SequentialSubscription;
2424

2525
public final class CompletableOnSubscribeConcatArray implements OnSubscribe {
2626
final Completable[] sources;
@@ -45,17 +45,17 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
4545

4646
int index;
4747

48-
final SerialSubscription sd;
48+
final SequentialSubscription sd;
4949

5050
public ConcatInnerSubscriber(CompletableSubscriber actual, Completable[] sources) {
5151
this.actual = actual;
5252
this.sources = sources;
53-
this.sd = new SerialSubscription();
53+
this.sd = new SequentialSubscription();
5454
}
5555

5656
@Override
5757
public void onSubscribe(Subscription d) {
58-
sd.set(d);
58+
sd.replace(d);
5959
}
6060

6161
@Override

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
import rx.*;
2323
import rx.Completable.OnSubscribe;
24-
import rx.subscriptions.*;
24+
import rx.internal.subscriptions.SequentialSubscription;
25+
import rx.subscriptions.Subscriptions;
2526

2627
public final class CompletableOnSubscribeConcatIterable implements OnSubscribe {
2728
final Iterable<? extends Completable> sources;
@@ -61,17 +62,17 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
6162
final CompletableSubscriber actual;
6263
final Iterator<? extends Completable> sources;
6364

64-
final SerialSubscription sd;
65+
final SequentialSubscription sd;
6566

6667
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends Completable> sources) {
6768
this.actual = actual;
6869
this.sources = sources;
69-
this.sd = new SerialSubscription();
70+
this.sd = new SequentialSubscription();
7071
}
7172

7273
@Override
7374
public void onSubscribe(Subscription d) {
74-
sd.set(d);
75+
sd.replace(d);
7576
}
7677

7778
@Override

src/test/java/rx/internal/operators/CompletableConcatTest.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package rx.internal.operators;
1818

19-
import java.util.concurrent.TimeUnit;
19+
import static org.junit.Assert.assertFalse;
20+
21+
import java.util.Arrays;
22+
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2024

2125
import org.junit.*;
2226

@@ -58,4 +62,77 @@ public void call() {
5862

5963
Assert.assertEquals(5, calls[0]);
6064
}
65+
66+
@Test
67+
public void andThenNoInterrupt() throws InterruptedException {
68+
for (int k = 0; k < 100; k++) {
69+
final int count = 10;
70+
final CountDownLatch latch = new CountDownLatch(count);
71+
final AtomicBoolean interrupted = new AtomicBoolean();
72+
73+
for (int i = 0; i < count; i++) {
74+
Completable.complete()
75+
.subscribeOn(Schedulers.io())
76+
.observeOn(Schedulers.io())
77+
.andThen(Completable.fromAction(new Action0() {
78+
@Override
79+
public void call() {
80+
try {
81+
Thread.sleep(30);
82+
} catch (InterruptedException e) {
83+
System.out.println("Interrupted! " + Thread.currentThread());
84+
interrupted.set(true);
85+
}
86+
}
87+
}))
88+
.subscribe(new Action0() {
89+
@Override
90+
public void call() {
91+
latch.countDown();
92+
}
93+
});
94+
}
95+
96+
latch.await();
97+
assertFalse("The second Completable was interrupted!", interrupted.get());
98+
}
99+
}
100+
101+
@Test
102+
public void noInterrupt() throws InterruptedException {
103+
for (int k = 0; k < 100; k++) {
104+
final int count = 10;
105+
final CountDownLatch latch = new CountDownLatch(count);
106+
final AtomicBoolean interrupted = new AtomicBoolean();
107+
108+
for (int i = 0; i < count; i++) {
109+
Completable c0 = Completable.fromAction(new Action0() {
110+
@Override
111+
public void call() {
112+
try {
113+
Thread.sleep(30);
114+
} catch (InterruptedException e) {
115+
System.out.println("Interrupted! " + Thread.currentThread());
116+
interrupted.set(true);
117+
}
118+
}
119+
});
120+
Completable.concat(Arrays.asList(Completable.complete()
121+
.subscribeOn(Schedulers.io())
122+
.observeOn(Schedulers.io()),
123+
c0)
124+
)
125+
.subscribe(new Action0() {
126+
@Override
127+
public void call() {
128+
latch.countDown();
129+
}
130+
});
131+
}
132+
133+
latch.await();
134+
assertFalse("The second Completable was interrupted!", interrupted.get());
135+
}
136+
}
137+
61138
}

0 commit comments

Comments
 (0)