Skip to content

Commit b34bc69

Browse files
Merge pull request #1537 from GeorgiKhomeriki/master
recursive scheduling in RxScala
2 parents b12f3eb + cc1ffe6 commit b34bc69

File tree

2 files changed

+63
-0
lines changed

2 files changed

+63
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,22 @@ trait Worker extends Subscription {
9292
)
9393
}
9494

95+
/**
96+
* Schedules an Action for recursively repeated execution.
97+
*
98+
* @param action the Action to schedule recursively
99+
* @return a subscription to be able to unsubscribe the action
100+
*/
101+
def scheduleRec(action: => Unit): Subscription = {
102+
def work: Unit = {
103+
action
104+
if (!this.isUnsubscribed) {
105+
this.schedule(work)
106+
}
107+
}
108+
this.schedule(work)
109+
}
110+
95111
/**
96112
* Schedules a cancelable action to be executed periodically. This default implementation schedules
97113
* recursively and waits for actions to complete (instead of potentially executing long-running actions
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package rx.lang.scala
2+
3+
import java.util.concurrent.TimeUnit
4+
5+
import org.junit.Assert.assertTrue
6+
import org.junit.Test
7+
import org.scalatest.junit.JUnitSuite
8+
import rx.lang.scala.schedulers.TestScheduler
9+
10+
class SchedulerTests extends JUnitSuite {
11+
12+
@Test def testScheduleRecSingleRound() {
13+
val scheduler = TestScheduler()
14+
val worker = scheduler.createWorker
15+
var count = 0
16+
worker.scheduleRec({ count += 1; worker.unsubscribe() })
17+
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
18+
assertTrue(count == 1)
19+
}
20+
21+
@Test def testScheduleRecMultipleRounds() {
22+
val scheduler = TestScheduler()
23+
val worker = scheduler.createWorker
24+
var count = 0
25+
worker.scheduleRec({ count += 1; if(count == 100) worker.unsubscribe() })
26+
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
27+
assertTrue(count == 100)
28+
}
29+
30+
@Test def testScheduleRecUnsubscribe() {
31+
val scheduler = TestScheduler()
32+
val worker = scheduler.createWorker
33+
var count = 0
34+
val subscription = worker.scheduleRec({ count += 1 })
35+
subscription.unsubscribe()
36+
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
37+
assertTrue(count == 0)
38+
}
39+
40+
@Test(expected = classOf[Exception])
41+
def testScheduleRecException() {
42+
val scheduler = TestScheduler()
43+
scheduler.createWorker.scheduleRec({ throw new Exception() })
44+
scheduler.advanceTimeBy(1L, TimeUnit.SECONDS)
45+
}
46+
47+
}

0 commit comments

Comments
 (0)