Skip to content

Commit d20b17e

Browse files
artem-zinnatullinakarnokd
authored andcommitted
2.x: Fix TrampolineScheduler not calling RxJavaPlugins.onSchedule(), add tests for all schedulers. (#5747)
1 parent 860e39e commit d20b17e

File tree

2 files changed

+74
-2
lines changed

2 files changed

+74
-2
lines changed

src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public Worker createWorker() {
4949
@NonNull
5050
@Override
5151
public Disposable scheduleDirect(@NonNull Runnable run) {
52-
run.run();
52+
RxJavaPlugins.onSchedule(run).run();
5353
return EmptyDisposable.INSTANCE;
5454
}
5555

@@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
5858
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
5959
try {
6060
unit.sleep(delay);
61-
run.run();
61+
RxJavaPlugins.onSchedule(run).run();
6262
} catch (InterruptedException ex) {
6363
Thread.currentThread().interrupt();
6464
RxJavaPlugins.onError(ex);

src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.concurrent.*;
2121
import java.util.concurrent.atomic.*;
2222

23+
import io.reactivex.internal.functions.Functions;
24+
import io.reactivex.plugins.RxJavaPlugins;
2325
import org.junit.Test;
2426
import org.mockito.InOrder;
2527
import org.mockito.invocation.InvocationOnMock;
@@ -41,6 +43,7 @@ public abstract class AbstractSchedulerTests {
4143

4244
/**
4345
* The scheduler to test.
46+
*
4447
* @return the Scheduler instance
4548
*/
4649
protected abstract Scheduler getScheduler();
@@ -576,6 +579,7 @@ public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
576579
try {
577580
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
578581
int count;
582+
579583
@Override
580584
public void run() {
581585
if (++count == 10) {
@@ -610,6 +614,7 @@ public void schedulePeriodicallyZeroPeriod() throws Exception {
610614
try {
611615
sd.replace(w.schedulePeriodically(new Runnable() {
612616
int count;
617+
613618
@Override
614619
public void run() {
615620
if (++count == 10) {
@@ -626,4 +631,71 @@ public void run() {
626631
}
627632
}
628633
}
634+
635+
private void assertRunnableDecorated(Runnable scheduleCall) throws InterruptedException {
636+
try {
637+
final CountDownLatch decoratedCalled = new CountDownLatch(1);
638+
639+
RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
640+
@Override
641+
public Runnable apply(final Runnable actual) throws Exception {
642+
return new Runnable() {
643+
@Override
644+
public void run() {
645+
decoratedCalled.countDown();
646+
actual.run();
647+
}
648+
};
649+
}
650+
});
651+
652+
scheduleCall.run();
653+
654+
assertTrue(decoratedCalled.await(5, TimeUnit.SECONDS));
655+
} finally {
656+
RxJavaPlugins.reset();
657+
}
658+
}
659+
660+
@Test(timeout = 6000)
661+
public void scheduleDirectDecoratesRunnable() throws InterruptedException {
662+
assertRunnableDecorated(new Runnable() {
663+
@Override
664+
public void run() {
665+
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE);
666+
}
667+
});
668+
}
669+
670+
@Test(timeout = 6000)
671+
public void scheduleDirectWithDelayDecoratesRunnable() throws InterruptedException {
672+
assertRunnableDecorated(new Runnable() {
673+
@Override
674+
public void run() {
675+
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
676+
}
677+
});
678+
}
679+
680+
@Test(timeout = 6000)
681+
public void schedulePeriodicallyDirectDecoratesRunnable() throws InterruptedException {
682+
final Scheduler scheduler = getScheduler();
683+
if (scheduler instanceof TrampolineScheduler) {
684+
// Can't properly stop a trampolined periodic task.
685+
return;
686+
}
687+
688+
final AtomicReference<Disposable> disposable = new AtomicReference<Disposable>();
689+
690+
try {
691+
assertRunnableDecorated(new Runnable() {
692+
@Override
693+
public void run() {
694+
disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS));
695+
}
696+
});
697+
} finally {
698+
disposable.get().dispose();
699+
}
700+
}
629701
}

0 commit comments

Comments
 (0)