Skip to content

Commit a1fe3da

Browse files
committed
Scheduler.asCoroutineDispatcher extension for Rx 2.x
1 parent 1dfc26c commit a1fe3da

File tree

4 files changed

+107
-1
lines changed

4 files changed

+107
-1
lines changed

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ Conversion functions:
3333
| [Job.asCompletable][kotlinx.coroutines.experimental.Job.asCompletable] | Converts job to hot completable
3434
| [Deferred.asSingle][kotlinx.coroutines.experimental.Deferred.asSingle] | Converts deferred value to hot single
3535
| [ReceiveChannel.asObservable][kotlinx.coroutines.experimental.channels.ReceiveChannel.asObservable] | Converts streaming channel to hot observable
36+
| [Scheduler.asCoroutineDispatcher][io.reactivex.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
3637

3738
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
3839
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
3940
<!--- INDEX kotlinx.coroutines.experimental -->
4041
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
42+
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
4143
<!--- INDEX kotlinx.coroutines.experimental.channels -->
4244
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
4345
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
@@ -57,6 +59,7 @@ Conversion functions:
5759
[kotlinx.coroutines.experimental.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-job/as-completable.html
5860
[kotlinx.coroutines.experimental.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-deferred/as-single.html
5961
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.channels.-receive-channel/as-observable.html
62+
[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-scheduler/as-coroutine-dispatcher.html
6063
<!--- END -->
6164

6265
# Package kotlinx.coroutines.experimental.rx2

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SING
9292

9393
// ------------------------ private ------------------------
9494

95-
private fun CancellableContinuation<*>.disposeOnCompletion(d: Disposable) =
95+
internal fun CancellableContinuation<*>.disposeOnCompletion(d: Disposable) =
9696
invokeOnCompletion { d.dispose() }
9797

9898
private enum class Mode(val s: String) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx2
18+
19+
import io.reactivex.Scheduler
20+
import kotlinx.coroutines.experimental.CancellableContinuation
21+
import kotlinx.coroutines.experimental.CoroutineDispatcher
22+
import kotlinx.coroutines.experimental.Delay
23+
import kotlinx.coroutines.experimental.DisposableHandle
24+
import java.util.concurrent.TimeUnit
25+
import kotlin.coroutines.experimental.CoroutineContext
26+
27+
/**
28+
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
29+
* and provides native [delay][Delay.delay] support.
30+
*/
31+
public fun Scheduler.asCoroutineDispatcher() = SchedulerCoroutineDispatcher(this)
32+
33+
/**
34+
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
35+
* @param scheduler a scheduler.
36+
*/
37+
public class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : CoroutineDispatcher(), Delay {
38+
override fun dispatch(context: CoroutineContext, block: Runnable) {
39+
scheduler.scheduleDirect(block)
40+
}
41+
42+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
43+
val disposable = scheduler.scheduleDirect({
44+
with(continuation) { resumeUndispatched(Unit) }
45+
}, time, unit)
46+
continuation.disposeOnCompletion(disposable)
47+
}
48+
49+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
50+
val disposable = scheduler.scheduleDirect(block, time, unit)
51+
return object : DisposableHandle {
52+
override fun dispose() {
53+
disposable.dispose()
54+
}
55+
}
56+
}
57+
58+
override fun toString(): String = scheduler.toString()
59+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx2
18+
19+
import io.reactivex.schedulers.Schedulers
20+
import kotlinx.coroutines.experimental.*
21+
import org.hamcrest.core.IsEqual
22+
import org.hamcrest.core.IsNot
23+
import org.junit.Assert.assertThat
24+
import org.junit.Test
25+
26+
class SchedulerTest : TestBase() {
27+
@Test
28+
fun testIoScheduler(): Unit = runBlocking {
29+
expect(1)
30+
val mainThread = Thread.currentThread()
31+
run(Schedulers.io().asCoroutineDispatcher()) {
32+
val t1 = Thread.currentThread()
33+
println(t1)
34+
assertThat(t1, IsNot(IsEqual(mainThread)))
35+
expect(2)
36+
delay(100)
37+
val t2 = Thread.currentThread()
38+
println(t2)
39+
assertThat(t2, IsNot(IsEqual(mainThread)))
40+
expect(3)
41+
}
42+
finish(4)
43+
}
44+
}

0 commit comments

Comments
 (0)