Skip to content

Commit e1d7612

Browse files
Merge pull request #1047 from benjchristensen/scheduler-0.18-with-inner
Scheduler Simplification
2 parents 526d091 + 46d3c66 commit e1d7612

File tree

164 files changed

+1591
-2052
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+1591
-2052
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx.lang.scala
1717

1818
import scala.concurrent.duration.Duration
19-
import rx.functions.Action1
19+
import rx.functions.Action0
2020
import rx.lang.scala.schedulers._
2121
import scala.concurrent.duration
2222
import rx.lang.scala.JavaConversions._
@@ -42,42 +42,8 @@ trait Scheduler {
4242
*/
4343
def now: Long = this.asJavaScheduler.now()
4444

45-
/**
46-
* Schedules a cancelable action to be executed.
47-
*
48-
* @param action Action to schedule.
49-
* @return a subscription to be able to unsubscribe from action.
50-
*/
51-
def schedule(action: Inner => Unit): Subscription = this.asJavaScheduler.schedule(action)
45+
def createInner: Inner = this.asJavaScheduler.createInner()
5246

53-
/**
54-
* Schedules a cancelable action to be executed periodically.
55-
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
56-
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
57-
*
58-
* @param action
59-
* The action to execute periodically.
60-
* @param initialDelay
61-
* Time to wait before executing the action for the first time.
62-
* @param period
63-
* The time interval to wait each time in between executing the action.
64-
* @return A subscription to be able to unsubscribe from action.
65-
*/
66-
def schedulePeriodically(action: Inner => Unit, initialDelay: Duration, period: Duration): Subscription =
67-
this.asJavaScheduler.schedulePeriodically (
68-
new Action1[rx.Scheduler.Inner] {
69-
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
70-
},
71-
initialDelay.toNanos,
72-
period.toNanos,
73-
duration.NANOSECONDS
74-
)
75-
76-
def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
77-
Subscription(asJavaScheduler.schedule(new Action1[rx.Scheduler.Inner] {
78-
override def call(inner: rx.Scheduler.Inner): Unit = work{ inner.schedule(this) }
79-
}))
80-
}
8147
}
8248

