Skip to content

Commit 91d9f7c

Browse files
elizarovqwwdfsad
authored andcommitted
Flow exceptions handling facilities (#1280)
* Flow documentation introduces and explains "exception transparency" concept in detail. * Introduce exception handling operators: * catch intermediate operator. * retry and retryWhen intermediate operators. * collect() without lambda terminal operator. * onErrorXxx operators deprecated and moved to migration file when it was appropriate.
1 parent 9420df3 commit 91d9f7c

File tree

20 files changed

+370
-124
lines changed

20 files changed

+370
-124
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
816816
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
817817
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
818818
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
819+
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
819820
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
821+
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
820822
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
821823
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
822824
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
@@ -863,12 +865,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
863865
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
864866
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
865867
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
866-
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
867-
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
868868
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
869869
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
870-
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
870+
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
871+
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
871872
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
873+
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
874+
public static final fun retryWhen (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
872875
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
873876
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
874877
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
@@ -899,6 +902,10 @@ public final class kotlinx/coroutines/flow/MigrationKt {
899902
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
900903
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
901904
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
905+
public static final fun onErrorResumeNext (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
906+
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
907+
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
908+
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
902909
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
903910
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
904911
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/Annotations.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public annotation class ExperimentalCoroutinesApi
3131
@MustBeDocumented
3232
@Retention(value = AnnotationRetention.BINARY)
3333
@Experimental(level = Experimental.Level.WARNING)
34+
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.TYPEALIAS, AnnotationTarget.PROPERTY)
3435
public annotation class FlowPreview
3536

3637
/**

kotlinx-coroutines-core/common/src/flow/Flow.kt

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ import kotlin.coroutines.*
1212
* A cold asynchronous data stream that sequentially emits values
1313
* and completes normally or with an exception.
1414
*
15-
* _Cold flow_ means that intermediate operators on a flow such as [map] and [filter] do not trigger its execution,
16-
* which is only done by terminal operators like [single]. By default, flows are _sequential_ and all flow
17-
* operations are executed sequentially in the same coroutine, see [buffer] for details.
18-
*
19-
* _Collecting the flow_ means executing all its operations.
20-
* Flow values can be collected in a suspending manner without actual blocking using the [collect] extension that
21-
* completes normally or exceptionally:
15+
* _Intermediate operators_ on the flow such as [map], [filter], [take], [zip], etc are functions that are
16+
* applied to the _upstream_ flow or flows and return a _downstream_ flow where further operators can be applied to.
17+
* Intermediate operations do not execute any code in the flow and are not suspending functions themselves.
18+
* They only set up a chain of operations for future execution and quickly return.
19+
* This is known as a _cold flow_ property.
20+
*
21+
* _Terminal operators_ on the flow are suspending functions such as [collect], [single], [reduce], [toList], etc.
22+
* They are applied to the upstream flow and trigger execution of all operations.
23+
* Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
24+
* without actual blocking. Terminal operator complete normally or exceptionally depending on successful or failed
25+
* execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
2226
*
2327
* ```
2428
* try {
@@ -30,10 +34,13 @@ import kotlin.coroutines.*
3034
* }
3135
* ```
3236
*
33-
* Additionally, the library provides a rich set of terminal operators such as [single], [reduce] and others.
37+
* By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
38+
* with an exception for a few operations specifically designed to introduce concurrency into flow
39+
* the execution such a [buffer] and [flatMapMerge]. See their documentation for details.
3440
*
35-
* Flows don't carry information whether they are cold streams (which can be collected repeatedly and
36-
* trigger their evaluation every time [collect] is executed) or hot ones, but, conventionally, they represent cold streams.
41+
* Flow interface does not carry information whether a flow is a truly a cold stream that can be collected repeatedly and
42+
* triggers execution of the same code every time it is collected or if it is a hot stream that emits different
43+
* values from the same running source on each collection. However, conventionally flows represent cold streams.
3744
* Transitions between hot and cold streams are supported via channels and the corresponding API:
3845
* [channelFlow], [produceIn], [broadcastIn].
3946
*
@@ -48,7 +55,18 @@ import kotlin.coroutines.*
4855
* * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
4956
* potentially concurrent calls to [send][kotlinx.coroutines.channels.SendChannel.send] function.
5057
*
51-
* ### Flow context
58+
* ### Flow constraints
59+
*
60+
* All implementations of `Flow` interface must adhere to two key properties that are described in detail below:
61+
*
62+
* * Context preservation.
63+
* * Exception transparency.
64+
*
65+
* These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
66+
* in such a way so that upstream flow emitters can be developed separately from downstream flow collectors.
67+
* A user of the flow does not needs to know implementation details of the upstream flows it uses.
68+
*
69+
* ### Context preservation
5270
*
5371
* The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks
5472
* it downstream, thus making reasoning about the execution context of particular transformations or terminal
@@ -77,7 +95,7 @@ import kotlin.coroutines.*
7795
* }
7896
* ```
7997
*
80-
* From the implementation point of view it means that all flow implementations should
98+
* From the implementation point of view, it means that all flow implementations should
8199
* emit only from the same coroutine.
82100
* This constraint is efficiently enforced by the default [flow] builder.
83101
* The [flow] builder should be used if flow implementation does not start any coroutines.
@@ -108,29 +126,57 @@ import kotlin.coroutines.*
108126
* - Collecting another flow from a separate context is allowed, but it has the same effect as
109127
* [flowOn] operator on that flow, which is more efficient.
110128
*
129+
* ### Exception transparency
130+
*
131+
* Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
132+
* it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
133+
* `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
134+
* [catch][Flow.catch] operator and it is designed to catch only exception coming from upstream flow while passing
135+
* all the downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
136+
* throw any unhandled exception that occurs in its code or in upstream flows, for example:
137+
*
138+
* ```
139+
* flow { emitData() }
140+
* .map { computeOne(it) }
141+
* .catch { ... } // catches exceptions in emitData and computeOne
142+
* .map { computeTwo(it) }
143+
* .collect { process(it) } // throws exceptions from process and computeTwo
144+
* ```
145+
*
146+
* Failure to adhere to the exception transparency requirement would result in strange behaviours that would make
147+
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
148+
* by the upstream flow, limiting the ability of local reasoning about the code.
149+
*
150+
* Currently, flow infrastructure does not enforce exception transparency contracts, however, it might be enforced
151+
* in the future either at run time or at compile time.
152+
*
153+
* ### Reactive streams
154+
*
111155
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
112-
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from kotlinx-coroutines-reactive module.
156+
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
113157
*/
114158
@ExperimentalCoroutinesApi
115159
public interface Flow<out T> {
116-
117160
/**
118161
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
119162
* This method should never be implemented or used directly.
120163
*
121164
* The only way to implement flow interface directly is to extend [AbstractFlow].
122-
* To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { }` extension should be used.
123-
* Such limitation ensures that context preservation property is not violated and prevents most of the developer mistakes
124-
* related to concurrency, inconsistent flow dispatchers and cancellation.
165+
* To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
166+
* should be used. Such limitation ensures that context preservation property is not violated and prevents most
167+
* of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
125168
*/
126169
@InternalCoroutinesApi
127170
public suspend fun collect(collector: FlowCollector<T>)
128171
}
129172

130173
/**
131174
* Base class to extend to have a stateful implementation of the flow.
132-
* It tracks all the properties required for context preservation and throws [IllegalStateException] if any of the properties are violated.
175+
* It tracks all the properties required for context preservation and throws [IllegalStateException]
176+
* if any of the properties are violated.
177+
*
133178
* Example of the implementation:
179+
*
134180
* ```
135181
* // list.asFlow() + collect counter
136182
* class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {

kotlinx-coroutines-core/common/src/flow/Migration.kt

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,18 @@ public fun PublishSubject(): Any = error("Should not be called")
105105
/** @suppress **/
106106
@Deprecated(
107107
level = DeprecationLevel.ERROR,
108-
message = "Flow analogue is named onErrorCollect",
109-
replaceWith = ReplaceWith("onErrorCollect(fallback)")
108+
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
109+
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
110110
)
111111
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should not be called")
112112

113+
@Deprecated(
114+
level = DeprecationLevel.ERROR,
115+
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
116+
replaceWith = ReplaceWith("catch { emitAll(fallback) }")
117+
)
118+
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = error("Should not be called")
119+
113120
/**
114121
* Self-explanatory, the reason of deprecation is "context preservation" property (you can read more in [Flow] documentation)
115122
* @suppress
@@ -181,15 +188,15 @@ public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Sh
181188
*/
182189
@Deprecated(
183190
level = DeprecationLevel.ERROR,
184-
message = "Flow analogue is named flattenConcat",
191+
message = "Flow analogue of 'merge' is 'flattenConcat'",
185192
replaceWith = ReplaceWith("flattenConcat()")
186193
)
187194
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
188195

189196
/** @suppress **/
190197
@Deprecated(
191198
level = DeprecationLevel.ERROR,
192-
message = "Flow analogue is named flattenConcat",
199+
message = "Flow analogue of 'flatten' is 'flattenConcat'",
193200
replaceWith = ReplaceWith("flattenConcat()")
194201
)
195202
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
@@ -210,7 +217,7 @@ public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
210217
*/
211218
@Deprecated(
212219
level = DeprecationLevel.ERROR,
213-
message = "Kotlin analogue of compose is 'let'",
220+
message = "Flow analogue of 'compose' is 'let'",
214221
replaceWith = ReplaceWith("let(transformer)")
215222
)
216223
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")
@@ -220,7 +227,7 @@ public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> =
220227
*/
221228
@Deprecated(
222229
level = DeprecationLevel.ERROR,
223-
message = "Kotlin analogue of 'skip' is 'drop'",
230+
message = "Flow analogue of 'skip' is 'drop'",
224231
replaceWith = ReplaceWith("drop(count)")
225232
)
226233
public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
@@ -246,3 +253,23 @@ public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = error
246253
replaceWith = ReplaceWith("scan(initial, operation)")
247254
)
248255
public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")
256+
257+
@Deprecated(
258+
level = DeprecationLevel.ERROR,
259+
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
260+
replaceWith = ReplaceWith("catch { emit(fallback) }")
261+
)
262+
// Note: this version without predicate gives better "replaceWith" action
263+
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = error("Should not be called")
264+
265+
@Deprecated(
266+
level = DeprecationLevel.ERROR,
267+
message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
268+
replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }")
269+
)
270+
public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow<T> =
271+
catch { e ->
272+
// Note: default value is for binary compatibility with preview version, that is why it has body
273+
if (!predicate(e)) throw e
274+
emit(fallback)
275+
}

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ internal class ChannelFlowOperatorImpl<T>(
134134
// Now if the underlying collector was accepting concurrent emits, then this one is too
135135
// todo: we might need to generalize this pattern for "thread-safe" operators that can fuse with channels
136136
private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> = when (this) {
137-
// SendingCollector does not care about the context at all so can be used as it
138-
is SendingCollector -> this
137+
// SendingCollector & NopCollector do not care about the context at all and can be used as is
138+
is SendingCollector, is NopCollector -> this
139139
// Original collector is concurrent, so wrap into ConcurrentUndispatchedContextCollector (also concurrent)
140140
is ConcurrentFlowCollector -> ConcurrentUndispatchedContextCollector(this, emitContext)
141141
// Otherwise just wrap into UndispatchedContextCollector interface implementation
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.flow.*
8+
9+
internal object NopCollector : ConcurrentFlowCollector<Any?> {
10+
override suspend fun emit(value: Any?) {
11+
// does nothing
12+
}
13+
}

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
151151
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
152152

153153
/**
154-
* The operator that changes the context where this flow is executed to the given [context].
154+
* Changes the context where this flow is executed to the given [context].
155155
* This operator is composable and affects only preceding operators that do not have its own context.
156156
* This operator is context preserving: [context] **does not** leak into the downstream flow.
157157
*

0 commit comments

Comments
 (0)