Skip to content

Commit 804d036

Browse files
konrad-kaminskielizarov
authored andcommitted
#49 Rx1 Completable.awaitCompleted implementation. (#50)
#49 Rx1 Completable.awaitCompleted implementation.
1 parent 523516a commit 804d036

File tree

3 files changed

+47
-0
lines changed

3 files changed

+47
-0
lines changed

reactive/kotlinx-coroutines-rx1/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Suspending extension functions and suspending iteration:
1414

1515
| **Name** | **Description**
1616
| -------- | ---------------
17+
| [Completable.awaitCompleted][rx.Completable.awaitCompleted] | Awaits for completion of the completable value
1718
| [Single.await][rx.Single.await] | Awaits for completion of the single value and returns it
1819
| [Observable.awaitFirst][rx.Observable.awaitFirst] | Returns the first value from the given observable
1920
| [Observable.awaitFirstOrDefault][rx.Observable.awaitFirstOrDefault] | Returns the first value from the given observable or default
@@ -44,6 +45,7 @@ Conversion functions:
4445
[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-completable.html
4546
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-single.html
4647
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-observable.html
48+
[rx.Completable.awaitCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-completable/await-completed.html
4749
[rx.Single.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-single/await.html
4850
[rx.Observable.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first.html
4951
[rx.Observable.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-default.html

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@ import kotlinx.coroutines.experimental.Job
2222
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
2323
import rx.*
2424

25+
// ------------------------ Completable ------------------------
26+
27+
/**
28+
* Awaits for completion of this completable without blocking a thread.
29+
* Returns `Unit` or throws the corresponding exception if this completable had produced error.
30+
*
31+
* This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
32+
* suspending function is suspended, this function immediately resumes with [CancellationException].
33+
*/
34+
public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
35+
subscribe(object : CompletableSubscriber {
36+
override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCompletion(s) }
37+
override fun onCompleted() { cont.resume(Unit) }
38+
override fun onError(e: Throwable) { cont.resumeWithException(e) }
39+
})
40+
}
41+
2542
// ------------------------ Single ------------------------
2643

2744
/**

reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/CompletableTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,32 @@ class CompletableTest : TestBase() {
8383
yield()
8484
finish(7)
8585
}
86+
87+
@Test
88+
fun testAwaitSuccess() = runBlocking<Unit> {
89+
expect(1)
90+
val completable = rxCompletable(context) {
91+
expect(3)
92+
}
93+
expect(2)
94+
completable.awaitCompleted() // shall launch coroutine
95+
finish(4)
96+
}
97+
98+
@Test
99+
fun testAwaitFailure() = runBlocking<Unit> {
100+
expect(1)
101+
val completable = rxCompletable(context) {
102+
expect(3)
103+
throw RuntimeException("OK")
104+
}
105+
expect(2)
106+
try {
107+
completable.awaitCompleted() // shall launch coroutine and throw exception
108+
expectUnreached()
109+
} catch (e: RuntimeException) {
110+
finish(4)
111+
assertThat(e.message, IsEqual("OK"))
112+
}
113+
}
86114
}

0 commit comments

Comments
 (0)