Skip to content

Commit 1a774e6

Browse files
authored
2.x: Fix cancel/dispose upon upstream switch for some operators (#6258)
* 2.x: Fix cancel/dispose upon upstream switch for some operators * Restore star imports
1 parent 1406637 commit 1a774e6

30 files changed

+920
-98
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatArray.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ static final class ConcatArraySubscriber<T> extends SubscriptionArbiter implemen
5959
long produced;
6060

6161
ConcatArraySubscriber(Publisher<? extends T>[] sources, boolean delayError, Subscriber<? super T> downstream) {
62+
super(false);
6263
this.downstream = downstream;
6364
this.sources = sources;
6465
this.delayError = delayError;

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ static final class ConcatMapInner<R>
571571
long produced;
572572

573573
ConcatMapInner(ConcatMapSupport<R> parent) {
574+
super(false);
574575
this.parent = parent;
575576
}
576577

src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
*/
1313
package io.reactivex.internal.operators.flowable;
1414

15+
import java.util.concurrent.atomic.*;
16+
1517
import org.reactivestreams.*;
1618

1719
import io.reactivex.*;
18-
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
20+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
1921
import io.reactivex.plugins.RxJavaPlugins;
2022

2123
/**
@@ -35,94 +37,105 @@ public FlowableDelaySubscriptionOther(Publisher<? extends T> main, Publisher<U>
3537

3638
@Override
3739
public void subscribeActual(final Subscriber<? super T> child) {
38-
final SubscriptionArbiter serial = new SubscriptionArbiter();
39-
child.onSubscribe(serial);
40+
MainSubscriber<T> parent = new MainSubscriber<T>(child, main);
41+
child.onSubscribe(parent);
42+
other.subscribe(parent.other);
43+
}
4044

41-
FlowableSubscriber<U> otherSubscriber = new DelaySubscriber(serial, child);
45+
static final class MainSubscriber<T> extends AtomicLong implements FlowableSubscriber<T>, Subscription {
4246

43-
other.subscribe(otherSubscriber);
44-
}
47+
private static final long serialVersionUID = 2259811067697317255L;
48+
49+
final Subscriber<? super T> downstream;
4550

46-
final class DelaySubscriber implements FlowableSubscriber<U> {
47-
final SubscriptionArbiter serial;
48-
final Subscriber<? super T> child;
49-
boolean done;
51+
final Publisher<? extends T> main;
5052

51-
DelaySubscriber(SubscriptionArbiter serial, Subscriber<? super T> child) {
52-
this.serial = serial;
53-
this.child = child;
53+
final OtherSubscriber other;
54+
55+
final AtomicReference<Subscription> upstream;
56+
57+
MainSubscriber(Subscriber<? super T> downstream, Publisher<? extends T> main) {
58+
this.downstream = downstream;
59+
this.main = main;
60+
this.other = new OtherSubscriber();
61+
this.upstream = new AtomicReference<Subscription>();
5462
}
5563

56-
@Override
57-
public void onSubscribe(final Subscription s) {
58-
serial.setSubscription(new DelaySubscription(s));
59-
s.request(Long.MAX_VALUE);
64+
void next() {
65+
main.subscribe(this);
6066
}
6167

6268
@Override
63-
public void onNext(U t) {
64-
onComplete();
69+
public void onNext(T t) {
70+
downstream.onNext(t);
6571
}
6672

6773
@Override
68-
public void onError(Throwable e) {
69-
if (done) {
70-
RxJavaPlugins.onError(e);
71-
return;
72-
}
73-
done = true;
74-
child.onError(e);
74+
public void onError(Throwable t) {
75+
downstream.onError(t);
7576
}
7677

7778
@Override
7879
public void onComplete() {
79-
if (done) {
80-
return;
81-
}
82-
done = true;
83-
84-
main.subscribe(new OnCompleteSubscriber());
80+
downstream.onComplete();
8581
}
8682

87-
final class DelaySubscription implements Subscription {
88-
89-
final Subscription upstream;
90-
91-
DelaySubscription(Subscription s) {
92-
this.upstream = s;
83+
@Override
84+
public void request(long n) {
85+
if (SubscriptionHelper.validate(n)) {
86+
SubscriptionHelper.deferredRequest(upstream, this, n);
9387
}
88+
}
9489

95-
@Override
96-
public void request(long n) {
97-
// ignored
98-
}
90+
@Override
91+
public void cancel() {
92+
SubscriptionHelper.cancel(other);
93+
SubscriptionHelper.cancel(upstream);
94+
}
9995

100-
@Override
101-
public void cancel() {
102-
upstream.cancel();
103-
}
96+
@Override
97+
public void onSubscribe(Subscription s) {
98+
SubscriptionHelper.deferredSetOnce(upstream, this, s);
10499
}
105100

106-
final class OnCompleteSubscriber implements FlowableSubscriber<T> {
101+
final class OtherSubscriber extends AtomicReference<Subscription> implements FlowableSubscriber<Object> {
102+
103+
private static final long serialVersionUID = -3892798459447644106L;
104+
107105
@Override
108106
public void onSubscribe(Subscription s) {
109-
serial.setSubscription(s);
107+
if (SubscriptionHelper.setOnce(this, s)) {
108+
s.request(Long.MAX_VALUE);
109+
}
110110
}
111111

112112
@Override
113-
public void onNext(T t) {
114-
child.onNext(t);
113+
public void onNext(Object t) {
114+
Subscription s = get();
115+
if (s != SubscriptionHelper.CANCELLED) {
116+
lazySet(SubscriptionHelper.CANCELLED);
117+
s.cancel();
118+
next();
119+
}
115120
}
116121

117122
@Override
118123
public void onError(Throwable t) {
119-
child.onError(t);
124+
Subscription s = get();
125+
if (s != SubscriptionHelper.CANCELLED) {
126+
downstream.onError(t);
127+
} else {
128+
RxJavaPlugins.onError(t);
129+
}
120130
}
121131

122132
@Override
123133
public void onComplete() {
124-
child.onComplete();
134+
Subscription s = get();
135+
if (s != SubscriptionHelper.CANCELLED) {
136+
next();
137+
}
125138
}
126139
}
127-
}
140+
}
128141
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ static final class OnErrorNextSubscriber<T>
5858
long produced;
5959

6060
OnErrorNextSubscriber(Subscriber<? super T> actual, Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier, boolean allowFatal) {
61+
super(false);
6162
this.downstream = actual;
6263
this.nextSupplier = nextSupplier;
6364
this.allowFatal = allowFatal;

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public FlowableRepeat(Flowable<T> source, long count) {
2929

3030
@Override
3131
public void subscribeActual(Subscriber<? super T> s) {
32-
SubscriptionArbiter sa = new SubscriptionArbiter();
32+
SubscriptionArbiter sa = new SubscriptionArbiter(false);
3333
s.onSubscribe(sa);
3434

3535
RepeatSubscriber<T> rs = new RepeatSubscriber<T>(s, count != Long.MAX_VALUE ? count - 1 : Long.MAX_VALUE, sa, source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatUntil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public FlowableRepeatUntil(Flowable<T> source, BooleanSupplier until) {
3131

3232
@Override
3333
public void subscribeActual(Subscriber<? super T> s) {
34-
SubscriptionArbiter sa = new SubscriptionArbiter();
34+
SubscriptionArbiter sa = new SubscriptionArbiter(false);
3535
s.onSubscribe(sa);
3636

3737
RepeatSubscriber<T> rs = new RepeatSubscriber<T>(s, until, sa, source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatWhen.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ abstract static class WhenSourceSubscriber<T, U> extends SubscriptionArbiter imp
143143

144144
WhenSourceSubscriber(Subscriber<? super T> actual, FlowableProcessor<U> processor,
145145
Subscription receiver) {
146+
super(false);
146147
this.downstream = actual;
147148
this.processor = processor;
148149
this.receiver = receiver;
@@ -160,6 +161,7 @@ public final void onNext(T t) {
160161
}
161162

162163
protected final void again(U signal) {
164+
setSubscription(EmptySubscription.INSTANCE);
163165
long p = produced;
164166
if (p != 0L) {
165167
produced = 0L;

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryBiPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public FlowableRetryBiPredicate(
3333

3434
@Override
3535
public void subscribeActual(Subscriber<? super T> s) {
36-
SubscriptionArbiter sa = new SubscriptionArbiter();
36+
SubscriptionArbiter sa = new SubscriptionArbiter(false);
3737
s.onSubscribe(sa);
3838

3939
RetryBiSubscriber<T> rs = new RetryBiSubscriber<T>(s, predicate, sa, source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryPredicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public FlowableRetryPredicate(Flowable<T> source,
3535

3636
@Override
3737
public void subscribeActual(Subscriber<? super T> s) {
38-
SubscriptionArbiter sa = new SubscriptionArbiter();
38+
SubscriptionArbiter sa = new SubscriptionArbiter(false);
3939
s.onSubscribe(sa);
4040

4141
RetrySubscriber<T> rs = new RetrySubscriber<T>(s, count, predicate, sa, source);

src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchIfEmpty.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ static final class SwitchIfEmptySubscriber<T> implements FlowableSubscriber<T> {
4343
this.downstream = actual;
4444
this.other = other;
4545
this.empty = true;
46-
this.arbiter = new SubscriptionArbiter();
46+
this.arbiter = new SubscriptionArbiter(false);
4747
}
4848

4949
@Override

0 commit comments

Comments
 (0)