Description
Use case
SharedFlow currently requires a fair amount of boilerplate to configure. Below is an example of taking an upstream flow, mapping the output and emitting it as a SharedFlow:
class BarProvider(scope: CoroutineScope, upstream: Flow<Foo>) {
    private val _bars = MutableSharedFlow<Bar>(/* params */)
    // Expose a read-only view so callers cannot emit into the flow.
    val bars: SharedFlow<Bar> = _bars.asSharedFlow()

    init {
        scope.launch {
            upstream.collect { foo ->
                _bars.emit(fooToBar(foo))
            }
        }
    }
    /* ... */
}
This is a common pattern in Kotlin, and one that is generally solved with builders. A very similar example is Channel and its builder, CoroutineScope.produce. I submit that the same solution can be used for SharedFlow:
class BarProvider(scope: CoroutineScope, upstream: Flow<Foo>) {
    val bars = scope.sharedFlow(/* params */) {
        upstream.collect { foo ->
            emit(fooToBar(foo))
        }
    }
    /* ... */
}
The Shape of the API
fun <T> CoroutineScope.sharedFlow(
    sharingStarted: SharingStarted,
    context: CoroutineContext = EmptyCoroutineContext,
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    @BuilderInference block: suspend SharedFlowBuilderScope<T>.() -> Unit,
): SharedFlow<T>

interface SharedFlowBuilderScope<T> : CoroutineScope, MutableSharedFlow<T>
The function signature looks very similar to the MutableSharedFlow factory, with three added parameters: sharingStarted, context, and the builder block.
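For comparison, a rough approximation of the proposed builder can be put together today from the existing flow builder and the shareIn operator. This is only a sketch under stated assumptions: sharedFlowSketch and demoSharedFlowSketch are hypothetical names, the block receives a plain FlowCollector rather than the proposed SharedFlowBuilderScope, and the context, extraBufferCapacity, and onBufferOverflow parameters are left out.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical approximation of the proposed builder, not the proposed API itself.
// It wraps the block in a cold flow and shares it in the receiver scope.
fun <T> CoroutineScope.sharedFlowSketch(
    sharingStarted: SharingStarted,
    replay: Int = 0,
    block: suspend FlowCollector<T>.() -> Unit,
): SharedFlow<T> = flow(block).shareIn(this, sharingStarted, replay)

// Small usage demo: builds a shared flow of three values and collects them.
fun demoSharedFlowSketch(): List<Int> = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    try {
        val numbers = scope.sharedFlowSketch<Int>(SharingStarted.Lazily, replay = 3) {
            emit(1)
            emit(2)
            emit(3)
        }
        // replay = 3 keeps all three values available to the subscriber.
        numbers.take(3).toList()
    } finally {
        scope.cancel() // release the sharing coroutine
    }
}
```

Because the block here is an ordinary flow body, it cannot launch child coroutines or read the replay cache the way a scope that implements both CoroutineScope and MutableSharedFlow could, which is part of what the proposal would add.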
SharingStarted (from the shareIn flow operator) is likely required to prevent synchronization issues (see shareIn for how it affects dispatching). However, a SharingStarted strategy currently has no way to cancel the producing job: SharingCommand only offers START, STOP, and STOP_AND_RESET_REPLAY_CACHE. The sharing coroutine therefore stays active and prevents the parent job from ever finishing normally (SharingStarted.Eagerly and SharingStarted.Lazily are handled specially in shareIn). This presents a bigger problem for the proposed sharedFlow builder, as it would keep the receiver CoroutineScope open indefinitely. Therefore a SharingCommand.CANCEL is also proposed to signal that the shared flow should not be restarted.
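The difference between the strategies can be observed with the existing shareIn operator. The sketch below uses a hypothetical helper, sharingJobFinishes, and an arbitrary 500 ms timeout. It shares a finite upstream, drains it, and then checks whether the coroutine launched by shareIn completes on its own.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns true if the sharing coroutine started by shareIn
// finishes by itself within the timeout once the finite upstream has completed.
fun sharingJobFinishes(started: SharingStarted): Boolean = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    try {
        val shared = flowOf(1, 2, 3).shareIn(scope, started, replay = 3)
        shared.take(3).toList() // subscribe and drain the upstream values

        // Wait for every child of the scope's job; null means the wait timed out.
        val joined = withTimeoutOrNull(500) {
            scope.coroutineContext.job.children.forEach { it.join() }
        }
        joined != null
    } finally {
        scope.cancel() // the only way to stop a strategy-driven sharing coroutine
    }
}
```

Under these assumptions the helper should report that the sharing coroutine finishes for SharingStarted.Lazily but not for SharingStarted.WhileSubscribed(), which keeps listening for new subscribers. That is the situation the proposed SharingCommand.CANCEL is meant to address.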