Skip to content

Commit d6bb148

Browse files
konrad-kaminskielizarov
authored andcommitted
Add support for Rx2 Maybe. (#45)
Add support for Rx2 Maybe
1 parent 804d036 commit d6bb148

File tree

8 files changed

+430
-5
lines changed

8 files changed

+430
-5
lines changed

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Coroutine builders:
77
| **Name** | **Result** | **Scope** | **Description**
88
| --------------- | --------------------------------------- | ---------------- | ---------------
99
| [rxCompletable] | `Completable` | [CoroutineScope] | Cold completable that starts coroutine on subscribe
10+
| [rxMaybe] | `Maybe` | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
1011
| [rxSingle] | `Single` | [CoroutineScope] | Cold single that starts coroutine on subscribe
1112
| [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe
1213
| [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support
@@ -16,6 +17,9 @@ Suspending extension functions and suspending iteration:
1617
| **Name** | **Description**
1718
| -------- | ---------------
1819
| [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value
20+
| [MaybeSource.await][io.reactivex.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
21+
| [MaybeSource.awaitOrDefault][io.reactivex.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
22+
| [MaybeSource.open][io.reactivex.MaybeSource.open] | Subscribes to maybe and returns [ReceiveChannel]
1923
| [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it
2024
| [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
2125
| [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
@@ -50,10 +54,14 @@ Conversion functions:
5054
<!--- DOCS_ROOT reactive/kotlinx-coroutines-rx2/target/dokka/kotlinx-coroutines-rx2 -->
5155
<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
5256
[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-completable.html
57+
[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-maybe.html
5358
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-single.html
5459
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-observable.html
5560
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
5661
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
62+
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await.html
63+
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await-or-default.html
64+
[io.reactivex.MaybeSource.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/open.html
5765
[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
5866
[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
5967
[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-default.html

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,38 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
4040
})
4141
}
4242

43+
// ------------------------ MaybeSource ------------------------
44+
45+
/**
46+
* Awaits for completion of the maybe without blocking a thread.
47+
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
48+
* maybe had produced error.
49+
*
50+
* This suspending function is cancellable.
51+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
52+
* immediately resumes with [CancellationException].
53+
*/
54+
@Suppress("UNCHECKED_CAST")
55+
public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
56+
57+
/**
58+
* Awaits for completion of the maybe without blocking a thread.
59+
* Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
60+
* maybe had produced error.
61+
*
62+
* This suspending function is cancellable.
63+
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
64+
* immediately resumes with [CancellationException].
65+
*/
66+
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
67+
subscribe(object : MaybeObserver<T> {
68+
override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
69+
override fun onComplete() { cont.resume(default) }
70+
override fun onSuccess(t: T) { cont.resume(t) }
71+
override fun onError(error: Throwable) { cont.resumeWithException(error) }
72+
})
73+
}
74+
4375
// ------------------------ SingleSource ------------------------
4476

4577
/**

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental.rx2
1818

19+
import io.reactivex.MaybeObserver
20+
import io.reactivex.MaybeSource
1921
import io.reactivex.Observable
2022
import io.reactivex.ObservableSource
2123
import io.reactivex.Observer
@@ -25,7 +27,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel
2527
import java.io.Closeable
2628

2729
/**
28-
* Return type for [ObservableSource.open] that can be used to [receive] elements from the
30+
* Return type for [ObservableSource.open] and [MaybeSource.open] that can be used to [receive] elements from the
2931
* subscription and to manually [close] it.
3032
*/
3133
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
@@ -35,6 +37,15 @@ public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeabl
3537
public override fun close()
3638
}
3739

40+
/**
41+
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
42+
*/
43+
public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> {
44+
val channel = SubscriptionChannel<T>()
45+
subscribe(channel)
46+
return channel
47+
}
48+
3849
/**
3950
* Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
4051
*/
@@ -57,6 +68,16 @@ public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
5768
"Use `source.consumeEach { x -> ... }`.")
5869
public operator fun <T> ObservableSource<T>.iterator() = open().iterator()
5970

71+
/**
72+
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
73+
*/
74+
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
75+
public suspend fun <T> MaybeSource<T>.consumeEach(action: suspend (T) -> Unit) {
76+
open().use { channel ->
77+
for (x in channel) action(x)
78+
}
79+
}
80+
6081
/**
6182
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
6283
*/
@@ -67,7 +88,7 @@ public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Un
6788
}
6889
}
6990

70-
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T> {
91+
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T>, MaybeObserver<T> {
7192
@Volatile
7293
var subscription: Disposable? = null
7394

@@ -86,6 +107,10 @@ private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionRecei
86107
subscription = sub
87108
}
88109

110+
override fun onSuccess(t: T) {
111+
offer(t)
112+
}
113+
89114
override fun onNext(t: T) {
90115
offer(t)
91116
}

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
package kotlinx.coroutines.experimental.rx2
1818

1919
import io.reactivex.Completable
20+
import io.reactivex.Maybe
2021
import kotlinx.coroutines.experimental.Deferred
2122
import kotlinx.coroutines.experimental.Job
2223
import kotlinx.coroutines.experimental.channels.ReceiveChannel
23-
import kotlinx.coroutines.experimental.launch
2424
import io.reactivex.Observable
2525
import io.reactivex.Single
26-
import io.reactivex.functions.Cancellable
27-
import org.reactivestreams.Subscription
2826
import kotlin.coroutines.experimental.CoroutineContext
2927

3028
/**
@@ -40,6 +38,19 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
4038
this@asCompletable.join()
4139
}
4240

41+
/**
42+
* Converts this deferred value to the hot reactive maybe that signals
43+
* [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
44+
*
45+
* Every subscriber gets the same completion value.
46+
* Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
47+
*
48+
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
49+
*/
50+
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe<T>(context) {
51+
this@asMaybe.await()
52+
}
53+
4354
/**
4455
* Converts this deferred value to the hot reactive single that signals either
4556
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx2
18+
19+
import io.reactivex.Maybe
20+
import io.reactivex.MaybeEmitter
21+
import io.reactivex.functions.Cancellable
22+
import kotlinx.coroutines.experimental.AbstractCoroutine
23+
import kotlinx.coroutines.experimental.CoroutineScope
24+
import kotlinx.coroutines.experimental.Job
25+
import kotlinx.coroutines.experimental.newCoroutineContext
26+
import kotlin.coroutines.experimental.CoroutineContext
27+
import kotlin.coroutines.experimental.startCoroutine
28+
29+
/**
30+
* Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
31+
* Every time the returned observable is subscribed, it starts a new coroutine in the specified [context].
32+
* Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
33+
*
34+
* | **Coroutine action** | **Signal to subscriber**
35+
* | ------------------------------------- | ------------------------
36+
* | Returns a non-null value | `onSuccess`
37+
* | Returns a null | `onComplete`
38+
* | Failure with exception or unsubscribe | `onError`
39+
*/
40+
public fun <T> rxMaybe(
41+
context: CoroutineContext,
42+
block: suspend CoroutineScope.() -> T?
43+
): Maybe<T> = Maybe.create { subscriber ->
44+
val newContext = newCoroutineContext(context)
45+
val coroutine = RxMaybeCoroutine(newContext, subscriber)
46+
coroutine.initParentJob(context[Job])
47+
subscriber.setCancellable(coroutine)
48+
block.startCoroutine(coroutine, coroutine)
49+
}
50+
51+
private class RxMaybeCoroutine<T>(
52+
override val parentContext: CoroutineContext,
53+
private val subscriber: MaybeEmitter<T>
54+
) : AbstractCoroutine<T>(true), Cancellable {
55+
@Suppress("UNCHECKED_CAST")
56+
override fun afterCompletion(state: Any?, mode: Int) {
57+
if (subscriber.isDisposed) return
58+
when {
59+
state is CompletedExceptionally -> subscriber.onError(state.exception)
60+
state != null -> subscriber.onSuccess(state as T)
61+
else -> subscriber.onComplete()
62+
}
63+
}
64+
65+
// Cancellable impl
66+
override fun cancel() { cancel(cause = null) }
67+
}

reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.rx2
1818

19+
import io.reactivex.Maybe
1920
import io.reactivex.Observable
2021
import io.reactivex.Single
2122

@@ -55,3 +56,23 @@ fun checkErroneous(
5556
}
5657
}
5758

59+
fun <T> checkMaybeValue(
60+
maybe: Maybe<T>,
61+
checker: (T?) -> Unit
62+
) {
63+
val maybeValue = maybe.toFlowable().blockingIterable().firstOrNull()
64+
checker(maybeValue)
65+
}
66+
67+
fun checkErroneous(
68+
maybe: Maybe<*>,
69+
checker: (Throwable) -> Unit
70+
) {
71+
try {
72+
(maybe as Maybe<Any>).blockingGet()
73+
error("Should have failed")
74+
} catch (e: Throwable) {
75+
checker(e)
76+
}
77+
}
78+

reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package kotlinx.coroutines.experimental.rx2
1919
import kotlinx.coroutines.experimental.*
2020
import kotlinx.coroutines.experimental.channels.produce
2121
import org.junit.Assert.assertEquals
22+
import org.junit.Assert.assertNull
2223
import org.junit.Test
2324

2425
class ConvertTest : TestBase() {
@@ -55,6 +56,50 @@ class ConvertTest : TestBase() {
5556
finish(5)
5657
}
5758

59+
@Test
60+
fun testToMaybe() {
61+
val d = async(CommonPool) {
62+
delay(50)
63+
"OK"
64+
}
65+
val maybe1 = d.asMaybe(Unconfined)
66+
checkMaybeValue(maybe1) {
67+
assertEquals("OK", it)
68+
}
69+
val maybe2 = d.asMaybe(Unconfined)
70+
checkMaybeValue(maybe2) {
71+
assertEquals("OK", it)
72+
}
73+
}
74+
75+
@Test
76+
fun testToMaybeEmpty() {
77+
val d = async(CommonPool) {
78+
delay(50)
79+
null
80+
}
81+
val maybe1 = d.asMaybe(Unconfined)
82+
checkMaybeValue(maybe1, ::assertNull)
83+
val maybe2 = d.asMaybe(Unconfined)
84+
checkMaybeValue(maybe2, ::assertNull)
85+
}
86+
87+
@Test
88+
fun testToMaybeFail() {
89+
val d = async(CommonPool) {
90+
delay(50)
91+
throw TestException("OK")
92+
}
93+
val maybe1 = d.asMaybe(Unconfined)
94+
checkErroneous(maybe1) {
95+
check(it is TestException && it.message == "OK") { "$it" }
96+
}
97+
val maybe2 = d.asMaybe(Unconfined)
98+
checkErroneous(maybe2) {
99+
check(it is TestException && it.message == "OK") { "$it" }
100+
}
101+
}
102+
58103
@Test
59104
fun testToSingle() {
60105
val d = async(CommonPool) {

0 commit comments

Comments
 (0)