Skip to content

Commit 4d2e821

Browse files
lukaszguzakarnokd
authored andcommitted
2.x: RxJavaPlugins unwrapRunnable (#5734)
* 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable-javadoc * 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage * 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage * 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage * 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage * 2.x: RxJavaPlugins unwrapRunnable-Enhancement and test coverage * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable * 2.x: RxJavaPlugins unwrapRunnable
1 parent 30905fc commit 4d2e821

File tree

7 files changed

+203
-14
lines changed

7 files changed

+203
-14
lines changed

src/main/java/io/reactivex/Scheduler.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
package io.reactivex;
1515

16-
import java.util.concurrent.TimeUnit;
17-
1816
import io.reactivex.annotations.*;
1917
import io.reactivex.disposables.Disposable;
2018
import io.reactivex.exceptions.Exceptions;
@@ -23,6 +21,9 @@
2321
import io.reactivex.internal.schedulers.*;
2422
import io.reactivex.internal.util.ExceptionHelper;
2523
import io.reactivex.plugins.RxJavaPlugins;
24+
import io.reactivex.schedulers.SchedulerRunnableIntrospection;
25+
26+
import java.util.concurrent.TimeUnit;
2627

2728
/**
2829
* A {@code Scheduler} is an object that specifies an API for scheduling
@@ -197,7 +198,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
197198
* <p>
198199
* Limit the amount concurrency two at a time without creating a new fix
199200
* size thread pool:
200-
*
201+
*
201202
* <pre>
202203
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
203204
* // use merge max concurrent to limit the number of concurrent
@@ -215,20 +216,20 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
215216
* {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
216217
* subscribing to the first {@link Flowable} could deadlock the
217218
* subscription to the second.
218-
*
219+
*
219220
* <pre>
220221
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
221222
* // use merge max concurrent to limit the number of concurrent
222223
* // Flowables two at a time
223224
* return Completable.merge(Flowable.merge(workers, 2));
224225
* });
225226
* </pre>
226-
*
227+
*
227228
* Slowing down the rate to no more than than 1 a second. This suffers from
228229
* the same problem as the one above I could find an {@link Flowable}
229230
* operator that limits the rate without dropping the values (aka leaky
230231
* bucket algorithm).
231-
*
232+
*
232233
* <pre>
233234
* Scheduler slowScheduler = Schedulers.computation().when(workers -&gt; {
234235
* // use concatenate to make each worker happen one at a time.
@@ -238,7 +239,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
238239
* }));
239240
* });
240241
* </pre>
241-
*
242+
*
242243
* <p>History: 2.0.1 - experimental
243244
* @param <S> a Scheduler and a Subscription
244245
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
@@ -347,7 +348,7 @@ public long now(@NonNull TimeUnit unit) {
347348
* Holds state and logic to calculate when the next delayed invocation
348349
* of this task has to happen (accounting for clock drifts).
349350
*/
350-
final class PeriodicTask implements Runnable {
351+
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
351352
@NonNull
352353
final Runnable decoratedRun;
353354
@NonNull
@@ -393,11 +394,16 @@ public void run() {
393394
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
394395
}
395396
}
397+
398+
@Override
399+
public Runnable getWrappedRunnable() {
400+
return this.decoratedRun;
401+
}
396402
}
397403
}
398404

