Skip to content

Commit 3ae898c

Browse files
Add support for Reactor.
1 parent 4d927c7 commit 3ae898c

File tree

16 files changed

+1206
-0
lines changed

16 files changed

+1206
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Basic modules:
1313
Modules that provide builders and iteration support for various reactive streams libraries:
1414

1515
* [kotlinx-coroutines-reactive](reactive/kotlinx-coroutines-reactive) -- utilities for [Reactive Streams](http://www.reactive-streams.org)
16+
* [kotlinx-coroutines-reactor](reactive/kotlinx-coroutines-reactor) -- utilities for [Reactor](https://projectreactor.io)
1617
* [kotlinx-coroutines-rx1](reactive/kotlinx-coroutines-rx1) -- utilities for [RxJava 1.x](https://github.com/ReactiveX/RxJava/tree/1.x)
1718
* [kotlinx-coroutines-rx2](reactive/kotlinx-coroutines-rx2) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
1819

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
<module>kotlinx-coroutines-jdk8</module>
101101
<module>kotlinx-coroutines-nio</module>
102102
<module>reactive/kotlinx-coroutines-reactive</module>
103+
<module>reactive/kotlinx-coroutines-reactor</module>
103104
<module>reactive/kotlinx-coroutines-rx1</module>
104105
<module>reactive/kotlinx-coroutines-rx2</module>
105106
<module>reactive/kotlinx-coroutines-rx-example</module>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Module kotlinx-coroutines-reactor
2+
3+
Utilities for [Reactor](https://projectreactor.io).
4+
5+
Coroutine builders:
6+
7+
| **Name** | **Result** | **Scope** | **Description**
8+
| --------------- | -------------------------------------- | ---------------- | ---------------
9+
| [mono] | `Mono` | [CoroutineScope] | Cold mono that starts coroutine on subscribe
10+
| [flux] | `Flux` | [CoroutineScope] | Cold flux that starts coroutine on subscribe
11+
12+
Note, that `Mono` and `Flux` are a subclass of [Reactive Streams](http://www.reactive-streams.org)
13+
`Publisher` and extensions for it are covered by
14+
[kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module.
15+
16+
Conversion functions:
17+
18+
| **Name** | **Description**
19+
| -------- | ---------------
20+
| [Job.asMono][kotlinx.coroutines.experimental.Job.asMono] | Converts job to hot mono
21+
| [Deferred.asMono][kotlinx.coroutines.experimental.Deferred.asMono] | Converts deferred value to hot mono
22+
| [ReceiveChannel.asFlux][kotlinx.coroutines.experimental.channels.ReceiveChannel.asFlux] | Converts streaming channel to hot flux
23+
| [Scheduler.asCoroutineDispatcher][reactor.core.scheduler.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
24+
| [TimedScheduler.asCoroutineDispatcher][reactor.core.scheduler.TimedScheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher] supporting [Delay]
25+
26+
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
27+
<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
28+
<!--- INDEX kotlinx.coroutines.experimental -->
29+
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
30+
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-dispatcher/index.html
31+
<!--- INDEX kotlinx.coroutines.experimental.channels -->
32+
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
33+
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
34+
[ChannelIterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel-iterator/index.html
35+
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor -->
36+
<!--- DOCS_ROOT reactive/kotlinx-coroutines-reactor/target/dokka/kotlinx-coroutines-reactor -->
37+
<!--- INDEX kotlinx.coroutines.experimental.reactor -->
38+
[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/mono.html
39+
[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/flux.html
40+
[kotlinx.coroutines.experimental.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-job/as-mono.html
41+
[kotlinx.coroutines.experimental.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-deferred/as-mono.html
42+
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.channels.-receive-channel/as-flux.html
43+
[reactor.core.scheduler.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/reactor.core.scheduler.-scheduler/as-coroutine-dispatcher.html
44+
[reactor.core.scheduler.TimedScheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/reactor.core.scheduler.-timed-scheduler/as-coroutine-dispatcher.html
45+
<!--- END -->
46+
47+
# Package kotlinx.coroutines.experimental.reactor
48+
49+
Utilities for [Reactor](https://projectreactor.io).
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2016-2017 JetBrains s.r.o.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<parent>
24+
<groupId>org.jetbrains.kotlinx</groupId>
25+
<artifactId>kotlinx-coroutines</artifactId>
26+
<version>0.14</version>
27+
<relativePath>../../pom.xml</relativePath>
28+
</parent>
29+
30+
<artifactId>kotlinx-coroutines-reactor</artifactId>
31+
<packaging>jar</packaging>
32+
33+
<build>
34+
<sourceDirectory>src/main/kotlin</sourceDirectory>
35+
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
36+
</build>
37+
38+
<dependencyManagement>
39+
<dependencies>
40+
<dependency>
41+
<groupId>io.projectreactor</groupId>
42+
<artifactId>reactor-bom</artifactId>
43+
<version>Aluminium-RELEASE</version>
44+
<type>pom</type>
45+
<scope>import</scope>
46+
</dependency>
47+
</dependencies>
48+
</dependencyManagement>
49+
50+
<dependencies>
51+
<dependency>
52+
<groupId>io.projectreactor</groupId>
53+
<artifactId>reactor-core</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>io.projectreactor.addons</groupId>
57+
<artifactId>reactor-test</artifactId>
58+
<scope>test</scope>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.jetbrains.kotlinx</groupId>
62+
<artifactId>kotlinx-coroutines-core</artifactId>
63+
<version>${project.version}</version>
64+
<scope>compile</scope>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.jetbrains.kotlinx</groupId>
68+
<artifactId>kotlinx-coroutines-core</artifactId>
69+
<version>${project.version}</version>
70+
<classifier>tests</classifier>
71+
<scope>test</scope>
72+
</dependency>
73+
<dependency>
74+
<groupId>org.jetbrains.kotlinx</groupId>
75+
<artifactId>kotlinx-coroutines-reactive</artifactId>
76+
<version>${project.version}</version>
77+
<scope>compile</scope>
78+
</dependency>
79+
</dependencies>
80+
81+
</project>
82+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package kotlinx.coroutines.experimental.reactor
2+
3+
import kotlinx.coroutines.experimental.Deferred
4+
import kotlinx.coroutines.experimental.Job
5+
import kotlinx.coroutines.experimental.channels.ReceiveChannel
6+
import reactor.core.publisher.Flux
7+
import reactor.core.publisher.Mono
8+
import kotlin.coroutines.experimental.CoroutineContext
9+
10+
/**
11+
* Converts this job to the hot reactive mono that signals
12+
* with [success][MonoSink.success] when the corresponding job completes.
13+
*
14+
* Every subscriber gets the signal at the same time.
15+
* Unsubscribing from the resulting mono **does not** affect the original job in any way.
16+
*
17+
* @param context -- the coroutine context from which the resulting mono is going to be signalled
18+
*/
19+
public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
20+
21+
/**
22+
* Converts this deferred value to the hot reactive mono that signals
23+
* [success][MonoSink.success] or [error][MonoSink.error].
24+
*
25+
* Every subscriber gets the same completion value.
26+
* Unsubscribing from the resulting mono **does not** affect the original deferred value in any way.
27+
*
28+
* @param context -- the coroutine context from which the resulting mono is going to be signalled
29+
*/
30+
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
31+
32+
/**
33+
* Converts a stream of elements received from the channel to the hot reactive flux.
34+
*
35+
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
36+
* they'll receive values in round-robin way.
37+
*
38+
* @param context -- the coroutine context from which the resulting flux is going to be signalled
39+
*/
40+
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext): Flux<T> = flux(context) {
41+
for (t in this@asFlux)
42+
send(t)
43+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package kotlinx.coroutines.experimental.reactor
2+
3+
import kotlinx.coroutines.experimental.channels.ProducerScope
4+
import kotlinx.coroutines.experimental.reactive.publish
5+
import reactor.core.publisher.Flux
6+
import kotlin.coroutines.experimental.CoroutineContext
7+
8+
/**
9+
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
10+
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
11+
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
12+
*
13+
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
14+
* `onNext` is not invoked concurrently.
15+
*
16+
* | **Coroutine action** | **Signal to subscriber**
17+
* | -------------------------------------------- | ------------------------
18+
* | `send` | `onNext`
19+
* | Normal completion or `close` without cause | `onComplete`
20+
* | Failure with exception or `close` with cause | `onError`
21+
*/
22+
fun <T> flux(
23+
context: CoroutineContext,
24+
block: suspend ProducerScope<T>.() -> Unit
25+
): Flux<T> = Flux.from(publish(context, block))
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package kotlinx.coroutines.experimental.reactor
17+
18+
import kotlinx.coroutines.experimental.AbstractCoroutine
19+
import kotlinx.coroutines.experimental.CoroutineScope
20+
import kotlinx.coroutines.experimental.Job
21+
import kotlinx.coroutines.experimental.newCoroutineContext
22+
import reactor.core.Disposable
23+
import reactor.core.publisher.Mono
24+
import reactor.core.publisher.MonoSink
25+
import kotlin.coroutines.experimental.CoroutineContext
26+
import kotlin.coroutines.experimental.startCoroutine
27+
28+
/**
29+
* Creates cold [mono][Mono] that will run a given [block] in a coroutine.
30+
* Every time the returned mono is subscribed, it starts a new coroutine in the specified [context].
31+
* Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
32+
*
33+
* | **Coroutine action** | **Signal to sink**
34+
* | ------------------------------------- | ------------------------
35+
* | Returns a non-null value | `success(value)`
36+
* | Returns a null | `success`
37+
* | Failure with exception or unsubscribe | `error`
38+
*/
39+
fun <T> mono(
40+
context: CoroutineContext,
41+
block: suspend CoroutineScope.() -> T?
42+
): Mono<T> = Mono.create { sink ->
43+
val newContext = newCoroutineContext(context)
44+
val coroutine = MonoCoroutine(newContext, sink)
45+
coroutine.initParentJob(context[Job])
46+
sink.setCancellation(coroutine)
47+
block.startCoroutine(coroutine, coroutine)
48+
}
49+
50+
private class MonoCoroutine<in T>(
51+
override val parentContext: CoroutineContext,
52+
private val sink: MonoSink<T>
53+
) : AbstractCoroutine<T>(true), Disposable {
54+
var disposed = false
55+
56+
@Suppress("UNCHECKED_CAST")
57+
override fun afterCompletion(state: Any?, mode: Int) {
58+
when {
59+
disposed -> {}
60+
state is CompletedExceptionally -> sink.error(state.exception)
61+
state != null -> sink.success(state as T)
62+
else -> sink.success()
63+
}
64+
}
65+
66+
override fun dispose() {
67+
disposed = true
68+
cancel(cause = null)
69+
}
70+
71+
override fun isDisposed(): Boolean = disposed
72+
}
73+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package kotlinx.coroutines.experimental.reactor
2+
3+
import kotlinx.coroutines.experimental.CancellableContinuation
4+
import kotlinx.coroutines.experimental.CoroutineDispatcher
5+
import kotlinx.coroutines.experimental.Delay
6+
import kotlinx.coroutines.experimental.DisposableHandle
7+
import kotlinx.coroutines.experimental.disposeOnCompletion
8+
import reactor.core.Cancellation
9+
import reactor.core.scheduler.Scheduler
10+
import reactor.core.scheduler.TimedScheduler
11+
import java.util.concurrent.TimeUnit
12+
import kotlin.coroutines.experimental.CoroutineContext
13+
14+
/**
15+
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher].
16+
*/
17+
fun Scheduler.asCoroutineDispatcher() = SchedulerCoroutineDispatcher(this)
18+
19+
/**
20+
* Converts an instance of [TimedScheduler] to an implementation of [CoroutineDispatcher]
21+
* and provides native [delay][Delay.delay] support.
22+
*/
23+
fun TimedScheduler.asCoroutineDispatcher() = TimedSchedulerCoroutineDispatcher(this)
24+
25+
/**
26+
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
27+
* @param scheduler a scheduler.
28+
*/
29+
open class SchedulerCoroutineDispatcher(private val scheduler: Scheduler) : CoroutineDispatcher() {
30+
override fun dispatch(context: CoroutineContext, block: Runnable) {
31+
scheduler.schedule(block)
32+
}
33+
34+
override fun toString(): String = scheduler.toString()
35+
}
36+
37+
/**
38+
* Implements [CoroutineDispatcher] on top of an arbitrary [TimedScheduler].
39+
* @param scheduler a timed scheduler.
40+
*/
41+
open class TimedSchedulerCoroutineDispatcher(private val scheduler: TimedScheduler) : SchedulerCoroutineDispatcher(scheduler), Delay {
42+
43+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
44+
val disposable = scheduler.schedule({
45+
with(continuation) { resumeUndispatched(Unit) }
46+
}, time, unit)
47+
48+
continuation.disposeOnCompletion(disposable.asDisposableHandle())
49+
}
50+
51+
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
52+
scheduler.schedule(block, time, unit).asDisposableHandle()
53+
}
54+
55+
private fun Cancellation.asDisposableHandle(): DisposableHandle =
56+
object : DisposableHandle {
57+
override fun dispose() = this@asDisposableHandle.dispose()
58+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package kotlinx.coroutines.experimental.reactor
2+
3+
import reactor.core.publisher.Flux
4+
import reactor.core.publisher.Mono
5+
6+
fun <T> checkMonoValue(
7+
mono: Mono<T>,
8+
checker: (T) -> Unit
9+
) {
10+
val monoValue = mono.block()
11+
checker(monoValue)
12+
}
13+
14+
fun checkErroneous(
15+
mono: Mono<*>,
16+
checker: (Throwable) -> Unit
17+
) {
18+
try {
19+
mono.block()
20+
error("Should have failed")
21+
} catch (e: Throwable) {
22+
checker(e)
23+
}
24+
}
25+
26+
fun <T> checkSingleValue(
27+
flux: Flux<T>,
28+
checker: (T) -> Unit
29+
) {
30+
val singleValue = flux.toIterable().single()
31+
checker(singleValue)
32+
}
33+
34+
fun checkErroneous(
35+
flux: Flux<*>,
36+
checker: (Throwable) -> Unit
37+
) {
38+
val singleNotification = flux.materialize().toIterable().single()
39+
checker(singleNotification.throwable)
40+
}

0 commit comments

Comments
 (0)