Skip to content

Commit f013267

Browse files
committed
Introduce kotlinx-coroutines-rx library
1 parent 6effdcf commit f013267

File tree

7 files changed

+400
-3
lines changed

7 files changed

+400
-3
lines changed

README.md

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
# kotlinx.coroutines
2-
Two libraries built upon Kotlin coroutines:
2+
Three libraries built upon Kotlin coroutines:
33
* `kotlinx-coroutines-async` with convenient interfaces/wrappers to commonly
44
used asynchronous API shipped with standard JDK, namely promise-like `CompletableFuture`
55
and asynchronous channels from `java.nio` package
66
* `kotlinx-coroutines-generate` provides ability to create `Sequence` objects
77
generated by coroutine body containing `yield` suspension points
8+
* `kotlinx-coroutines-rx` allows to use `Observable` objects from
9+
[RxJava](https://github.com/ReactiveX/RxJava) inside a coroutine body to suspend on them
810

911
## Examples
1012
### Async
@@ -47,8 +49,46 @@ fun main(args: Array<String>) {
4749
}
4850
```
4951

50-
For more examples you can look at `kotlinx-coroutines-async-example-ui` sample
51-
project or in tests directories.
52+
### RxJava
53+
```kotlin
54+
import kotlinx.coroutines.asyncRx
55+
import retrofit2.Retrofit
56+
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
57+
import retrofit2.converter.gson.GsonConverterFactory
58+
import retrofit2.http.GET
59+
import retrofit2.http.Path
60+
import rx.Observable
61+
62+
interface GitHub {
63+
@GET("orgs/{user}/repos")
64+
fun orgRepos(@Path("user") user: String): Observable<List<Repo>>
65+
}
66+
67+
data class Repo(val name: String)
68+
69+
fun main(args: Array<String>) {
70+
val retrofit = Retrofit.Builder().apply {
71+
baseUrl("https://api.github.com")
72+
addConverterFactory(GsonConverterFactory.create())
73+
addCallAdapterFactory(RxJavaCallAdapterFactory.create())
74+
}.build()
75+
76+
val github = retrofit.create(GitHub::class.java)
77+
78+
asyncRx<Unit> {
79+
for (org in listOf("Kotlin", "ReactiveX")) {
80+
// `awaitSingle()` call here is a suspension point,
81+
// i.e. coroutine's code stops on it until request is not completed
82+
val repos = github.orgRepos(org).take(5).awaitSingle().joinToString()
83+
84+
println("$org: $repos")
85+
}
86+
}
87+
}
88+
```
89+
90+
For more examples you can look at `kotlinx-coroutines-async-example-ui`
91+
and `kotlinx-coroutines-rx-example` samples projects or in tests directories.
5292

5393
## Maven
5494

@@ -78,6 +118,11 @@ Add dependencies:
78118
<artifactId>kotlinx-coroutines-async</artifactId>
79119
<version>0.1-alpha-1</version>
80120
</dependency>
121+
<dependency>
122+
<groupId>org.jetbrains.kotlinx</groupId>
123+
<artifactId>kotlinx-coroutines-rx</artifactId>
124+
<version>0.1-alpha-1</version>
125+
</dependency>
81126
```
82127

83128
## Gradle
@@ -87,6 +132,7 @@ Just add dependencies:
87132
```groovy
88133
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-generate:0.1-alpha-1'
89134
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-async:0.1-alpha-1'
135+
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-rx:0.1-alpha-1'
90136
```
91137

92138
*NB:* As `async` library is built upon `CompletableFuture` it requires JDK 8 (24 Android API level)

kotlinx-coroutines-rx-example/pom.xml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.jetbrains.kotlinx</groupId>
8+
<artifactId>kotlinx-coroutines</artifactId>
9+
<version>0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>kotlinx-coroutines-rx-example</artifactId>
13+
<packaging>jar</packaging>
14+
15+
<name>Example of asyncRx usage</name>
16+
17+
<properties>
18+
<kotlin.version>1.1-SNAPSHOT</kotlin.version>
19+
</properties>
20+
21+
<build>
22+
<sourceDirectory>src/main/kotlin</sourceDirectory>
23+
<plugins>
24+
<plugin>
25+
<groupId>org.apache.maven.plugins</groupId>
26+
<artifactId>maven-deploy-plugin</artifactId>
27+
<configuration>
28+
<skip>true</skip>
29+
</configuration>
30+
</plugin>
31+
</plugins>
32+
</build>
33+
34+
<dependencies>
35+
<dependency>
36+
<groupId>io.reactivex</groupId>
37+
<artifactId>rxjava</artifactId>
38+
<version>1.1.5</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>com.squareup.retrofit2</groupId>
42+
<artifactId>retrofit</artifactId>
43+
<version>2.1.0</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>com.squareup.retrofit2</groupId>
47+
<artifactId>converter-gson</artifactId>
48+
<version>2.1.0</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>com.squareup.retrofit2</groupId>
52+
<artifactId>adapter-rxjava</artifactId>
53+
<version>2.1.0</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.jetbrains.kotlinx</groupId>
57+
<artifactId>kotlinx-coroutines-rx</artifactId>
58+
<version>${version}</version>
59+
<scope>compile</scope>
60+
</dependency>
61+
</dependencies>
62+
63+
</project>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package kotlinx.coroutines.example
2+
3+
import kotlinx.coroutines.asyncRx
4+
import retrofit2.Retrofit
5+
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
6+
import retrofit2.converter.gson.GsonConverterFactory
7+
import retrofit2.http.GET
8+
import retrofit2.http.Path
9+
import rx.Observable
10+
11+
interface GitHub {
12+
@GET("/repos/{owner}/{repo}/contributors")
13+
fun contributors(
14+
@Path("owner") owner: String,
15+
@Path("repo") repo: String
16+
): Observable<List<Contributor>>
17+
18+
@GET("users/{user}/repos")
19+
fun listRepos(@Path("user") user: String): Observable<List<Repo>>
20+
}
21+
22+
data class Contributor(val login: String, val contributions: Int)
23+
data class Repo(val name: String)
24+
25+
fun main(args: Array<String>) {
26+
val retrofit = Retrofit.Builder().apply {
27+
baseUrl("https://api.github.com")
28+
addConverterFactory(GsonConverterFactory.create())
29+
addCallAdapterFactory(RxJavaCallAdapterFactory.create())
30+
}.build()
31+
32+
val github = retrofit.create(GitHub::class.java)
33+
34+
asyncRx<Unit> {
35+
val contributors =
36+
github.contributors("JetBrains", "Kotlin")
37+
.awaitSingle().take(10)
38+
39+
for ((name, contributions) in contributors) {
40+
println("$name has $contributions contributions, other repos: ")
41+
42+
val otherRepos =
43+
github.listRepos(name).awaitSingle()
44+
.map { it.name }.joinToString(", ")
45+
46+
println(otherRepos)
47+
}
48+
}
49+
}

kotlinx-coroutines-rx/pom.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.jetbrains.kotlinx</groupId>
8+
<artifactId>kotlinx-coroutines</artifactId>
9+
<version>0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>kotlinx-coroutines-rx</artifactId>
13+
<packaging>jar</packaging>
14+
15+
<name>Kotlin coroutines support for Rx library</name>
16+
17+
<properties>
18+
<kotlin.version>1.1-SNAPSHOT</kotlin.version>
19+
</properties>
20+
21+
<build>
22+
<sourceDirectory>src/main/kotlin</sourceDirectory>
23+
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
24+
</build>
25+
26+
<dependencies>
27+
<dependency>
28+
<groupId>io.reactivex</groupId>
29+
<artifactId>rxjava</artifactId>
30+
<version>1.1.5</version>
31+
</dependency>
32+
</dependencies>
33+
34+
</project>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package kotlinx.coroutines
2+
3+
import rx.Observable
4+
import rx.subjects.AsyncSubject
5+
6+
/**
7+
* Run asynchronous computations based on [c] coroutine parameter
8+
*
9+
* Execution starts immediately within the 'asyncRx' call and it runs until
10+
* the first suspension point is reached ('*await' call for some Observable instance).
11+
* Remaining part of coroutine will be executed as it's passed into 'subscribe'
12+
* call of awaited Observable.
13+
*
14+
* @param c a coroutine representing reactive computations
15+
*
16+
* @return Observable with single value containing expression returned from coroutine
17+
*/
18+
fun <T> asyncRx(
19+
coroutine c: Controller<T>.() -> Continuation<Unit>
20+
): Observable<T> {
21+
val controller = Controller<T>()
22+
c(controller).resume(Unit)
23+
24+
return controller.result
25+
}
26+
27+
class Controller<T> internal constructor() {
28+
internal val result: AsyncSubject<T> = AsyncSubject.create<T>()
29+
30+
suspend fun <V> Observable<V>.awaitFirst(x: Continuation<V>) {
31+
this.first().subscribeWithContinuation(x)
32+
}
33+
34+
suspend fun <V> Observable<V>.awaitLast(x: Continuation<V>) {
35+
this.last().subscribeWithContinuation(x)
36+
}
37+
38+
suspend fun <V> Observable<V>.awaitSingle(x: Continuation<V>) {
39+
this.single().subscribeWithContinuation(x)
40+
}
41+
42+
private fun <V> Observable<V>.subscribeWithContinuation(x: Continuation<V>) {
43+
subscribe(x::resume, x::resumeWithException)
44+
}
45+
46+
suspend fun <V> Observable<V>.applyForEachAndAwait(
47+
block: (V) -> Unit,
48+
x: Continuation<Unit>
49+
) {
50+
this.subscribe(block, x::resumeWithException, { x.resume(Unit) })
51+
}
52+
53+
operator fun handleResult(v: T, x: Continuation<Nothing>) {
54+
result.onNext(v)
55+
result.onCompleted()
56+
}
57+
58+
operator fun handleException(t: Throwable, x: Continuation<Nothing>) {
59+
result.onError(t)
60+
}
61+
}

0 commit comments

Comments
 (0)