@@ -6,13 +6,13 @@ import io.smallrye.mutiny.subscription.MultiSubscriber
66import java.util.concurrent.Flow.Subscription
77import java.util.concurrent.atomic.AtomicReference
88import kotlin.coroutines.cancellation.CancellationException
9- import kotlin.coroutines.coroutineContext
109import kotlinx.coroutines.CoroutineScope
1110import kotlinx.coroutines.cancel
1211import kotlinx.coroutines.channels.BufferOverflow
1312import kotlinx.coroutines.channels.Channel
1413import kotlinx.coroutines.channels.awaitClose
1514import kotlinx.coroutines.channels.trySendBlocking
15+ import kotlinx.coroutines.currentCoroutineContext
1616import kotlinx.coroutines.flow.Flow
1717import kotlinx.coroutines.flow.buffer
1818import kotlinx.coroutines.flow.callbackFlow
@@ -32,7 +32,7 @@ import kotlinx.coroutines.launch
3232fun <T > Multi<T>.asFlow (
3333 bufferCapacity : Int = Channel .UNLIMITED ,
3434 bufferOverflowStrategy : BufferOverflow = BufferOverflow .SUSPEND
35- ): Flow <T > = callbackFlow< T > {
35+ ): Flow <T > = callbackFlow {
3636 val parentCtx = coroutineContext
3737
3838 val subscriber = object : MultiSubscriber <T > {
@@ -77,7 +77,7 @@ fun <T> Multi<T>.asFlow(
7777 * without respecting the requested amount of the subscriber.
7878 */
7979suspend fun <T > Flow<T>.asMulti (): Multi <T > {
80- val parentCtx = coroutineContext
80+ val parentCtx = currentCoroutineContext()
8181 return Multi .createFrom().emitter { em: MultiEmitter <in T > ->
8282 val job = CoroutineScope (parentCtx).launch {
8383 try {
@@ -101,4 +101,4 @@ suspend fun <T> Flow<T>.asMulti(): Multi<T> {
101101 }
102102}
103103
104- private class NonPropagatingCancellationException : kotlin.coroutines.cancellation. CancellationException ()
104+ private class NonPropagatingCancellationException : CancellationException ()
0 commit comments