diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt index 986a41bd06..fc9d0a9fa2 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt @@ -17,7 +17,7 @@ internal class ChannelFlowTransformLatest( ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow) override suspend fun flowCollect(collector: FlowCollector) { - assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream + assert { collector is SendingCollector } // So cancellation behavior is not leaking into the downstream coroutineScope { var previousFlow: Job? = null flow.collect { value -> @@ -25,8 +25,8 @@ internal class ChannelFlowTransformLatest( cancel(ChildCancelledException()) join() } - // Do not pay for dispatch here, it's never necessary - previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { + // Dispatch to avoid blocking the upstream flow, which may produce more values immediately + previousFlow = launch { collector.transform(value) } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt index b98e97d442..a64b090520 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapLatestTest.kt @@ -7,26 +7,45 @@ import kotlin.test.* class FlatMapLatestTest : TestBase() { @Test - fun testFlatMapLatest() = runTest { - val flow = flowOf(1, 2, 3).flatMapLatest { value -> + fun testFlatMapLatestSuspension() = runTest { + val flow = flowOfYielding(1, 2, 3).flatMapLatest { value -> flowOf(value, value + 1) } assertEquals(listOf(1, 2, 2, 3, 3, 4), flow.toList()) } @Test - fun testEmission() = runTest { + fun testEmissionSuspension() = runTest { val list = flow { repeat(5) { emit(it) + yield() } }.flatMapLatest { flowOf(it) }.toList() assertEquals(listOf(0, 1, 2, 3, 4), list) } + @Test + fun testFlatMapLatestNoSuspension() = runTest { + val flow = flowOf(1, 2, 3).flatMapLatest { value -> + flowOf(value, value + 1) + } + assertEquals(listOf(3, 4), flow.toList()) + } + + @Test + fun testEmissionNoSuspension() = runTest { + val list = flow { + repeat(5) { + emit(it) + } + }.flatMapLatest { flowOf(it) }.toList() + assertEquals(listOf(4), list) + } + @Test fun testSwitchIntuitiveBehaviour() = runTest { - val flow = flowOf(1, 2, 3, 4, 5) + val flow = flowOfYielding(1, 2, 3, 4, 5) flow.flatMapLatest { flow { expect(it) @@ -40,22 +59,24 @@ class FlatMapLatestTest : TestBase() { } @Test - fun testSwitchRendevouzBuffer() = runTest { - val flow = flowOf(1, 2, 3, 4, 5) + fun testSwitchRendezvousBuffer() = runTest { + val flow = flowOfYielding(1, 2, 3, 4, 5) flow.flatMapLatest { flow { emit(it) // Reach here every uneven element because of channel's unfairness expect(it) } - }.buffer(0).onEach { expect(it + 1) } - .collect() + }.buffer(0).collect { + expect(it + 1) + yield() // give the `flowOfYielding` a chance to cancel the previous flow + } finish(7) } @Test fun testHangFlows() = runTest { - val flow = listOf(1, 2, 3, 4).asFlow() + val flow = flowOfYielding(1, 2, 3, 4) val result = flow.flatMapLatest { value -> flow { if (value != 4) hang { expect(value) } @@ -74,7 +95,7 @@ class FlatMapLatestTest : TestBase() { @Test fun testFailureInTransform() = runTest { - val flow = flowOf(1, 2).flatMapLatest { value -> + val flow = flowOfYielding(1, 2).flatMapLatest { value -> flow { if (value == 1) { emit(1) @@ -128,7 +149,7 @@ class FlatMapLatestTest : TestBase() { @Test fun testTake() = runTest { - val flow = flowOf(1, 2, 3, 4, 5).flatMapLatest { flowOf(it) } + val flow = flowOfYielding(1, 2, 3, 4, 5).flatMapLatest { flowOf(it) } assertEquals(listOf(1), flow.take(1).toList()) } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt index e072eabd71..2e76681c7b 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/TransformLatestTest.kt @@ -7,8 +7,8 @@ import kotlin.test.* class TransformLatestTest : TestBase() { @Test - fun testTransformLatest() = runTest { - val flow = flowOf(1, 2, 3).transformLatest { value -> + fun testTransformLatestSuspension() = runTest { + val flow = flowOfYielding(1, 2, 3).transformLatest { value -> emit(value) emit(value + 1) } @@ -19,6 +19,7 @@ class TransformLatestTest : TestBase() { fun testEmission() = runTest { val list = flow { repeat(5) { + yield() emit(it) } }.transformLatest { @@ -29,7 +30,7 @@ class TransformLatestTest : TestBase() { @Test fun testSwitchIntuitiveBehaviour() = runTest { - val flow = flowOf(1, 2, 3, 4, 5) + val flow = flowOfYielding(1, 2, 3, 4, 5) flow.transformLatest { expect(it) emit(it) @@ -42,28 +43,48 @@ class TransformLatestTest : TestBase() { @Test fun testSwitchRendezvousBuffer() = runTest { - val flow = flowOf(1, 2, 3, 4, 5) + val flow = flowOfYielding(1, 2, 3, 4, 5) flow.transformLatest { emit(it) // Reach here every uneven element because of channel's unfairness expect(it) - }.buffer(0).onEach { expect(it + 1) }.collect() + }.buffer(0).collect { + expect(it + 1) + yield() // give the `flowOfYielding` a chance to cancel the previous flow + } finish(7) } @Test fun testSwitchBuffer() = runTest { - val flow = flowOf(1, 2, 3, 42, 4) + val allowCollecting = CompletableDeferred() + val flow = flow { + emit(-1) + repeat(10) { + yield() + emit(it) + } + allowCollecting.complete(Unit) + } flow.transformLatest { emit(it) - expect(it) - }.buffer(2).collect() - finish(5) + }.buffer(2).collect { + when(it) { + -1 -> { + // a start signal. Now we emulate a slow collector. + allowCollecting.await() // sleep for a long time + } + 0 -> expect(1) + 1 -> expect(2) + 9 -> expect(3) + } + } + finish(4) } @Test fun testHangFlows() = runTest { - val flow = listOf(1, 2, 3, 4).asFlow() + val flow = listOf(1, 2, 3, 4).asFlow().onEach { yield() } val result = flow.transformLatest { value -> if (value != 4) hang { expect(value) } emit(42) @@ -83,27 +104,27 @@ class TransformLatestTest : TestBase() { val flow = flow { assertEquals("source", NamedDispatchers.name()) expect(1) - emit(4) + emit(-1) // will be cancelled by the later value expect(2) - emit(5) + emit(4) expect(3) - }.flowOn(NamedDispatchers("source")).transformLatest { value -> + }.flowOn(NamedDispatchers("source")).transformLatest { value -> emitAll(flow { assertEquals("switch$value", NamedDispatchers.name()) expect(value) emit(value) }.flowOn(NamedDispatchers("switch$value"))) }.onEach { - expect(it + 2) + expect(it + 1) assertEquals("main", NamedDispatchers.nameOr("main")) } - assertEquals(2, flow.count()) - finish(8) + assertEquals(1, flow.count()) + finish(6) } @Test fun testFailureInTransform() = runTest { - val flow = flowOf(1, 2).transformLatest { value -> + val flow = flowOfYielding(1, 2).transformLatest { value -> if (value == 1) { emit(1) hang { expect(1) } @@ -151,7 +172,20 @@ class TransformLatestTest : TestBase() { @Test fun testTake() = runTest { - val flow = flowOf(1, 2, 3, 4, 5).transformLatest { emit(it) } + val flow = flowOfYielding(1, 2, 3, 4, 5).transformLatest { emit(it) } assertEquals(listOf(1), flow.take(1).toList()) } } + +/** + * The same as [flowOf] but yields before each emission. + * + * This is useful for testing the behavior of operators that cancel the previous emission + * when a new value is emitted, such as [transformLatest]. + */ +internal fun flowOfYielding(vararg values: Int): Flow = flow { + for (value in values) { + yield() + emit(value) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt index 2cecf8c43f..f818b4dd71 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/CollectLatestTest.kt @@ -5,14 +5,6 @@ import kotlinx.coroutines.* import kotlin.test.* class CollectLatestTest : TestBase() { - @Test - fun testNoSuspension() = runTest { - flowOf(1, 2, 3).collectLatest { - expect(it) - } - finish(4) - } - @Test fun testSuspension() = runTest { flowOf(1, 2, 3).collectLatest { @@ -27,6 +19,7 @@ class CollectLatestTest : TestBase() { try { flow { emit(1) + yield() throw TestException() }.collectLatest { expect(1) } expectUnreached() @@ -50,4 +43,23 @@ class CollectLatestTest : TestBase() { } } -} \ No newline at end of file + + /** Tests that if new values appear immediately, they cancel processing the old value. */ + @Test + fun testNoSuspension() = runTest { + flowOf(3, 2, 1).collectLatest { value -> + expect(value) + } + finish(2) + } + + /** Tests that upstream errors that happen immediately after emitting a value cancel processing the old value. */ + @Test + fun testUpstreamErrorNoSuspension() = runTest({it is TestException}) { + flow { + emit(1) + throw TestException() + }.collectLatest { expectUnreached() } + expectUnreached() + } +} diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt index bf194c0d0d..e566bd7976 100644 --- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* import java.io.* import java.util.concurrent.* import java.util.concurrent.atomic.* @@ -57,4 +58,25 @@ class RunInterruptibleTest : TestBase() { }.join() finish(5) } + + /** + * Tests that [collectLatest] reacts to new elements even if its thread is blocked by [runInterruptible]. + */ + @Test + fun testCollectLatestCancellation() = runTest { + withContext(Dispatchers.IO) { + flow { + repeat(10) { + emit(it) + yield() + } + }.collectLatest { value -> + if (value != 9) { + runInterruptible { + Thread.sleep(Long.MAX_VALUE) + } + } + } + } + } }