Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,43 @@ public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
}
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re the original toList() docmuentation - shall it say that toList must be used by a single consumer (the implication of consumeEach and the iterator used there)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already says so:

  • The operation is terminal.
  • This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the UX is bad and in its current form people will use it incorrectly with a higher probability.

Terminal doesn't redirect anywhere.

Oh wait, you can technically call it from different coroutines, it just doesn't make much sense. Nevermind then.

* Consumes the elements of this channel into the given [destination] mutable list.
*
* This function will attempt to receive elements and put them into the list until the channel is
* [closed][SendChannel.close].
* Calling [toList] on channels that are not eventually closed is always incorrect:
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
* until exhausting it.
*
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
* However, the [destination] list is left in a consistent state containing all the elements received from the channel
* up to that point.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* Example:
* ```
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
* // start a new coroutine that creates a channel,
* // sends elements to it, and closes it
* // once the coroutine's body finishes
* val channel = produce {
* values.forEach { send(it) }
* }
* val destination = mutableListOf<Int>()
* channel.toList(destination)
* check(destination == values)
* ```
*/
public suspend inline fun <T> ReceiveChannel<T>.toList(destination: MutableList<T>) {
consumeEach {
destination.add(it)
}
}

@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let {
Expand Down
36 changes: 34 additions & 2 deletions kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@ class ChannelsTest: TestBase() {
}

@Test
fun testEmptyList() = runTest {
fun testEmptyToList() = runTest {
assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
}

@Test
fun testToList() = runTest {
assertEquals(testList, testList.asReceiveChannel().toList())

}

@Test
Expand All @@ -104,6 +103,39 @@ class ChannelsTest: TestBase() {
}
}

@Test
fun testEmptyToListWithDestination() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
emptyList<Nothing>().asReceiveChannel().toList(destination)
assertEquals(initialList, destination)
}

@Test
fun testToListWithDestination() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
testList.asReceiveChannel().toList(destination)
assertEquals(initialList + testList, destination)
}

@Test
fun testToListWithDestinationOnFailedChannel() = runTest {
val initialList = listOf(-1, -2, -3)
val destination = initialList.toMutableList()
val channel = Channel<Int>(10)
val elementsToSend = (1..5).toList()
elementsToSend.forEach {
val result = channel.trySend(it)
assertTrue(result.isSuccess)
}
channel.close(TestException())
assertFailsWith<TestException> {
channel.toList(destination)
}
assertEquals(initialList + elementsToSend, destination)
}

private fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
Expand Down