Skip to content

Commit fbee37a

Browse files
authored
3.x: Fix scheduled tasks' fatal exception behavior (#6956)
* 3.x: Fix scheduled tasks' fatal exception behavior * Fix direct periodic tasks not stopping upon crash. * Fix the mistake introduced in the previous commit. * Ensure task exception is rethrown so that the parent FutureTask can end * Update the abstract Scheduler's tasks too * Adjust some test expectation with DisposeTask
1 parent b6a994f commit fbee37a

15 files changed

+425
-36
lines changed

src/main/java/io/reactivex/rxjava3/core/Scheduler.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818

1919
import io.reactivex.rxjava3.annotations.*;
2020
import io.reactivex.rxjava3.disposables.Disposable;
21-
import io.reactivex.rxjava3.exceptions.Exceptions;
2221
import io.reactivex.rxjava3.functions.Function;
2322
import io.reactivex.rxjava3.internal.disposables.*;
2423
import io.reactivex.rxjava3.internal.schedulers.*;
25-
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
2624
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2725
import io.reactivex.rxjava3.schedulers.SchedulerRunnableIntrospection;
2826

@@ -542,9 +540,10 @@ public void run() {
542540
try {
543541
run.run();
544542
} catch (Throwable ex) {
545-
Exceptions.throwIfFatal(ex);
546-
worker.dispose();
547-
throw ExceptionHelper.wrapOrThrow(ex);
543+
// Exceptions.throwIfFatal(ex); nowhere to go
544+
dispose();
545+
RxJavaPlugins.onError(ex);
546+
throw ex;
548547
}
549548
}
550549
}
@@ -586,7 +585,13 @@ static final class DisposeTask implements Disposable, Runnable, SchedulerRunnabl
586585
public void run() {
587586
runner = Thread.currentThread();
588587
try {
589-
decoratedRun.run();
588+
try {
589+
decoratedRun.run();
590+
} catch (Throwable ex) {
591+
// Exceptions.throwIfFatal(e); nowhere to go
592+
RxJavaPlugins.onError(ex);
593+
throw ex;
594+
}
590595
} finally {
591596
dispose();
592597
runner = null;

src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ public void run() {
320320
}
321321
try {
322322
actual.run();
323+
} catch (Throwable ex) {
324+
// Exceptions.throwIfFatal(ex); nowhere to go
325+
RxJavaPlugins.onError(ex);
326+
throw ex;
323327
} finally {
324328
lazySet(true);
325329
}
@@ -386,7 +390,13 @@ public void run() {
386390
thread = Thread.currentThread();
387391
if (compareAndSet(READY, RUNNING)) {
388392
try {
389-
run.run();
393+
try {
394+
run.run();
395+
} catch (Throwable ex) {
396+
// Exceptions.throwIfFatal(ex); nowhere to go
397+
RxJavaPlugins.onError(ex);
398+
throw ex;
399+
}
390400
} finally {
391401
thread = null;
392402
if (compareAndSet(RUNNING, FINISHED)) {
@@ -463,11 +473,17 @@ public void run() {
463473
Runnable r = get();
464474
if (r != null) {
465475
try {
466-
r.run();
467-
} finally {
468-
lazySet(null);
469-
timed.lazySet(DisposableHelper.DISPOSED);
470-
direct.lazySet(DisposableHelper.DISPOSED);
476+
try {
477+
r.run();
478+
} finally {
479+
lazySet(null);
480+
timed.lazySet(DisposableHelper.DISPOSED);
481+
direct.lazySet(DisposableHelper.DISPOSED);
482+
}
483+
} catch (Throwable ex) {
484+
// Exceptions.throwIfFatal(ex); nowhere to go
485+
RxJavaPlugins.onError(ex);
486+
throw ex;
471487
}
472488
}
473489
}

src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121

2222
import io.reactivex.rxjava3.disposables.Disposable;
23-
import io.reactivex.rxjava3.exceptions.Exceptions;
2423
import io.reactivex.rxjava3.internal.functions.Functions;
2524
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2625

@@ -54,12 +53,13 @@ public Void call() {
5453
runner = Thread.currentThread();
5554
try {
5655
task.run();
57-
setRest(executor.submit(this));
5856
runner = null;
57+
setRest(executor.submit(this));
5958
} catch (Throwable ex) {
60-
Exceptions.throwIfFatal(ex);
59+
// Exceptions.throwIfFatal(ex); nowhere to go
6160
runner = null;
6261
RxJavaPlugins.onError(ex);
62+
throw ex;
6363
}
6464
return null;
6565
}

src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.reactivex.rxjava3.internal.schedulers;
1818

19-
import io.reactivex.rxjava3.exceptions.Exceptions;
2019
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2120

2221
/**
@@ -39,10 +38,11 @@ public void run() {
3938
runnable.run();
4039
runner = null;
4140
} catch (Throwable ex) {
42-
Exceptions.throwIfFatal(ex);
41+
// Exceptions.throwIfFatal(ex); nowhere to go
4342
runner = null;
44-
lazySet(FINISHED);
43+
dispose();
4544
RxJavaPlugins.onError(ex);
45+
throw ex;
4646
}
4747
}
4848
}

src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectTask.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.concurrent.Callable;
2020

21+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
22+
2123
/**
2224
* A Callable to be submitted to an ExecutorService that runs a Runnable
2325
* action and manages completion/cancellation.
@@ -35,10 +37,16 @@ public ScheduledDirectTask(Runnable runnable) {
3537
public Void call() {
3638
runner = Thread.currentThread();
3739
try {
38-
runnable.run();
39-
} finally {
40-
lazySet(FINISHED);
41-
runner = null;
40+
try {
41+
runnable.run();
42+
} finally {
43+
lazySet(FINISHED);
44+
runner = null;
45+
}
46+
} catch (Throwable ex) {
47+
// Exceptions.throwIfFatal(e); nowhere to go
48+
RxJavaPlugins.onError(ex);
49+
throw ex;
4250
}
4351
return null;
4452
}

src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void run() {
6666
} catch (Throwable e) {
6767
// Exceptions.throwIfFatal(e); nowhere to go
6868
RxJavaPlugins.onError(e);
69+
throw e;
6970
}
7071
} finally {
7172
lazySet(THREAD_INDEX, null);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.core;
15+
16+
import static org.junit.Assert.fail;
17+
import static org.testng.Assert.assertTrue;
18+
19+
import org.junit.Test;
20+
21+
import io.reactivex.rxjava3.core.Scheduler.DisposeTask;
22+
import io.reactivex.rxjava3.exceptions.TestException;
23+
import io.reactivex.rxjava3.schedulers.Schedulers;
24+
import io.reactivex.rxjava3.testsupport.TestHelper;
25+
26+
public class DisposeTaskTest extends RxJavaTest {
27+
28+
@Test
29+
public void runnableThrows() throws Throwable {
30+
TestHelper.withErrorTracking(errors -> {
31+
32+
Scheduler.Worker worker = Schedulers.single().createWorker();
33+
34+
DisposeTask task = new DisposeTask(() -> {
35+
throw new TestException();
36+
}, worker);
37+
38+
try {
39+
task.run();
40+
fail("Should have thrown!");
41+
} catch (TestException expected) {
42+
// expected
43+
}
44+
45+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
46+
47+
assertTrue(worker.isDisposed());
48+
});
49+
}
50+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.core;
15+
16+
import static org.junit.Assert.fail;
17+
import static org.testng.Assert.assertTrue;
18+
19+
import java.util.List;
20+
21+
import org.junit.Test;
22+
23+
import io.reactivex.rxjava3.core.Scheduler.PeriodicDirectTask;
24+
import io.reactivex.rxjava3.exceptions.TestException;
25+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
26+
import io.reactivex.rxjava3.schedulers.Schedulers;
27+
import io.reactivex.rxjava3.testsupport.TestHelper;
28+
29+
public class PeriodicDirectTaskTest extends RxJavaTest {
30+
31+
@Test
32+
public void runnableThrows() {
33+
List<Throwable> errors = TestHelper.trackPluginErrors();
34+
try {
35+
Scheduler.Worker worker = Schedulers.single().createWorker();
36+
37+
PeriodicDirectTask task = new PeriodicDirectTask(() -> {
38+
throw new TestException();
39+
}, worker);
40+
41+
try {
42+
task.run();
43+
fail("Should have thrown!");
44+
} catch (TestException expected) {
45+
// expected
46+
}
47+
48+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
49+
50+
assertTrue(worker.isDisposed());
51+
52+
task.run();
53+
} finally {
54+
RxJavaPlugins.reset();
55+
}
56+
}
57+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.schedulers;
15+
16+
import static org.junit.Assert.fail;
17+
18+
import java.util.List;
19+
20+
import org.junit.Test;
21+
22+
import io.reactivex.rxjava3.core.RxJavaTest;
23+
import io.reactivex.rxjava3.exceptions.TestException;
24+
import io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable;
25+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
26+
import io.reactivex.rxjava3.testsupport.TestHelper;
27+
28+
public class BooleanRunnableTest extends RxJavaTest {
29+
30+
@Test
31+
public void runnableThrows() {
32+
List<Throwable> errors = TestHelper.trackPluginErrors();
33+
try {
34+
BooleanRunnable task = new BooleanRunnable(() -> {
35+
throw new TestException();
36+
});
37+
38+
try {
39+
task.run();
40+
fail("Should have thrown!");
41+
} catch (TestException expected) {
42+
// expected
43+
}
44+
45+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
46+
} finally {
47+
RxJavaPlugins.reset();
48+
}
49+
}
50+
}

src/test/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTaskTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ public void run() {
4444
}
4545
}, exec);
4646

47-
assertNull(task.call());
47+
try {
48+
task.call();
49+
fail("Should have thrown!");
50+
} catch (TestException excepted) {
51+
// excepted
52+
}
4853

4954
TestHelper.assertUndeliverable(errors, 0, TestException.class);
5055
} finally {

0 commit comments

Comments
 (0)