Skip to content

Commit 2bc88ed

Browse files
committed
launch ChannelFlow.collect CoroutineStart.UNDISPATCHED when dispatcher didn't change
1 parent b488ce5 commit 2bc88ed

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,23 @@ public abstract class ChannelFlow<T>(
109109
* For non-atomic start it is possible to observe the situation,
110110
* where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`)
111111
* handlers, while the pipeline before does not, because it was cancelled during its dispatch.
112-
* Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
112+
* Thus `onCompletion` and `finally` blocks won't be executed, and it may lead to a different kind of memory leaks.
113113
*/
114114
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
115-
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
115+
produceImplInternal(scope, CoroutineStart.ATOMIC)
116+
117+
internal open fun produceImplInternal(scope: CoroutineScope, start: CoroutineStart): ReceiveChannel<T> = scope.produce(context, produceCapacity, onBufferOverflow, start = start, block = collectToFun)
116118

117119
override suspend fun collect(collector: FlowCollector<T>): Unit =
118120
coroutineScope {
119-
collector.emitAll(produceImpl(this))
121+
// If upstream and collect have the same dispatcher, launch the `produce` coroutine undispatched.
122+
// This allows the collector to reliably subscribe to the flow before it starts emitting.
123+
val current = currentCoroutineContext()[ContinuationInterceptor]
124+
val desired = context[ContinuationInterceptor]
125+
val start = if (desired == null || desired == current) {
126+
CoroutineStart.UNDISPATCHED
127+
} else CoroutineStart.ATOMIC
128+
collector.emitAll(produceImplInternal(this, start))
120129
}
121130

122131
protected open fun additionalToStringProps(): String? = null

0 commit comments

Comments
 (0)