Skip to content

Commit 835ed4d

Browse files
authored
Promote reactive bridges for Flow to stable API (#1549)
* Promote reactive adapters for Flow to stable API * Promote Publisher<T>.collect to stable API * Promote rx2 extensions to stable, increase deprecation level for obsolete reactive primitives
1 parent 353510a commit 835ed4d

File tree

18 files changed

+40
-50
lines changed

18 files changed

+40
-50
lines changed

reactive/kotlinx-coroutines-reactive/src/Channel.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,21 @@ import org.reactivestreams.*
2020
* @param request how many items to request from publisher in advance (optional, one by default).
2121
*/
2222
@ObsoleteCoroutinesApi
23-
@Suppress("CONFLICTING_OVERLOADS")
2423
public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
2524
val channel = SubscriptionChannel<T>(request)
2625
subscribe(channel)
2726
return channel
2827
}
2928

3029
// Will be promoted to error in 1.3.0, removed in 1.4.0
31-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
30+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
3231
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
3332
openSubscription().consumeEach(action)
3433

3534
/**
3635
* Subscribes to this [Publisher] and performs the specified action for each received element.
3736
* Cancels subscription if any exception happens during collect.
3837
*/
39-
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
4038
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
4139
openSubscription().consumeEach(action)
4240

reactive/kotlinx-coroutines-reactive/src/Publish.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public fun <T> publish(
4848

4949
@Deprecated(
5050
message = "CoroutineScope.publish is deprecated in favour of top-level publish",
51-
level = DeprecationLevel.WARNING,
51+
level = DeprecationLevel.ERROR,
5252
replaceWith = ReplaceWith("publish(context, block)")
5353
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
5454
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ import kotlin.coroutines.*
2323
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
2424
* are discarded.
2525
*/
26-
@ExperimentalCoroutinesApi
2726
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
2827
PublisherAsFlow(this, 1)
2928

3029
/**
3130
* Transforms the given flow to a spec-compliant [Publisher].
3231
*/
33-
@ExperimentalCoroutinesApi
3432
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
3533

3634
private class PublisherAsFlow<T : Any>(

reactive/kotlinx-coroutines-reactor/src/Flux.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public fun <T> flux(
5050

5151
@Deprecated(
5252
message = "CoroutineScope.flux is deprecated in favour of top-level flux",
53-
level = DeprecationLevel.WARNING,
53+
level = DeprecationLevel.ERROR,
5454
replaceWith = ReplaceWith("flux(context, block)")
5555
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
5656
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-reactor/src/Mono.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public fun <T> mono(
3939

4040
@Deprecated(
4141
message = "CoroutineScope.mono is deprecated in favour of top-level mono",
42-
level = DeprecationLevel.WARNING,
42+
level = DeprecationLevel.ERROR,
4343
replaceWith = ReplaceWith("mono(context, block)")
4444
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4545
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.coroutines.reactor
66

7-
import kotlinx.coroutines.*
87
import kotlinx.coroutines.flow.Flow
98
import kotlinx.coroutines.flow.flowOn
109
import kotlinx.coroutines.reactive.FlowSubscription
@@ -15,7 +14,6 @@ import reactor.core.publisher.Flux
1514
* Converts the given flow to a cold flux.
1615
* The original flow is cancelled when the flux subscriber is disposed.
1716
*/
18-
@ExperimentalCoroutinesApi
1917
public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
2018

2119
private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {

reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class FluxMultiTest : TestBase() {
7373
val mono = mono {
7474
var result = ""
7575
try {
76-
flux.consumeEach { result += it }
76+
flux.collect { result += it }
7777
} catch(e: IOException) {
7878
result += e.message
7979
}

reactive/kotlinx-coroutines-rx2/src/RxChannel.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import kotlinx.coroutines.internal.*
1919
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
2020
*/
2121
@ObsoleteCoroutinesApi
22-
@Suppress("CONFLICTING_OVERLOADS")
2322
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
2423
val channel = SubscriptionChannel<T>()
2524
subscribe(channel)
@@ -34,36 +33,33 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
3433
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
3534
*/
3635
@ObsoleteCoroutinesApi
37-
@Suppress("CONFLICTING_OVERLOADS")
3836
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
3937
val channel = SubscriptionChannel<T>()
4038
subscribe(channel)
4139
return channel
4240
}
4341

4442
// Will be promoted to error in 1.3.0, removed in 1.4.0
45-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
43+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
4644
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
4745
openSubscription().consumeEach(action)
4846

4947
// Will be promoted to error in 1.3.0, removed in 1.4.0
50-
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
48+
@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
5149
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
5250
openSubscription().consumeEach(action)
5351

5452
/**
5553
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
5654
* Cancels subscription if any exception happens during collect.
5755
*/
58-
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
5956
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit) =
6057
openSubscription().consumeEach(action)
6158

6259
/**
6360
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
6461
* Cancels subscription if any exception happens during collect.
6562
*/
66-
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
6763
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) =
6864
openSubscription().consumeEach(action)
6965

reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public fun rxCompletable(
3636

3737
@Deprecated(
3838
message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable",
39-
level = DeprecationLevel.WARNING,
39+
level = DeprecationLevel.ERROR,
4040
replaceWith = ReplaceWith("rxCompletable(context, block)")
4141
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
4242
@LowPriorityInOverloadResolution

reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public fun <T: Any> rxFlowable(
4545

4646
@Deprecated(
4747
message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable",
48-
level = DeprecationLevel.WARNING,
48+
level = DeprecationLevel.ERROR,
4949
replaceWith = ReplaceWith("rxFlowable(context, block)")
5050
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
5151
@LowPriorityInOverloadResolution

0 commit comments

Comments
 (0)