Skip to content

Commit 6456655

Browse files
authored
2.x: cancel upstream first, dispose worker last (#5075)
1 parent 14ea26b commit 6456655

14 files changed

+67
-81
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ public void request(long n) {
177177

178178
@Override
179179
public void cancel() {
180-
DisposableHelper.dispose(timer);
181-
182180
s.cancel();
181+
182+
DisposableHelper.dispose(timer);
183183
}
184184

185185
@Override
@@ -333,9 +333,9 @@ public void request(long n) {
333333

334334
@Override
335335
public void cancel() {
336-
w.dispose();
337336
clear();
338337
s.cancel();
338+
w.dispose();
339339
}
340340

341341
void clear() {
@@ -497,17 +497,15 @@ public void onNext(T t) {
497497

498498
@Override
499499
public void onError(Throwable t) {
500-
w.dispose();
501500
synchronized (this) {
502501
buffer = null;
503502
}
504503
actual.onError(t);
504+
w.dispose();
505505
}
506506

507507
@Override
508508
public void onComplete() {
509-
w.dispose();
510-
511509
U b;
512510
synchronized (this) {
513511
b = buffer;
@@ -519,6 +517,8 @@ public void onComplete() {
519517
if (enter()) {
520518
QueueDrainHelper.drainMaxLoop(queue, actual, false, this, this);
521519
}
520+
521+
w.dispose();
522522
}
523523

524524
@Override
@@ -543,11 +543,11 @@ public void cancel() {
543543

544544
@Override
545545
public void dispose() {
546-
w.dispose();
547546
synchronized (this) {
548547
buffer = null;
549548
}
550549
s.cancel();
550+
w.dispose();
551551
}
552552

553553
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ public void onError(Throwable t) {
108108
return;
109109
}
110110
done = true;
111-
DisposableHelper.dispose(timer);
112111
actual.onError(t);
112+
worker.dispose();
113113
}
114114

115115
@Override
@@ -127,8 +127,8 @@ public void onComplete() {
127127
de.emit();
128128
}
129129
DisposableHelper.dispose(timer);
130-
worker.dispose();
131130
actual.onComplete();
131+
worker.dispose();
132132
}
133133
}
134134

@@ -141,9 +141,8 @@ public void request(long n) {
141141

142142
@Override
143143
public void cancel() {
144-
DisposableHelper.dispose(timer);
145-
worker.dispose();
146144
s.cancel();
145+
worker.dispose();
147146
}
148147

149148
void emit(long idx, T t, DebounceEmitter<T> emitter) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ public void request(long n) {
121121

122122
@Override
123123
public void cancel() {
124-
w.dispose();
125124
s.cancel();
125+
w.dispose();
126126
}
127127

128128
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ public void onError(Throwable t) {
123123
return;
124124
}
125125
done = true;
126-
DisposableHelper.dispose(timer);
127126
actual.onError(t);
127+
worker.dispose();
128128
}
129129

130130
@Override
@@ -133,9 +133,8 @@ public void onComplete() {
133133
return;
134134
}
135135
done = true;
136-
DisposableHelper.dispose(timer);
137-
worker.dispose();
138136
actual.onComplete();
137+
worker.dispose();
139138
}
140139

141140
@Override
@@ -147,9 +146,8 @@ public void request(long n) {
147146

148147
@Override
149148
public void cancel() {
150-
DisposableHelper.dispose(timer);
151-
worker.dispose();
152149
s.cancel();
150+
worker.dispose();
153151
}
154152
}
155153
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ public void onError(Throwable t) {
154154
return;
155155
}
156156
done = true;
157-
worker.dispose();
158-
DisposableHelper.dispose(timer);
159157
arbiter.onError(t, s);
158+
worker.dispose();
160159
}
161160

162161
@Override
@@ -165,16 +164,14 @@ public void onComplete() {
165164
return;
166165
}
167166
done = true;
168-
worker.dispose();
169-
DisposableHelper.dispose(timer);
170167
arbiter.onComplete(s);
168+
worker.dispose();
171169
}
172170

173171
@Override
174172
public void dispose() {
175-
worker.dispose();
176-
DisposableHelper.dispose(timer);
177173
s.cancel();
174+
worker.dispose();
178175
}
179176

180177
@Override
@@ -256,9 +253,9 @@ public void onError(Throwable t) {
256253
return;
257254
}
258255
done = true;
259-
dispose();
260256

261257
actual.onError(t);
258+
worker.dispose();
262259
}
263260

264261
@Override
@@ -267,16 +264,15 @@ public void onComplete() {
267264
return;
268265
}
269266
done = true;
270-
dispose();
271267

272268
actual.onComplete();
269+
worker.dispose();
273270
}
274271

275272
@Override
276273
public void dispose() {
277-
worker.dispose();
278-
DisposableHelper.dispose(timer);
279274
s.cancel();
275+
worker.dispose();
280276
}
281277

282278
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ public void onError(Throwable t) {
159159
drainLoop();
160160
}
161161

162-
dispose();
163162
actual.onError(t);
163+
dispose();
164164
}
165165

