Skip to content

Commit 502610e

Browse files
authored
Decouple asFlow from batchSize and move it to buffer instead, promote… (#1279)
* Optimize Publisher.asFlow and fix conflation * Use channel.receiveOrNull instead of for loop iteration (it is more efficient) * Calling Publisher.asFlow().produceIn(...) uses a single channel and is implemented via Publisher.openSubscription()
1 parent a563608 commit 502610e

File tree

9 files changed

+265
-45
lines changed

9 files changed

+265
-45
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,11 +916,31 @@ public final class kotlinx/coroutines/flow/MigrationKt {
916916
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
917917
}
918918

919+
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
920+
public final field capacity I
921+
public final field context Lkotlin/coroutines/CoroutineContext;
922+
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
923+
public fun additionalToStringProps ()Ljava/lang/String;
924+
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
925+
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
926+
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
927+
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
928+
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
929+
public fun toString ()Ljava/lang/String;
930+
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
931+
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
932+
}
933+
919934
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
920935
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
921936
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
922937
}
923938

939+
public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {
940+
public fun <init> (Lkotlinx/coroutines/channels/SendChannel;)V
941+
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
942+
}
943+
924944
public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
925945
public synthetic fun <init> (II)V
926946
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,5 @@ public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
3131
public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
3232
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
3333
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
34-
public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
3534
}
3635

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@ import kotlin.jvm.*
1515
internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
1616
this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)
1717

