Skip to content

Commit cbc1c6b

Browse files
Scheduler.inner() for Core
1 parent 81e0f7a commit cbc1c6b

File tree

150 files changed

+1276
-1760
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

150 files changed

+1276
-1760
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 58 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -17,157 +17,100 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20-
import rx.functions.Action1;
20+
import rx.functions.Action0;
21+
import rx.schedulers.Schedulers;
2122

2223
/**
2324
* Represents an object that schedules units of work.
2425
* <p>
25-
* The methods left to implement are:
26-
* <ul>
27-
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
28-
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
29-
* </ul>
26+
* Common implementations can be found in {@link Schedulers}.
3027
* <p>
3128
* Why is this an abstract class instead of an interface?
3229
* <p>
3330
* <ol>
3431
* <li>Java doesn't support extension methods and there are many overload methods needing default
35-
* implementations.</li>
32+
* implementations.</li>
3633
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for
37-
* a long time.</li>
34+
* a long time.</li>
3835
* <li>If only an interface were used Scheduler implementations would then need to extend from an
39-
* AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
40-
* functionality.</li>
36+
* AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
37+
* functionality.</li>
4138
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library
42-
* maintenance.</li>
39+
* maintenance.</li>
4340
* </ol>
4441
*/
4542
public abstract class Scheduler {
4643

4744
/**
48-
* Schedules an Action on a new Scheduler instance (typically another thread) for execution.
49-
*
50-
* @param action
51-
* Action to schedule
52-
* @return a subscription to be able to unsubscribe from action
53-
*/
54-
55-
public abstract Subscription schedule(Action1<Scheduler.Inner> action);
56-
57-
/**
58-
* Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point
59-
* in the future.
45+
* Retrieve or create a new {@link Scheduler.Inner} that represents serial execution of actions.
46+
* <p>
47+
* When work is completed it should be unsubscribed. Work on a {@link Scheduler.Inner} is guaranteed to be sequential.
6048
*
61-
* @param action
62-
* the Action to schedule
63-
* @param delayTime
64-
* time to wait before executing the action
65-
* @param unit
66-
* the time unit the delay time is given in
67-
* @return a subscription to be able to unsubscribe from action
49+
* @return Inner representing a serial queue of actions to be executed
6850
*/
69-
public abstract Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
51+
public abstract Inner inner();
7052

7153
/**
72-
* Schedules a cancelable action to be executed periodically. This default implementation schedules
73-
* recursively and waits for actions to complete (instead of potentially executing long-running actions
74-
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
75-
*
76-
* @param action
77-
* the Action to execute periodically
78-
* @param initialDelay
79-
* time to wait before executing the action for the first time
80-
* @param period
81-
* the time interval to wait each time in between executing the action
82-
* @param unit
83-
* the time unit the interval above is given in
84-
* @return a subscription to be able to unsubscribe from action
54+
* Sequential Scheduler for executing actions on a single thread or event loop.
55+
* <p>
56+
* Unsubscribing the {@Inner} unschedules all outstanding work and allows resources cleanup.
8557
*/
86-
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit) {
87-
final long periodInNanos = unit.toNanos(period);
88-
89-
final Action1<Scheduler.Inner> recursiveAction = new Action1<Scheduler.Inner>() {
90-
@Override
91-
public void call(Inner inner) {
92-
if (!inner.isUnsubscribed()) {
93-
long startedAt = now();
94-
action.call(inner);
95-
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
96-
inner.schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
97-
}
98-
}
99-
};
100-
return schedule(recursiveAction, initialDelay, unit);
101-
}
102-
103-
public final Subscription scheduleRecursive(final Action1<Recurse> action) {
104-
return schedule(new Action1<Inner>() {
105-
106-
@Override
107-
public void call(Inner inner) {
108-
action.call(new Recurse(inner, action));
109-
}
110-
111-
});
112-
}
113-
114-
public static final class Recurse {
115-
private final Action1<Recurse> action;
116-
private final Inner inner;
117-
118-
private Recurse(Inner inner, Action1<Recurse> action) {
119-
this.inner = inner;
120-
this.action = action;
121-
}
122-
123-
/**
124-
* Schedule the current function for execution immediately.
125-
*/
126-
public final void schedule() {
127-
final Recurse self = this;
128-
inner.schedule(new Action1<Inner>() {
129-
130-
@Override
131-
public void call(Inner _inner) {
132-
action.call(self);
133-
}
134-
135-
});
136-
}
58+
public abstract static class Inner implements Subscription {
13759

13860
/**
139-
* Schedule the current function for execution in the future.
61+
* Schedules an Action for execution.
62+
*
63+
* @param action
64+
* Action to schedule
65+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
14066
*/
141-
public final void schedule(long delay, TimeUnit unit) {
142-
final Recurse self = this;
143-
inner.schedule(new Action1<Inner>() {
144-
145-
@Override
146-
public void call(Inner _inner) {
147-
action.call(self);
148-
}
14967

150-
}, delay, unit);
151-
}
152-
}
153-
154-
public abstract static class Inner implements Subscription {
68+
public abstract Subscription schedule(Action0 action);
15569

15670
/**
157-
* Schedules an action to be executed in delayTime.
71+
* Schedules an Action for execution at some point in the future.
15872
*
73+
* @param action
74+
* the Action to schedule
15975
* @param delayTime
160-
* time the action is to be delayed before executing
76+
* time to wait before executing the action
16177
* @param unit
162-
* time unit of the delay time
78+
* the time unit the delay time is given in
79+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
16380
*/
164-
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
81+
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
16582

16683
/**
167-
* Schedules a cancelable action to be executed in delayTime.
84+
* Schedules a cancelable action to be executed periodically. This default implementation schedules
85+
* recursively and waits for actions to complete (instead of potentially executing long-running actions
86+
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
16887
*
88+
* @param action
89+
* the Action to execute periodically
90+
* @param initialDelay
91+
* time to wait before executing the action for the first time
92+
* @param period
93+
* the time interval to wait each time in between executing the action
94+
* @param unit
95+
* the time unit the interval above is given in
96+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
16997
*/
170-
public abstract void schedule(Action1<Scheduler.Inner> action);
98+
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
99+
final long periodInNanos = unit.toNanos(period);
100+
101+
final Action0 recursiveAction = new Action0() {
102+
@Override
103+
public void call() {
104+
if (!isUnsubscribed()) {
105+
long startedAt = now();
106+
action.call();
107+
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
108+
schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
109+
}
110+
}
111+
};
112+
return schedule(recursiveAction, initialDelay, unit);
113+
}
171114

172115
/**
173116
* @return the scheduler's notion of current absolute time in milliseconds.

0 commit comments

Comments
 (0)