399405
static class PeriodicDirectTask
400-
implements Runnable, Disposable {
406+
implements Disposable, Runnable, SchedulerRunnableIntrospection {
401407
final Runnable run;
402408
@NonNull
403409
final Worker worker;
@@ -432,9 +438,14 @@ public void dispose() {
432438
public boolean isDisposed() {
433439
return disposed;
434440
}
441+
442+
@Override
443+
public Runnable getWrappedRunnable() {
444+
return run;
445+
}
435446
}
436447

437-
static final class DisposeTask implements Runnable, Disposable {
448+
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
438449
final Runnable decoratedRun;
439450
final Worker w;
440451

@@ -469,5 +480,10 @@ public void dispose() {
469480
public boolean isDisposed() {
470481
return w.isDisposed();
471482
}
483+
484+
@Override
485+
public Runnable getWrappedRunnable() {
486+
return this.decoratedRun;
487+
}
472488
}
473489
}

src/main/java/io/reactivex/internal/schedulers/AbstractDirectTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121

2222
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.internal.functions.Functions;
24+
import io.reactivex.schedulers.SchedulerRunnableIntrospection;
2425

2526
/**
2627
* Base functionality for direct tasks that manage a runnable and cancellation/completion.
2728
* @since 2.0.8
2829
*/
2930
abstract class AbstractDirectTask
3031
extends AtomicReference<Future<?>>
31-
implements Disposable {
32+
implements Disposable, SchedulerRunnableIntrospection {
3233

3334
private static final long serialVersionUID = 1811839108042568751L;
3435

@@ -77,4 +78,9 @@ public final void setFuture(Future<?> future) {
7778
}
7879
}
7980
}
81+
82+
@Override
83+
public Runnable getWrappedRunnable() {
84+
return runnable;
85+
}
8086
}

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import io.reactivex.annotations.NonNull;
2121
import io.reactivex.disposables.*;
2222
import io.reactivex.internal.disposables.*;
23+
import io.reactivex.internal.functions.Functions;
2324
import io.reactivex.internal.queue.MpscLinkedQueue;
2425
import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable;
2526
import io.reactivex.plugins.RxJavaPlugins;
26-
import io.reactivex.schedulers.Schedulers;
27+
import io.reactivex.schedulers.*;
2728

2829
/**
2930
* Wraps an Executor and provides the Scheduler API over it.
@@ -290,7 +291,8 @@ public void run() {
290291
}
291292
}
292293

293-
static final class DelayedRunnable extends AtomicReference<Runnable> implements Runnable, Disposable {
294+
static final class DelayedRunnable extends AtomicReference<Runnable>
295+
implements Runnable, Disposable, SchedulerRunnableIntrospection {
294296

295297
private static final long serialVersionUID = -4101336210206799084L;
296298

@@ -330,6 +332,12 @@ public void dispose() {
330332
direct.dispose();
331333
}
332334
}
335+
336+
@Override
337+
public Runnable getWrappedRunnable() {
338+
Runnable r = get();
339+
return r != null ? r : Functions.EMPTY_RUNNABLE;
340+
}
333341
}
334342

335343
final class DelayedDispose implements Runnable {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
4+
* compliance with the License. You may obtain a copy of the License at
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
7+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
8+
* the License for the specific language governing permissions and limitations under the License.
9+
*/
10+
11+
package io.reactivex.schedulers;
12+
13+
import io.reactivex.annotations.*;
14+
import io.reactivex.functions.Function;
15+
import io.reactivex.plugins.RxJavaPlugins;
16+
17+
/**
18+
* Interface to wrap an action inside internal scheduler's task.
19+
*
20+
* You can check if runnable implements this interface and unwrap original runnable.
21+
* For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)}
22+
*
23+
* @since 2.1.7 - experimental
24+
*/
25+
@Experimental
26+
public interface SchedulerRunnableIntrospection {
27+
28+
/**
29+
* Returns the wrapped action.
30+
*
31+
* @return the wrapped action. Cannot be null.
32+
*/
33+
@NonNull
34+
Runnable getWrappedRunnable();
35+
}

src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,4 +698,48 @@ public void run() {
698698
disposable.get().dispose();
699699
}
700700
}
701-
}
701+
702+
@Test(timeout = 5000)
703+
public void unwrapDefaultPeriodicTask() throws InterruptedException {
704+
Scheduler s = getScheduler();
705+
if (s instanceof TrampolineScheduler) {
706+
// TrampolineScheduler always return EmptyDisposable
707+
return;
708+
}
709+
710+
711+
final CountDownLatch cdl = new CountDownLatch(1);
712+
Runnable countDownRunnable = new Runnable() {
713+
@Override
714+
public void run() {
715+
cdl.countDown();
716+
}
717+
};
718+
Disposable disposable = s.schedulePeriodicallyDirect(countDownRunnable, 100, 100, TimeUnit.MILLISECONDS);
719+
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
720+
721+
assertSame(countDownRunnable, wrapper.getWrappedRunnable());
722+
assertTrue(cdl.await(5, TimeUnit.SECONDS));
723+
disposable.dispose();
724+
}
725+
726+
@Test
727+
public void unwrapScheduleDirectTask() {
728+
Scheduler scheduler = getScheduler();
729+
if (scheduler instanceof TrampolineScheduler) {
730+
// TrampolineScheduler always return EmptyDisposable
731+
return;
732+
}
733+
final CountDownLatch cdl = new CountDownLatch(1);
734+
Runnable countDownRunnable = new Runnable() {
735+
@Override
736+
public void run() {
737+
cdl.countDown();
738+
}
739+
};
740+
Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS);
741+
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
742+
assertSame(countDownRunnable, wrapper.getWrappedRunnable());
743+
disposable.dispose();
744+
}
745+
}

