Skip to content

Commit 038c6b7

Browse files
committed
Add ReceiveChannel.toList(destination)
Use case: collecting elements up until the point the channel is closed without losing the elements when `toList` when the exception is thrown. This function is similar to `Flow<T>.toList(destination)`, which we already have, so the addition makes sense from the point of view of consistency as well.
1 parent 7a32e3e commit 038c6b7

File tree

2 files changed

+71
-2
lines changed

2 files changed

+71
-2
lines changed

kotlinx-coroutines-core/common/src/channels/Channels.common.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,43 @@ public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
194194
}
195195
}
196196

197+
/**
198+
* Consumes the elements of this channel into the given [destination] mutable list.
199+
*
200+
* This function will attempt to receive elements and put them into the list until the channel is
201+
* [closed][SendChannel.close].
202+
* Calling [toList] on channels that are not eventually closed is always incorrect:
203+
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
204+
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
205+
* until exhausting it.
206+
*
207+
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
208+
* However, the [destination] list is left in a consistent state containing all the elements received from the channel
209+
* up to that point.
210+
*
211+
* The operation is _terminal_.
212+
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
213+
*
214+
* Example:
215+
* ```
216+
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
217+
* // start a new coroutine that creates a channel,
218+
* // sends elements to it, and closes it
219+
* // once the coroutine's body finishes
220+
* val channel = produce {
221+
* values.forEach { send(it) }
222+
* }
223+
* val destination = mutableListOf<Int>()
224+
* channel.toList(destination)
225+
* check(destination == values)
226+
* ```
227+
*/
228+
public suspend inline fun <T> ReceiveChannel<T>.toList(destination: MutableList<T>) {
229+
consumeEach {
230+
destination.add(it)
231+
}
232+
}
233+
197234
@PublishedApi
198235
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
199236
cancel(cause?.let {

kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,13 @@ class ChannelsTest: TestBase() {
8585
}
8686

8787
@Test
88-
fun testEmptyList() = runTest {
88+
fun testEmptyToList() = runTest {
8989
assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
9090
}
9191

9292
@Test
9393
fun testToList() = runTest {
9494
assertEquals(testList, testList.asReceiveChannel().toList())
95-
9695
}
9796

9897
@Test
@@ -104,6 +103,39 @@ class ChannelsTest: TestBase() {
104103
}
105104
}
106105

106+
@Test
107+
fun testEmptyToListWithDestination() = runTest {
108+
val initialList = listOf(-1, -2, -3)
109+
val destination = initialList.toMutableList()
110+
emptyList<Nothing>().asReceiveChannel().toList(destination)
111+
assertEquals(initialList, destination)
112+
}
113+
114+
@Test
115+
fun testToListWithDestination() = runTest {
116+
val initialList = listOf(-1, -2, -3)
117+
val destination = initialList.toMutableList()
118+
testList.asReceiveChannel().toList(destination)
119+
assertEquals(initialList + testList, destination)
120+
}
121+
122+
@Test
123+
fun testToListWithDestinationOnFailedChannel() = runTest {
124+
val initialList = listOf(-1, -2, -3)
125+
val destination = initialList.toMutableList()
126+
val channel = Channel<Int>(10)
127+
val elementsToSend = (1..5).toList()
128+
elementsToSend.forEach {
129+
val result = channel.trySend(it)
130+
assertTrue(result.isSuccess)
131+
}
132+
channel.close(TestException())
133+
assertFailsWith<TestException> {
134+
channel.toList(destination)
135+
}
136+
assertEquals(initialList + elementsToSend, destination)
137+
}
138+
107139
private fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
108140
GlobalScope.produce(context) {
109141
for (element in this@asReceiveChannel)

0 commit comments

Comments
 (0)