From 038c6b74de02b8f88234287ca9a63102b535a4ea Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 10:14:25 +0200 Subject: [PATCH 1/3] Add ReceiveChannel.toList(destination) Use case: collecting elements up until the point the channel is closed without losing the elements when `toList` when the exception is thrown. This function is similar to `Flow.toList(destination)`, which we already have, so the addition makes sense from the point of view of consistency as well. --- .../common/src/channels/Channels.common.kt | 37 +++++++++++++++++++ .../common/test/channels/ChannelsTest.kt | 36 +++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 15534b08fe..818460e76b 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -194,6 +194,43 @@ public suspend fun ReceiveChannel.toList(): List = buildList { } } +/** + * Consumes the elements of this channel into the given [destination] mutable list. + * + * This function will attempt to receive elements and put them into the list until the channel is + * [closed][SendChannel.close]. + * Calling [toList] on channels that are not eventually closed is always incorrect: + * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. + * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory + * until exhausting it. + * + * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. + * However, the [destination] list is left in a consistent state containing all the elements received from the channel + * up to that point. + * + * The operation is _terminal_. + * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * + * Example: + * ``` + * val values = listOf(1, 5, 2, 9, 3, 3, 1) + * // start a new coroutine that creates a channel, + * // sends elements to it, and closes it + * // once the coroutine's body finishes + * val channel = produce { + * values.forEach { send(it) } + * } + * val destination = mutableListOf() + * channel.toList(destination) + * check(destination == values) + * ``` + */ +public suspend inline fun ReceiveChannel.toList(destination: MutableList) { + consumeEach { + destination.add(it) + } +} + @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { cancel(cause?.let { diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index 235609c804..83ae2f4e24 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -85,14 +85,13 @@ class ChannelsTest: TestBase() { } @Test - fun testEmptyList() = runTest { + fun testEmptyToList() = runTest { assertTrue(emptyList().asReceiveChannel().toList().isEmpty()) } @Test fun testToList() = runTest { assertEquals(testList, testList.asReceiveChannel().toList()) - } @Test @@ -104,6 +103,39 @@ class ChannelsTest: TestBase() { } } + @Test + fun testEmptyToListWithDestination() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + emptyList().asReceiveChannel().toList(destination) + assertEquals(initialList, destination) + } + + @Test + fun testToListWithDestination() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + testList.asReceiveChannel().toList(destination) + assertEquals(initialList + testList, destination) + } + + @Test + fun testToListWithDestinationOnFailedChannel() = runTest { + val initialList = listOf(-1, -2, -3) + val destination = initialList.toMutableList() + val channel = Channel(10) + val elementsToSend = (1..5).toList() + elementsToSend.forEach { + val result = channel.trySend(it) + assertTrue(result.isSuccess) + } + channel.close(TestException()) + assertFailsWith { + channel.toList(destination) + } + assertEquals(initialList + elementsToSend, destination) + } + private fun Iterable.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context) { for (element in this@asReceiveChannel) From 2d8f52b3d5f32f6222ae0c55405bda89380d782a Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 12:48:49 +0200 Subject: [PATCH 2/3] Keep only one overload --- .../api/kotlinx-coroutines-core.api | 4 +- .../api/kotlinx-coroutines-core.klib.api | 1 + .../common/src/channels/Channels.common.kt | 47 +++---------------- 3 files changed, 10 insertions(+), 42 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 2a97c37f6c..015a257854 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -820,7 +820,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static synthetic fun takeWhile$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun toChannel (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlinx/coroutines/channels/SendChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun toCollection (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final synthetic fun toList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun toList$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/List;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMap (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun toMutableList (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index a839ffcfa0..3c495f4acb 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -1142,6 +1142,7 @@ final suspend fun kotlinx.coroutines/yield() // kotlinx.coroutines/yield|yield() final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/fold(#B, crossinline kotlin.coroutines/SuspendFunction2<#B, #A, #B>): #B // kotlinx.coroutines.flow/fold|fold@kotlinx.coroutines.flow.Flow<0:0>(0:1;kotlin.coroutines.SuspendFunction2<0:1,0:0,0:1>){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/BroadcastChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.BroadcastChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] +final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collect(crossinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collect|collect@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectIndexed(crossinline kotlin.coroutines/SuspendFunction2) // kotlinx.coroutines.flow/collectIndexed|collectIndexed@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.SharedFlow<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 818460e76b..b805d88ff8 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -161,41 +161,9 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) for (e in this) action(e) } -/** - * Returns a [List] containing all the elements sent to this channel, preserving their order. - * - * This function will attempt to receive elements and put them into the list until the channel is - * [closed][SendChannel.close]. - * Calling [toList] on channels that are not eventually closed is always incorrect: - * - It will suspend indefinitely if the channel is not closed, but no new elements arrive. - * - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory - * until exhausting it. - * - * If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * Example: - * ``` - * val values = listOf(1, 5, 2, 9, 3, 3, 1) - * // start a new coroutine that creates a channel, - * // sends elements to it, and closes it - * // once the coroutine's body finishes - * val channel = produce { - * values.forEach { send(it) } - * } - * check(channel.toList() == values) - * ``` - */ -public suspend fun ReceiveChannel.toList(): List = buildList { - consumeEach { - add(it) - } -} - /** * Consumes the elements of this channel into the given [destination] mutable list. + * If none is provided, a new [ArrayList] will be created. * * This function will attempt to receive elements and put them into the list until the channel is * [closed][SendChannel.close]. @@ -220,16 +188,11 @@ public suspend fun ReceiveChannel.toList(): List = buildList { * val channel = produce { * values.forEach { send(it) } * } - * val destination = mutableListOf() - * channel.toList(destination) - * check(destination == values) + * check(channel.toList() == values) * ``` */ -public suspend inline fun ReceiveChannel.toList(destination: MutableList) { - consumeEach { - destination.add(it) - } -} +public suspend inline fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = + consumeEach(destination::add).let { destination } @PublishedApi internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { @@ -238,3 +201,5 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { }) } +@Deprecated("Preserving binary compatibility, was stable", level = DeprecationLevel.HIDDEN) +public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) \ No newline at end of file From 6039cf8428670b75a1319a054f3b23a6567b989c Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 10 Sep 2025 13:03:17 +0200 Subject: [PATCH 3/3] Fixup --- .../api/kotlinx-coroutines-core.klib.api | 2 +- .../common/src/channels/Channels.common.kt | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 3c495f4acb..c76a81d1d8 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -1104,6 +1104,7 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel< final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/single(): #A // kotlinx.coroutines.channels/single|single@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/singleOrNull(): #A? // kotlinx.coroutines.channels/singleOrNull|singleOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableList(): kotlin.collections/MutableList<#A> // kotlinx.coroutines.channels/toMutableList|toMutableList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableSet(): kotlin.collections/MutableSet<#A> // kotlinx.coroutines.channels/toMutableSet|toMutableSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toSet(): kotlin.collections/Set<#A> // kotlinx.coroutines.channels/toSet|toSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§}[0] @@ -1142,7 +1143,6 @@ final suspend fun kotlinx.coroutines/yield() // kotlinx.coroutines/yield|yield() final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/fold(#B, crossinline kotlin.coroutines/SuspendFunction2<#B, #A, #B>): #B // kotlinx.coroutines.flow/fold|fold@kotlinx.coroutines.flow.Flow<0:0>(0:1;kotlin.coroutines.SuspendFunction2<0:1,0:0,0:1>){0§;1§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/BroadcastChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.BroadcastChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|consumeEach@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§}[0] -final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toList(kotlin.collections/MutableList<#A> = ...): kotlin.collections/List<#A> // kotlinx.coroutines.channels/toList|toList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.collections.MutableList<0:0>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collect(crossinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collect|collect@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectIndexed(crossinline kotlin.coroutines/SuspendFunction2) // kotlinx.coroutines.flow/collectIndexed|collectIndexed@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2){0§}[0] final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.SharedFlow<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index b805d88ff8..3bf9219be8 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -162,7 +162,7 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) } /** - * Consumes the elements of this channel into the given [destination] mutable list. + * [Consumes][consume] the elements of this channel into the given [destination] mutable list. * If none is provided, a new [ArrayList] will be created. * * This function will attempt to receive elements and put them into the list until the channel is @@ -186,12 +186,12 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) * // sends elements to it, and closes it * // once the coroutine's body finishes * val channel = produce { - * values.forEach { send(it) } + * values.forEach { send(it) } * } * check(channel.toList() == values) * ``` */ -public suspend inline fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = +public suspend fun ReceiveChannel.toList(destination: MutableList = ArrayList()): List = consumeEach(destination::add).let { destination } @PublishedApi @@ -202,4 +202,4 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { } @Deprecated("Preserving binary compatibility, was stable", level = DeprecationLevel.HIDDEN) -public suspend fun ReceiveChannel.toList(): List = toList(ArrayList()) \ No newline at end of file +public suspend fun ReceiveChannel.toList(): List = toList(ArrayList())