src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,4 +561,22 @@ public void runnableDisposedAsyncTimed2() throws Exception {
561561
executorScheduler.shutdownNow();
562562
}
563563
}
564+
565+
@Test
566+
public void unwrapScheduleDirectTaskAfterDispose() {
567+
Scheduler scheduler = getScheduler();
568+
final CountDownLatch cdl = new CountDownLatch(1);
569+
Runnable countDownRunnable = new Runnable() {
570+
@Override
571+
public void run() {
572+
cdl.countDown();
573+
}
574+
};
575+
Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS);
576+
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable;
577+
assertSame(countDownRunnable, wrapper.getWrappedRunnable());
578+
disposable.dispose();
579+
580+
assertSame(Functions.EMPTY_RUNNABLE, wrapper.getWrappedRunnable());
581+
}
564582
}

src/test/java/io/reactivex/schedulers/SchedulerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,4 +307,66 @@ public void customScheduleDirectDisposed() {
307307

308308
assertTrue(d.isDisposed());
309309
}
310+
311+
@Test
312+
public void unwrapDefaultPeriodicTask() {
313+
TestScheduler scheduler = new TestScheduler();
314+
315+
Runnable runnable = new Runnable() {
316+
@Override
317+
public void run() {
318+
}
319+
};
320+
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS);
321+
322+
assertSame(runnable, wrapper.getWrappedRunnable());
323+
}
324+
325+
@Test
326+
public void unwrapScheduleDirectTask() {
327+
TestScheduler scheduler = new TestScheduler();
328+
329+
Runnable runnable = new Runnable() {
330+
@Override
331+
public void run() {
332+
}
333+
};
334+
SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) scheduler.scheduleDirect(runnable, 100, TimeUnit.MILLISECONDS);
335+
assertSame(runnable, wrapper.getWrappedRunnable());
336+
}
337+
338+
@Test
339+
public void unwrapWorkerPeriodicTask() {
340+
final Runnable runnable = new Runnable() {
341+
@Override
342+
public void run() {
343+
}
344+
};
345+
346+
Scheduler scheduler = new Scheduler() {
347+
@Override
348+
public Worker createWorker() {
349+
return new Worker() {
350+
@Override
351+
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
352+
SchedulerRunnableIntrospection outerWrapper = (SchedulerRunnableIntrospection) run;
353+
SchedulerRunnableIntrospection innerWrapper = (SchedulerRunnableIntrospection) outerWrapper.getWrappedRunnable();
354+
assertSame(runnable, innerWrapper.getWrappedRunnable());
355+
return (Disposable) innerWrapper;
356+
}
357+
358+
@Override
359+
public void dispose() {
360+
}
361+
362+
@Override
363+
public boolean isDisposed() {
364+
return false;
365+
}
366+
};
367+
}
368+
};
369+
370+
scheduler.schedulePeriodicallyDirect(runnable, 100, 100, TimeUnit.MILLISECONDS);
371+
}
310372
}

0 commit comments

Comments
 (0)