Skip to content

Commit 5d273a9

Browse files
konrad-kaminskielizarov
authored andcommitted
Rx1 Scheduler support.
1 parent 3535cee commit 5d273a9

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,6 @@ private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutin
116116
}))
117117
}
118118

119-
private fun <T> CancellableContinuation<T>.unsubscribeOnCompletion(sub: Subscription) {
119+
internal fun <T> CancellableContinuation<T>.unsubscribeOnCompletion(sub: Subscription) {
120120
invokeOnCompletion { sub.unsubscribe() }
121121
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx1
18+
19+
import rx.Scheduler
20+
import kotlinx.coroutines.experimental.CancellableContinuation
21+
import kotlinx.coroutines.experimental.CoroutineDispatcher
22+
import kotlinx.coroutines.experimental.Delay
23+
import kotlinx.coroutines.experimental.DisposableHandle
24+
import rx.Subscription
25+
import java.util.concurrent.TimeUnit
26+
import kotlin.coroutines.experimental.CoroutineContext
27+
28+
/**
29+
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
30+
* and provides native [delay][Delay.delay] support.
31+
*/
32+
public fun Scheduler.asCoroutineDispatcher() = SchedulerCoroutineDispatcher(this)
33+
34+
/**
35+
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
36+
* @param scheduler a scheduler.
37+
*/
38+
public class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : CoroutineDispatcher(), Delay {
39+
override fun dispatch(context: CoroutineContext, block: Runnable) {
40+
scheduler.createWorker().schedule { block.run() }
41+
}
42+
43+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
44+
scheduler.createWorker()
45+
.schedule({
46+
with(continuation) { resumeUndispatched(Unit) }
47+
}, time, unit)
48+
.let { subscription ->
49+
continuation.unsubscribeOnCompletion(subscription)
50+
}
51+
52+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
53+
scheduler.createWorker().schedule({ block.run() }, time, unit).asDisposableHandle()
54+
55+
private fun Subscription.asDisposableHandle(): DisposableHandle = object : DisposableHandle {
56+
override fun dispose() = unsubscribe()
57+
}
58+
59+
override fun toString(): String = scheduler.toString()
60+
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
61+
override fun hashCode(): Int = System.identityHashCode(scheduler)
62+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx1
18+
19+
import kotlinx.coroutines.experimental.*
20+
import org.hamcrest.core.IsEqual
21+
import org.hamcrest.core.IsNot
22+
import org.junit.Assert.assertThat
23+
import org.junit.Test
24+
import rx.schedulers.Schedulers
25+
26+
class SchedulerTest : TestBase() {
27+
@Test
28+
fun testIoScheduler(): Unit = runBlocking {
29+
expect(1)
30+
val mainThread = Thread.currentThread()
31+
run(Schedulers.io().asCoroutineDispatcher()) {
32+
val t1 = Thread.currentThread()
33+
println(t1)
34+
assertThat(t1, IsNot(IsEqual(mainThread)))
35+
expect(2)
36+
delay(100)
37+
val t2 = Thread.currentThread()
38+
println(t2)
39+
assertThat(t2, IsNot(IsEqual(mainThread)))
40+
expect(3)
41+
}
42+
finish(4)
43+
}
44+
}

0 commit comments

Comments
 (0)