18-
// Operators that use channels extend this ChannelFlow and are always fused with each other
19-
internal abstract class ChannelFlow<T>(
18+
/**
19+
* Operators that use channels extend this ChannelFlow and are always fused with each other.
20+
*
21+
* @suppress **This an internal API and should not be used from general code.**
22+
*/
23+
@InternalCoroutinesApi
24+
public abstract class ChannelFlow<T>(
2025
// upstream context
2126
@JvmField val context: CoroutineContext,
2227
// buffer capacity between upstream and downstream context
@@ -62,7 +67,7 @@ internal abstract class ChannelFlow<T>(
6267
fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
6368
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6469

65-
fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
70+
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
6671
scope.flowProduce(context, produceCapacity, block = collectToFun)
6772

6873
override suspend fun collect(collector: FlowCollector<T>) =

kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
89
import kotlinx.coroutines.channels.*
910
import kotlinx.coroutines.channels.ArrayChannel
1011
import kotlinx.coroutines.flow.*
@@ -17,8 +18,13 @@ internal fun <T> FlowCollector<T>.asConcurrentFlowCollector(): ConcurrentFlowCol
1718
// Two basic implementations are here: SendingCollector and ConcurrentFlowCollector
1819
internal interface ConcurrentFlowCollector<T> : FlowCollector<T>
1920

20-
// Concurrent collector because it sends to a channel
21-
internal class SendingCollector<T>(
21+
/**
22+
* Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently.
23+
*
24+
* @suppress **This an internal API and should not be used from general code.**
25+
*/
26+
@InternalCoroutinesApi
27+
public class SendingCollector<T>(
2228
private val channel: SendChannel<T>
2329
) : ConcurrentFlowCollector<T> {
2430
override suspend fun emit(value: T) = channel.send(value)

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ private class FlowCoroutine<T>(
7070
context: CoroutineContext,
7171
uCont: Continuation<T>
7272
) : ScopeCoroutine<T>(context, uCont) {
73-
7473
public override fun childCancelled(cause: Throwable): Boolean {
7574
if (cause is ChildCancelledException) return true
7675
return cancelImpl(cause)
@@ -81,7 +80,6 @@ private class FlowProduceCoroutine<T>(
8180
parentContext: CoroutineContext,
8281
channel: Channel<T>
8382
) : ProducerCoroutine<T>(parentContext, channel) {
84-
8583
public override fun childCancelled(cause: Throwable): Boolean {
8684
if (cause is ChildCancelledException) return true
8785
return cancelImpl(cause)

reactive/kotlinx-coroutines-reactive/src/Channel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
4040
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
4141
openSubscription().consumeEach(action)
4242

43-
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
43+
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
4444
private class SubscriptionChannel<T>(
4545
private val request: Int
4646
) : LinkedListChannel<T>(), Subscriber<T> {

reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,66 +7,123 @@ package kotlinx.coroutines.reactive.flow
77
import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
99
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.flow.internal.*
11+
import kotlinx.coroutines.reactive.*
1012
import org.reactivestreams.*
13+
import kotlin.coroutines.*
1114

1215
/**
1316
* Transforms the given reactive [Publisher] into [Flow].
14-
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
15-
* and [Subscription.request] size.
17+
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
18+
* More precisely, to it specifies the value of the subscription's [request][Subscription.request].
19+
* `1` is used by default.
1620
*
1721
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
1822
* are discarded.
1923
*/
24+
@JvmName("from")
25+
@ExperimentalCoroutinesApi
26+
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
27+
PublisherAsFlow(this, 1)
28+
2029
@FlowPreview
21-
@JvmOverloads // For nice Java API
2230
@JvmName("from")
23-
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int = 1): Flow<T> =
24-
PublisherAsFlow(this, batchSize)
31+
@Deprecated(
32+
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
33+
level = DeprecationLevel.ERROR,
34+
replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
35+
)
36+
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)
2537

26-
private class PublisherAsFlow<T : Any>(private val publisher: Publisher<T>, private val batchSize: Int) : Flow<T> {
38+
private class PublisherAsFlow<T : Any>(
39+
private val publisher: Publisher<T>,
40+
capacity: Int
41+
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
42+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
43+
PublisherAsFlow(publisher, capacity)
44+
45+
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
46+
// use another channel for conflation (cannot do openSubscription)
47+
if (capacity < 0) return super.produceImpl(scope)
48+
// Open subscription channel directly
49+
val channel = publisher.openSubscription(capacity)
50+
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
51+
channel.cancel(cause?.let {
52+
it as? CancellationException ?: CancellationException("Job was cancelled", it)
53+
})
54+
}
55+
if (handle != null && handle !== NonDisposableHandle) {
56+
(channel as SendChannel<*>).invokeOnClose {
57+
handle.dispose()
58+
}
59+
}
60+
return channel
61+
}
62+
63+
private val requestSize: Long
64+
get() = when (capacity) {
65+
Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming
66+
Channel.RENDEZVOUS -> 1L // need to request at least one anyway
67+
Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE
68+
else -> capacity.toLong().also { check(it >= 1) }
69+
}
2770

2871
override suspend fun collect(collector: FlowCollector<T>) {
29-
val channel = Channel<T>(batchSize)
30-
val subscriber = ReactiveSubscriber(channel, batchSize)
72+
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
3173
publisher.subscribe(subscriber)
3274
try {
33-
var consumed = 0
34-
for (i in channel) {
35-
collector.emit(i)
36-
if (++consumed == batchSize) {
37-
consumed = 0
38-
subscriber.subscription.request(batchSize.toLong())
75+
var consumed = 0L
76+
while (true) {
77+
val value = subscriber.takeNextOrNull() ?: break
78+
collector.emit(value)
79+
if (++consumed == requestSize) {
80+
consumed = 0L
81+
subscriber.makeRequest()
3982
}
4083
}
4184
} finally {
42-
subscriber.subscription.cancel()
85+
subscriber.cancel()
4386
}
4487
}
4588

46-
@Suppress("SubscriberImplementation")
47-
private class ReactiveSubscriber<T : Any>(
48-
private val channel: Channel<T>,
49-
private val batchSize: Int
50-
) : Subscriber<T> {
89+
// The second channel here is used only for broadcast
90+
override suspend fun collectTo(scope: ProducerScope<T>) =
91+
collect(SendingCollector(scope.channel))
92+
}
5193

52-
lateinit var subscription: Subscription
94+
@Suppress("SubscriberImplementation")
95+
private class ReactiveSubscriber<T : Any>(
96+
capacity: Int,
97+
private val requestSize: Long
98+
) : Subscriber<T> {
99+
private lateinit var subscription: Subscription
100+
private val channel = Channel<T>(capacity)
53101

54-
override fun onComplete() {
55-
channel.close()
56-
}
102+
suspend fun takeNextOrNull(): T? = channel.receiveOrNull()
57103

58-
override fun onSubscribe(s: Subscription) {
59-
subscription = s
60-
s.request(batchSize.toLong())
61-
}
104+
override fun onNext(value: T) {
105+
// Controlled by requestSize
106+
require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
107+
}
62108

63-
override fun onNext(t: T) {
64-
// Controlled by batch-size
65-
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
66-
}
109+
override fun onComplete() {
110+
channel.close()
111+
}
67112

68-
override fun onError(t: Throwable?) {
69-
channel.close(t)
70-
}
113+
override fun onError(t: Throwable?) {
114+
channel.close(t)
115+
}
116+
117+
override fun onSubscribe(s: Subscription) {
118+
subscription = s
119+
makeRequest()
120+
}
121+
122+
fun makeRequest() {
123+
subscription.request(requestSize)
124+
}
125+
126+
fun cancel() {
127+
subscription.cancel()
71128
}
72129
}

reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
package kotlinx.coroutines.reactive.flow
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
89
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.reactive.*
1011
import kotlin.test.*
1112

1213
class PublisherAsFlowTest : TestBase() {
13-
1414
@Test
1515
fun testCancellation() = runTest {
1616
var onNext = 0
@@ -42,4 +42,108 @@ class PublisherAsFlowTest : TestBase() {
4242
assertEquals(1, onError)
4343
assertEquals(1, onCancelled)
4444
}
45+
46+
@Test
47+
fun testBufferSize1() = runTest {
48+
val publisher = publish {
49+
expect(1)
50+
send(3)
51+
52+
expect(2)
53+
send(5)
54+
55+
expect(4)
56+
send(7)
57+
expect(6)
58+
}
59+
60+
publisher.asFlow().collect {
61+
expect(it)
62+
}
63+
64+
finish(8)
65+
}
66+
67+
@Test
68+
fun testBufferSize10() = runTest {
69+
val publisher = publish {
70+
expect(1)
71+
send(5)
72+
73+
expect(2)
74+
send(6)
75+
76+
expect(3)
77+
send(7)
78+
expect(4)
79+
}
80+
81+
publisher.asFlow().buffer(10).collect {
82+
expect(it)
83+
}
84+
85+
finish(8)
86+
}
87+
88+
@Test
89+
fun testConflated() = runTest {
90+
val publisher = publish {
91+
for (i in 1..5) send(i)
92+
}
93+
val list = publisher.asFlow().conflate().toList()
94+
assertEquals(listOf(1, 5), list)
95+
}
96+
97+
@Test
98+
fun testProduce() = runTest {
99+
val flow = publish { repeat(10) { send(it) } }.asFlow()
100+
check((0..9).toList(), flow.produceIn(this))
101+
check((0..9).toList(), flow.buffer(2).produceIn(this))
102+
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
103+
check(listOf(0, 9), flow.conflate().produceIn(this))
104+
}
105+
106+
private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
107+
val result = ArrayList<Int>(10)
108+
channel.consumeEach { result.add(it) }
109+
assertEquals(expected, result)
110+
}
111+
112+
@Test
113+
fun testProduceCancellation() = runTest {
114+
expect(1)
115+
// publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
116+
val flow = publish {
117+
expect(3)
118+
repeat(10) { value ->
119+
when (value) {
120+
in 0..6 -> send(value)
121+
7 -> try {
122+
send(value)
123+
} catch (e: CancellationException) {
124+
finish(6)
125+
throw e
126+
}
127+
else -> expectUnreached()
128+
}
129+
}
130+
}.asFlow()
131+
assertFailsWith<TestException> {
132+
coroutineScope {
133+
expect(2)
134+
val channel = flow.produceIn(this)
135+
channel.consumeEach { value ->
136+
when (value) {
137+
in 0..4 -> {}
138+
5 -> {
139+
expect(4)
140+
throw TestException()
141+
}
142+
else -> expectUnreached()
143+
}
144+
}
145+
}
146+
}
147+
expect(5)
148+
}
45149
}

0 commit comments

Comments
 (0)