8349
object Inner {
@@ -90,20 +56,20 @@ trait Inner extends Subscription {
9056
/**
9157
* Schedules a cancelable action to be executed in delayTime.
9258
*/
93-
def schedule(action: Inner => Unit, delayTime: Duration): Unit =
59+
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
9460
this.asJavaInner.schedule(
95-
new Action1[rx.Scheduler.Inner] {
96-
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
61+
new Action0 {
62+
override def call(): Unit = action()
9763
},
9864
delayTime.length,
9965
delayTime.unit)
10066

10167
/**
10268
* Schedules a cancelable action to be executed immediately.
10369
*/
104-
def schedule(action: Inner=>Unit): Unit = this.asJavaInner.schedule(
105-
new Action1[rx.Scheduler.Inner]{
106-
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
70+
def schedule(action: Unit => Unit): Subscription = this.asJavaInner.schedule(
71+
new Action0 {
72+
override def call(): Unit = action()
10773
}
10874
)
10975

rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java

Lines changed: 21 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import rx.Scheduler;
2121
import rx.Subscription;
22+
import rx.functions.Action0;
2223
import rx.functions.Action1;
2324
import rx.subscriptions.BooleanSubscription;
25+
import rx.subscriptions.Subscriptions;
2426
import android.os.Handler;
2527

2628
/**
@@ -40,43 +42,15 @@ public HandlerThreadScheduler(Handler handler) {
4042
this.handler = handler;
4143
}
4244

43-
/**
44-
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
45-
*
46-
* See {@link #schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)}
47-
*/
48-
@Override
49-
public Subscription schedule(Action1<Inner> action) {
50-
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
51-
inner.schedule(action);
52-
return inner;
53-
}
54-
55-
/**
56-
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
57-
*
58-
* @param state
59-
* State to pass into the action.
60-
* @param action
61-
* Action to schedule.
62-
* @param delayTime
63-
* Time the action is to be delayed before executing.
64-
* @param unit
65-
* Time unit of the delay time.
66-
* @return A Subscription from which one can unsubscribe from.
67-
*/
6845
@Override
69-
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
70-
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
71-
inner.schedule(action, delayTime, unit);
72-
return inner;
46+
public Inner createInner() {
47+
return new InnerHandlerThreadScheduler(handler);
7348
}
74-
49+
7550
private static class InnerHandlerThreadScheduler extends Inner {
7651

7752
private final Handler handler;
7853
private BooleanSubscription innerSubscription = new BooleanSubscription();
79-
private Inner _inner = this;
8054

8155
public InnerHandlerThreadScheduler(Handler handler) {
8256
this.handler = handler;
@@ -93,31 +67,31 @@ public boolean isUnsubscribed() {
9367
}
9468

9569
@Override
96-
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
97-
handler.postDelayed(new Runnable() {
70+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
71+
final Runnable runnable = new Runnable() {
9872
@Override
9973
public void run() {
100-
if (_inner.isUnsubscribed()) {
74+
if (isUnsubscribed()) {
10175
return;
10276
}
103-
action.call(_inner);
77+
action.call();
10478
}
105-
}, unit.toMillis(delayTime));
106-
}
107-
108-
@Override
109-
public void schedule(final Action1<Inner> action) {
110-
handler.postDelayed(new Runnable() {
79+
};
80+
handler.postDelayed(runnable, unit.toMillis(delayTime));
81+
return Subscriptions.create(new Action0() {
11182

11283
@Override
113-
public void run() {
114-
if (_inner.isUnsubscribed()) {
115-
return;
116-
}
117-
action.call(_inner);
84+
public void call() {
85+
handler.removeCallbacks(runnable);
86+
11887
}
88+
89+
});
90+
}
11991

120-
}, 0L);
92+
@Override
93+
public Subscription schedule(final Action0 action) {
94+
return schedule(action, 0, TimeUnit.MILLISECONDS);
12195
}
12296

12397
}

rxjava-contrib/rxjava-android/src/main/java/rx/android/subscriptions/AndroidSubscriptions.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ public void call() {
4242
if (Looper.getMainLooper() == Looper.myLooper()) {
4343
unsubscribe.call();
4444
} else {
45-
AndroidSchedulers.mainThread().schedule(new Action1<Inner>() {
45+
final Inner inner = AndroidSchedulers.mainThread().createInner();
46+
inner.schedule(new Action0() {
4647
@Override
47-
public void call(Inner inner) {
48+
public void call() {
4849
unsubscribe.call();
50+
inner.unsubscribe();
4951
}
5052
});
5153
}

rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import rx.Scheduler;
3232
import rx.Scheduler.Inner;
33+
import rx.functions.Action0;
3334
import rx.functions.Action1;
3435
import android.os.Handler;
3536

@@ -41,35 +42,37 @@ public class HandlerThreadSchedulerTest {
4142
public void shouldScheduleImmediateActionOnHandlerThread() {
4243
final Handler handler = mock(Handler.class);
4344
@SuppressWarnings("unchecked")
44-
final Action1<Inner> action = mock(Action1.class);
45+
final Action0 action = mock(Action0.class);
4546

4647
Scheduler scheduler = new HandlerThreadScheduler(handler);
47-
scheduler.schedule(action);
48+
Inner inner = scheduler.createInner();
49+
inner.schedule(action);
4850

4951
// verify that we post to the given Handler
5052
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
5153
verify(handler).postDelayed(runnable.capture(), eq(0L));
5254

5355
// verify that the given handler delegates to our action
5456
runnable.getValue().run();
55-
verify(action).call(any(Inner.class));
57+
verify(action).call();
5658
}
5759

5860
@Test
5961
public void shouldScheduleDelayedActionOnHandlerThread() {
6062
final Handler handler = mock(Handler.class);
6163
@SuppressWarnings("unchecked")
62-
final Action1<Inner> action = mock(Action1.class);
64+
final Action0 action = mock(Action0.class);
6365

6466
Scheduler scheduler = new HandlerThreadScheduler(handler);
65-
scheduler.schedule(action, 1L, TimeUnit.SECONDS);
67+
Inner inner = scheduler.createInner();
68+
inner.schedule(action, 1L, TimeUnit.SECONDS);
6669

6770
// verify that we post to the given Handler
6871
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
6972
verify(handler).postDelayed(runnable.capture(), eq(1000L));
7073

7174
// verify that the given handler delegates to our action
7275
runnable.getValue().run();
73-
verify(action).call(any(Inner.class));
76+
verify(action).call();
7477
}
7578
}

0 commit comments

Comments
 (0)