Skip to content

Commit bec405b

Browse files
authored
Introduce SharedFlow.collectLatest (#4454)
Fixes https://youtrack.jetbrains.com/issue/KTIJ-34450
1 parent 0ea765f commit bec405b

File tree

3 files changed

+20
-0
lines changed

3 files changed

+20
-0
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
10311031
public static final synthetic fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10321032
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10331033
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1034+
public static final fun collectLatest (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10341035
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
10351036
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
10361037
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,7 @@ final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/Broadcas
11441144
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeEach(kotlin/Function1<#A, kotlin/Unit>) // kotlinx.coroutines.channels/consumeEach|[email protected]<0:0>(kotlin.Function1<0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
11451145
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collect(crossinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collect|[email protected]<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
11461146
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectIndexed(crossinline kotlin.coroutines/SuspendFunction2<kotlin/Int, #A, kotlin/Unit>) // kotlinx.coroutines.flow/collectIndexed|[email protected]<0:0>(kotlin.coroutines.SuspendFunction2<kotlin.Int,0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
1147+
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/collectLatest(noinline kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>): kotlin/Nothing // kotlinx.coroutines.flow/collectLatest|[email protected]<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
11471148
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
11481149
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/last(): #A // kotlinx.coroutines.flow/last|[email protected]<0:0>(){0§<kotlin.Any?>}[0]
11491150
final suspend inline fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/SharedFlow<#A>).kotlinx.coroutines.flow/toList(): kotlin.collections/List<#A> // kotlinx.coroutines.flow/toList|[email protected]<0:0>(){0§<kotlin.Any?>}[0]

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,21 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
111111
collect(object : FlowCollector<T> {
112112
override suspend fun emit(value: T) = action(value)
113113
})
114+
115+
// -------------------- Collecting operations on a SharedFlow --------------------
116+
// -------------------- These mirror the operations above and are introduced when requested --------------------
117+
118+
/**
119+
* Terminal flow operator that collects the given flow with a provided [action].
120+
* The crucial difference from [collect] is that when the original flow emits a new value
121+
* then the [action] block for the previous value is cancelled.
122+
*
123+
* This is a special version of [collectLatest] for [SharedFlow].
124+
* Its only difference from the usual [collectLatest] on [Flow]
125+
* is that this version returns [Nothing] to indicate that it never completes.
126+
* See [SharedFlow] for more details.
127+
*/
128+
public suspend inline fun <T> SharedFlow<T>.collectLatest(noinline action: suspend (value: T) -> Unit): Nothing {
129+
(this as Flow<T>).collectLatest(action)
130+
throw IllegalStateException("SharedFlow never completes, this call should never return.")
131+
}

0 commit comments

Comments
 (0)