166166
@Override
@@ -170,8 +170,8 @@ public void onComplete() {
170170
drainLoop();
171171
}
172172

173-
dispose();
174173
actual.onComplete();
174+
dispose();
175175
}
176176

177177
@Override
@@ -396,8 +396,8 @@ public void onNext(T t) {
396396
} else {
397397
window = null;
398398
s.cancel();
399-
dispose();
400399
actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
400+
dispose();
401401
return;
402402
}
403403
} else {
@@ -424,8 +424,8 @@ public void onError(Throwable t) {
424424
drainLoop();
425425
}
426426

427-
dispose();
428427
actual.onError(t);
428+
dispose();
429429
}
430430

431431
@Override
@@ -435,8 +435,8 @@ public void onComplete() {
435435
drainLoop();
436436
}
437437

438-
dispose();
439438
actual.onComplete();
439+
dispose();
440440
}
441441

442442
@Override
@@ -479,13 +479,13 @@ void drainLoop() {
479479
if (d && (empty || isHolder)) {
480480
window = null;
481481
q.clear();
482-
dispose();
483482
Throwable err = error;
484483
if (err != null) {
485484
w.onError(err);
486485
} else {
487486
w.onComplete();
488487
}
488+
dispose();
489489
return;
490490
}
491491

@@ -509,8 +509,8 @@ void drainLoop() {
509509
window = null;
510510
queue.clear();
511511
s.cancel();
512-
dispose();
513512
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
513+
dispose();
514514
return;
515515
}
516516
}
@@ -550,8 +550,8 @@ void drainLoop() {
550550
} else {
551551
window = null;
552552
s.cancel();
553-
dispose();
554553
actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
554+
dispose();
555555
return;
556556
}
557557
} else {
@@ -683,8 +683,8 @@ public void onError(Throwable t) {
683683
drainLoop();
684684
}
685685

686-
dispose();
687686
actual.onError(t);
687+
dispose();
688688
}
689689

690690
@Override
@@ -694,8 +694,8 @@ public void onComplete() {
694694
drainLoop();
695695
}
696696

697-
dispose();
698697
actual.onComplete();
698+
dispose();
699699
}
700700

701701
@Override
@@ -747,7 +747,6 @@ void drainLoop() {
747747

748748
if (d && (empty || sw)) {
749749
q.clear();
750-
dispose();
751750
Throwable e = error;
752751
if (e != null) {
753752
for (UnicastProcessor<T> w : ws) {
@@ -759,6 +758,7 @@ void drainLoop() {
759758
}
760759
}
761760
ws.clear();
761+
dispose();
762762
return;
763763
}
764764

0 commit comments

Comments
 (0)