diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index c676eedf62..417aac7374 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -109,14 +109,24 @@ public abstract class ChannelFlow( * For non-atomic start it is possible to observe the situation, * where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`) * handlers, while the pipeline before does not, because it was cancelled during its dispatch. - * Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks. + * Thus `onCompletion` and `finally` blocks won't be executed, and it may lead to a different kind of memory leaks. */ public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = - scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) + produceImplInternal(scope, CoroutineStart.ATOMIC) + + internal open fun produceImplInternal(scope: CoroutineScope, start: CoroutineStart): ReceiveChannel = + scope.produce(context, produceCapacity, onBufferOverflow, start = start, block = collectToFun) override suspend fun collect(collector: FlowCollector): Unit = coroutineScope { - collector.emitAll(produceImpl(this)) + // If upstream and collect have the same dispatcher, launch the `produce` coroutine undispatched. + // This allows the collector to reliably subscribe to the flow before it starts emitting. + val current = currentCoroutineContext()[ContinuationInterceptor] + val desired = context[ContinuationInterceptor] + val start = if (desired == null || desired == current) { + CoroutineStart.UNDISPATCHED + } else CoroutineStart.ATOMIC + collector.emitAll(produceImplInternal(this, start)) } protected open fun additionalToStringProps(): String? = null diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index c8e9667fb2..a499a6b21f 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -1,8 +1,8 @@ package kotlinx.coroutines.channels -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.testing.* import kotlin.coroutines.* import kotlin.test.* @@ -75,7 +75,7 @@ class ProduceTest : TestBase() { try { c.receive() expectUnreached() - } catch (e: TestCancellationException) { + } catch (_: TestCancellationException) { expect(5) } yield() // to produce @@ -196,7 +196,7 @@ class ProduceTest : TestBase() { assertFailsWith { (channel as ProducerScope<*>).awaitClose() } callbackFlow { expect(1) - launch { + launch(start = CoroutineStart.UNDISPATCHED) { expect(2) assertFailsWith { awaitClose { expectUnreached() } @@ -286,7 +286,7 @@ class ProduceTest : TestBase() { produced.cancel() try { source.receive() - } catch (e: CancellationException) { + } catch (_: CancellationException) { finish(4) } } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 1d7ce160f2..56e2cbdd6c 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -1,8 +1,8 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* import kotlin.test.* class ChannelFlowTest : TestBase() { @@ -20,10 +20,9 @@ class ChannelFlowTest : TestBase() { fun testBuffer() = runTest { val flow = channelFlow { assertTrue(trySend(1).isSuccess) - assertTrue(trySend(2).isSuccess) - assertFalse(trySend(3).isSuccess) + assertFalse(trySend(2).isSuccess) }.buffer(1) - assertEquals(listOf(1, 2), flow.toList()) + assertEquals(listOf(1), flow.toList()) } @Test @@ -34,7 +33,7 @@ class ChannelFlowTest : TestBase() { assertTrue(trySend(3).isSuccess) assertTrue(trySend(4).isSuccess) }.buffer(Channel.CONFLATED) - assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated + assertEquals(listOf(4), flow.toList()) } @Test @@ -164,14 +163,13 @@ class ChannelFlowTest : TestBase() { val flow = channelFlow { // ~ callback-based API, no children outerScope.launch(Job()) { - expect(2) send(1) expectUnreached() } expect(1) } assertEquals(emptyList(), flow.toList()) - finish(3) + finish(2) } @Test @@ -207,6 +205,7 @@ class ChannelFlowTest : TestBase() { @Test fun testDoesntDispatchUnnecessarilyWhenCollected() = runTest { + // Test that `.collectLatest` consistently subscribes to the flow when invoked on the same dispatcher as upstream. expect(1) val myFlow = flow { expect(3) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt index 2b553a5e64..7cf84434e7 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt @@ -2,9 +2,9 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* import kotlin.test.* class FlowCallbackTest : TestBase() { @@ -14,24 +14,25 @@ class FlowCallbackTest : TestBase() { val flow = callbackFlow { // ~ callback-based API outerScope.launch(Job()) { - expect(2) try { + expect(4) send(1) expectUnreached() } catch (e: IllegalStateException) { - expect(3) + expect(5) assertTrue(e.message!!.contains("awaitClose")) } + finish(6) } expect(1) } try { flow.collect() } catch (e: IllegalStateException) { - expect(4) + expect(2) assertTrue(e.message!!.contains("awaitClose")) } - finish(5) + expect(3) } @Test diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferConflationTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferConflationTest.kt index 789a7132a2..5795126869 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/BufferConflationTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferConflationTest.kt @@ -1,14 +1,13 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* import kotlin.test.* /** * A _behavioral_ test for conflation options that can be configured by the [buffer] operator to test that it is * implemented properly and that adjacent [buffer] calls are fused properly. -*/ + */ class BufferConflationTest : TestBase() { private val n = 100 // number of elements to emit for test @@ -20,8 +19,8 @@ class BufferConflationTest : TestBase() { expect(1) // emit all and conflate, then collect first & last val expectedList = when (onBufferOverflow) { - BufferOverflow.DROP_OLDEST -> listOf(0) + (n - capacity until n).toList() // first item & capacity last ones - BufferOverflow.DROP_LATEST -> (0..capacity).toList() // first & capacity following ones + BufferOverflow.DROP_OLDEST -> (n - capacity until n).toList() // first item & capacity last ones + BufferOverflow.DROP_LATEST -> (0 until capacity).toList() // first & capacity following ones else -> error("cannot happen") } flow { @@ -38,6 +37,26 @@ class BufferConflationTest : TestBase() { finish(n + 2 + expectedList.size) } + @Test + fun testConflateExplicit() = runTest { + val n = 3 + expect(1) + flow { + expect(2) + emit(0) + expect(3) + emit(1) + expect(4) + emit(2) + } + .conflate() + .collect { value -> + assertEquals(2, value) + expect(n + 2) + } + finish(n + 2 + 1) + } + @Test fun testConflate() = checkConflate(1) { @@ -138,6 +157,6 @@ class BufferConflationTest : TestBase() { fun testBuffer3DropOldestOverrideBuffer8DropLatest() = checkConflate(3, BufferOverflow.DROP_OLDEST) { buffer(8, onBufferOverflow = BufferOverflow.DROP_LATEST) - .buffer(3, BufferOverflow.DROP_OLDEST) + .buffer(3, BufferOverflow.DROP_OLDEST) } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt index c8407901a1..4266d43b75 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt @@ -1,8 +1,8 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.testing.* import kotlin.math.* import kotlin.test.* @@ -11,16 +11,16 @@ import kotlin.test.* * implemented properly and that adjacent [buffer] calls are fused properly. */ class BufferTest : TestBase() { - private val n = 200 // number of elements to emit for test + private val n = 200 // number of elements to emit for each test private val defaultBufferSize = 64 // expected default buffer size (per docs) // Use capacity == -1 to check case of "no buffer" - private fun checkBuffer(capacity: Int, op: suspend Flow.() -> Flow) = runTest { - expect(1) + private fun checkBufferDispatched(capacity: Int, op: suspend Flow.() -> Flow) = runTest { /* - Channels perform full rendezvous. Sender does not suspend when there is a suspended receiver and vice-versa. - Thus, perceived batch size is +2 from capacity. + Channels perform full rendezvous. Sender does not suspend when there is a suspended receiver and vice versa. + Thus, the perceived batch size is +2 from capacity when the sender coroutine is dispatched. */ + expect(1) val batchSize = capacity + 2 flow { repeat(n) { i -> @@ -41,11 +41,134 @@ class BufferTest : TestBase() { finish(2 * n + 2) } + // Use capacity == -1 to check the case of "no buffer" + internal fun checkBuffer(capacity: Int, op: suspend Flow.() -> Flow) = runTest { + /* + The perceived capacity is capacity + 1 for the first batch and capacity + 2 for future batches + for the undispatched sender coroutine. + + Depending on the op, the sender coroutine can be launched as dispatched or undispatched. + E.g., flowOn with a different dispatcher is always dispatched, while only `.buffer(...)` is undispatched. + + If the coroutine launched undispatched (the most common case in these tests, + e.g. `flow { ... }.buffer(3).collect()`), the sender will arrive before the receiver subscribed, + thus making the perceived capacity +1 from capacity for the first batch, and +2 from capacity on + all the later batches. + + In addition to that, the sender also gets to execute first and populate the buffer. + See tests named *Explicit* below to understand the execution order. + */ + expect(1) + val batchSize = capacity + 2 + val firstBatchSize = min(n, capacity + 1) + flow { + repeat(n) { i -> + if (i < firstBatchSize) { + expect(i + 2) // the first expect for i == 0 is expect(1) + } else { + val batchNo = ((i - firstBatchSize) / batchSize) + 1 + val batchIdx = (i - firstBatchSize) % batchSize + expect(2 * (firstBatchSize + (batchNo - 1) * batchSize) + batchIdx + 2) + } + emit(i) + } + } + .op() // insert user-defined operator + .collect { i -> + if (i < firstBatchSize) { + expect(firstBatchSize + i + 2) + } else { + val batchNo = ((i - firstBatchSize) / batchSize) + 1 + val batchIdx = (i - firstBatchSize) % batchSize + // last batch might have smaller size + val curBatchSize = + min(firstBatchSize + batchNo * batchSize, n) - (firstBatchSize + (batchNo - 1) * batchSize) + expect(2 * (firstBatchSize + (batchNo - 1) * batchSize) + curBatchSize + batchIdx + 2) + } + } + finish(2 * n + 2) + } + @Test // capacity == -1 to checkBuffer means "no buffer" -- emits / collects are sequentially ordered fun testBaseline() = checkBuffer(-1) { this } + @Test + fun testBufferRendezvousExplicitSimple() = runTest { + flow { + expect(1) + emit("a") // suspends to wait for a receiver + expect(3) + } + .buffer(0) + .collect { value -> + assertEquals("a", value) + expect(2) + } + finish(4) + } + + @Test + fun testBufferRendezvousExplicit() = runTest { + flow { + expect(1) + emit("a") // suspends waiting for the receiver + expect(3) + emit("b") // doesn't suspend, because the receiver is already waiting + expect(4) + emit("c") // suspends waiting for the receiver + expect(7) + } + .buffer(0) + .collect { + when (it) { + "a" -> expect(2) + "b" -> expect(5) + "c" -> expect(6) + } + } + finish(8) + } + + @Test + fun testBuffer2Explicit() = runTest { + flow { + expect(1) + emit("a") // doesn't suspend, buffer has space + expect(2) + emit("b") // doesn't suspend, buffer has space + expect(3) + emit("c") // suspends until the buffer has space + expect(7) + emit("d") // doesn't suspend, the receiver is waiting + expect(8) + emit("e") // doesn't suspend, buffer has space + expect(9) + emit("f") // doesn't suspend, buffer has space + expect(10) + emit("g") // suspends until the buffer has space + expect(15) + emit("h") // doesn't suspend, the receiver is waiting + expect(16) + } + .buffer(2) + .collect { + when (it) { + "a" -> expect(4) + "b" -> expect(5) + "c" -> expect(6) + "d" -> expect(11) + "e" -> expect(12) + "f" -> expect(13) + "g" -> expect(14) + "h" -> expect(17) + else -> expectUnreached() + } + } + finish(18) + } + @Test fun testBufferDefault() = checkBuffer(defaultBufferSize) { @@ -76,6 +199,12 @@ class BufferTest : TestBase() { buffer(3) } + @Test + fun testBufferNotCompletelyFilled() = + checkBuffer(n + 10) { + buffer(n + 10) + } + @Test fun testBuffer00Fused() = checkBuffer(0) { @@ -132,19 +261,19 @@ class BufferTest : TestBase() { @Test // flowOn operator uses default buffer size when dispatcher changes fun testFlowOnDispatcherBufferDefault() = - checkBuffer(defaultBufferSize) { + checkBufferDispatched(defaultBufferSize) { flowOn(wrapperDispatcher()) } @Test // flowOn(...).buffer(n) sets explicit buffer size to n fun testFlowOnDispatcherBufferFused() = - checkBuffer(5) { + checkBufferDispatched(5) { flowOn(wrapperDispatcher()).buffer(5) } - + @Test // buffer(n).flowOn(...) sets explicit buffer size to n fun testBufferFlowOnDispatcherFused() = - checkBuffer(6) { + checkBufferDispatched(6) { buffer(6).flowOn(wrapperDispatcher()) } @@ -162,7 +291,7 @@ class BufferTest : TestBase() { @Test // multiple flowOn/buffer all fused together fun testBufferFlowOnMultipleFused() = - checkBuffer(12) { + checkBufferDispatched(12) { flowOn(wrapperDispatcher()).buffer(3) .flowOn(CoroutineName("Name")).buffer(4) .flowOn(wrapperDispatcher()).buffer(5) @@ -187,8 +316,18 @@ class BufferTest : TestBase() { val flow = emptyFlow() assertFailsWith { flow.buffer(capacity = -3) } assertFailsWith { flow.buffer(capacity = Int.MIN_VALUE) } - assertFailsWith { flow.buffer(capacity = Channel.CONFLATED, onBufferOverflow = BufferOverflow.DROP_LATEST) } - assertFailsWith { flow.buffer(capacity = Channel.CONFLATED, onBufferOverflow = BufferOverflow.DROP_OLDEST) } + assertFailsWith { + flow.buffer( + capacity = Channel.CONFLATED, + onBufferOverflow = BufferOverflow.DROP_LATEST + ) + } + assertFailsWith { + flow.buffer( + capacity = Channel.CONFLATED, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConflateTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConflateTest.kt index 7b3878c10b..f920fb0401 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ConflateTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ConflateTest.kt @@ -1,7 +1,7 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* +import kotlinx.coroutines.testing.* import kotlin.test.* class ConflateTest : TestBase() { @@ -20,4 +20,22 @@ class ConflateTest : TestBase() { assertEquals(listOf(1, 10, 20, 30), result) finish(2) } -} \ No newline at end of file + + +// @Test +// fun testDispatched() = withVirtualTime { +// expect(1) +// val flow = flow { +// for (i in 1..30) { +// delay(10) +// emit(i) +// } +// } +// val result = flow.flowOn(wrapperDispatcher()).conflate().onEach { +// println("Emitting $it") // prints 1 once and hangs // todo: ask Dmitry +// delay(100) +// }.toList() +// assertEquals(listOf(1, 10, 20, 30), result) +// finish(2) +// } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt index e3b4ba2e49..7f8ce79a2e 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt @@ -35,32 +35,33 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() { } @Test - fun testAtomicStart() = runTest { + fun testUndispatchedStart() = runTest { try { coroutineScope { val job = coroutineContext[Job]!! val flow = flow { - expect(3) + expect(2) emit(1) } - .onCompletion { expect(5) } + .onCompletion { expect(4) } .flatMapMerge { - expect(4) - flowOf(it).onCompletion { expectUnreached() } } - .onCompletion { expect(6) } + expect(3) + flowOf(it).onCompletion { expect(6) } + } + .onCompletion { expect(7) } launch { expect(1) flow.collect() } launch { - expect(2) + expect(5) yield() job.cancel() } } - } catch (e: CancellationException) { - finish(7) + } catch (_: CancellationException) { + finish(8) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt index 1391d7e810..2045bbc75d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt @@ -1,14 +1,12 @@ package kotlinx.coroutines.rx2 -import kotlinx.coroutines.testing.* -import io.reactivex.Observable -import io.reactivex.ObservableSource -import io.reactivex.Observer -import io.reactivex.disposables.Disposables -import io.reactivex.subjects.PublishSubject +import io.reactivex.* +import io.reactivex.disposables.* +import io.reactivex.subjects.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.testing.* import kotlinx.coroutines.testing.flow.* import kotlin.test.* @@ -72,8 +70,7 @@ class ObservableAsFlowTest : TestBase() { expect(1) flow.collect { expect(it) } expectUnreached() - } - catch (e: Exception) { + } catch (e: Exception) { assertSame(exception, e.cause) expect(5) } @@ -99,8 +96,7 @@ class ObservableAsFlowTest : TestBase() { if (it == 3) throw exception } expectUnreached() - } - catch (e: Exception) { + } catch (e: Exception) { assertSame(exception, e.cause) expect(4) } @@ -128,7 +124,7 @@ class ObservableAsFlowTest : TestBase() { assertNotNull(observer) job.cancel() val disposable = Disposables.empty() - observer!!.onSubscribe(disposable) + observer.onSubscribe(disposable) assertTrue(disposable.isDisposed) finish(3) } @@ -154,7 +150,7 @@ class ObservableAsFlowTest : TestBase() { fun testConflated() = runTest { val source = Observable.range(1, 5) val list = source.asFlow().conflate().toList() - assertEquals(listOf(1, 5), list) + assertEquals(listOf(5), list) } @Test