Skip to content

Commit bb906c7

Browse files
authored
Merge pull request #25 from lucassales2/feature/safe-collect
Tweak collectFlow
2 parents 78552ef + 3d50cae commit bb906c7

File tree

3 files changed

+254
-64
lines changed

3 files changed

+254
-64
lines changed

app/src/main/java/com/monstarlab/arch/extensions/FlowExtensions.kt

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,76 @@
11
package com.monstarlab.arch.extensions
22

3+
import androidx.lifecycle.Lifecycle
34
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.flow.Flow
5-
import kotlinx.coroutines.flow.collect
6-
import kotlinx.coroutines.flow.combine
7-
import kotlinx.coroutines.flow.flow
5+
import kotlinx.coroutines.ExperimentalCoroutinesApi
6+
import kotlinx.coroutines.flow.*
87
import kotlinx.coroutines.launch
98

9+
/**
10+
* Copied from AOSP https://android.googlesource.com/platform/frameworks/support/+/67cbbea03d7036f3bd27aae897a3d44b2ee027f5/lifecycle/lifecycle-runtime-ktx/src/main/java/androidx/lifecycle/FlowExt.kt
11+
* Flow operator that emits values from `this` upstream Flow when the [lifecycle] is
12+
* at least at [minActiveState] state. The emissions will be stopped when the lifecycle state
13+
* falls below [minActiveState] state.
14+
*
15+
* The flow will automatically start and cancel collecting from `this` upstream flow as the
16+
* [lifecycle] moves in and out of the target state.
17+
*
18+
* If [this] upstream Flow completes emitting items, `flowWithLifecycle` will trigger the flow
19+
* collection again when the [minActiveState] state is reached.
20+
*
21+
* This is NOT a terminal operator. This operator is usually followed by [collect], or
22+
* [onEach] and [launchIn] to process the emitted values.
23+
*
24+
* Note: this operator creates a hot flow that only closes when the [lifecycle] is destroyed or
25+
* the coroutine that collects from the flow is cancelled.
26+
*
27+
* ```
28+
* class MyActivity : AppCompatActivity() {
29+
* override fun onCreate(savedInstanceState: Bundle?) {
30+
* /* ... */
31+
* // Launches a coroutine that collects items from a flow when the Activity
32+
* // is at least started. It will automatically cancel when the activity is stopped and
33+
* // start collecting again whenever it's started again.
34+
* lifecycleScope.launch {
35+
* flow
36+
* .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
37+
* .collect {
38+
* // Consume flow emissions
39+
* }
40+
* }
41+
* }
42+
* }
43+
* ```
44+
*
45+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
46+
* parameter will throw an [IllegalArgumentException].
47+
*
48+
* Tip: If multiple flows need to be collected using `flowWithLifecycle`, consider using
49+
* the [LifecycleOwner.addRepeatingJob] API to collect from all of them using a different
50+
* [launch] per flow instead. This will be more efficient as only one [LifecycleObserver] will be
51+
* added to the [lifecycle] instead of one per flow.
52+
*
53+
* @param lifecycle The [Lifecycle] where the restarting collecting from `this` flow work will be
54+
* kept alive.
55+
* @param minActiveState [Lifecycle.State] in which the upstream flow gets collected. The
56+
* collection will stop if the lifecycle falls below that state, and will restart if it's in that
57+
* state again.
58+
* @return [Flow] that only emits items from `this` upstream flow when the [lifecycle] is at
59+
* least in the [minActiveState].
60+
*/
61+
@OptIn(ExperimentalCoroutinesApi::class)
62+
public fun <T> Flow<T>.flowWithLifecycle(
63+
lifecycle: Lifecycle,
64+
minActiveState: Lifecycle.State = Lifecycle.State.STARTED
65+
): Flow<T> = callbackFlow {
66+
lifecycle.repeatOnLifecycle(minActiveState) {
67+
this@flowWithLifecycle.collect {
68+
send(it)
69+
}
70+
}
71+
close()
72+
}
73+
1074
fun <T1, T2> CoroutineScope.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: (suspend (T1, T2) -> Unit)) {
1175
launch {
1276
flow1.combine(flow2) { v1, v2 ->
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package com.monstarlab.arch.extensions
2+
3+
/*
4+
* Copyright 2021 The Android Open Source Project
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import androidx.lifecycle.Lifecycle
20+
import androidx.lifecycle.LifecycleEventObserver
21+
import androidx.lifecycle.LifecycleOwner
22+
import androidx.lifecycle.lifecycleScope
23+
import kotlinx.coroutines.CoroutineScope
24+
import kotlinx.coroutines.Dispatchers
25+
import kotlinx.coroutines.Job
26+
import kotlinx.coroutines.coroutineScope
27+
import kotlinx.coroutines.launch
28+
import kotlinx.coroutines.suspendCancellableCoroutine
29+
import kotlinx.coroutines.withContext
30+
import kotlin.coroutines.CoroutineContext
31+
import kotlin.coroutines.EmptyCoroutineContext
32+
import kotlin.coroutines.resume
33+
/**
34+
* Copied from AOSP https://android.googlesource.com/platform/frameworks/support/+/67cbbea03d7036f3bd27aae897a3d44b2ee027f5/lifecycle/lifecycle-runtime-ktx/src/main/java/androidx/lifecycle/RepeatOnLifecycle.kt
35+
* Launches and runs the given [block] in a coroutine when `this` [LifecycleOwner]'s [Lifecycle]
36+
* is at least at [state]. The launched coroutine will be cancelled when the lifecycle state falls
37+
* below [state].
38+
*
39+
* The [block] will cancel and re-launch as the lifecycle moves in and out of the target state.
40+
* To permanently remove the work from the lifecycle, [Job.cancel] the returned [Job].
41+
*
42+
* ```
43+
* // Runs the block of code in a coroutine when the lifecycleOwner is at least STARTED.
44+
* // The coroutine will be cancelled when the ON_STOP event happens and will restart executing
45+
* // if the lifecycleOwner's lifecycle receives the ON_START event again.
46+
* lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) {
47+
* uiStateFlow.collect { uiState ->
48+
* updateUi(uiState)
49+
* }
50+
* }
51+
* ```
52+
*
53+
* The best practice is to call this function when the lifecycleOwner is initialized. For
54+
* example, `onCreate` in an Activity, or `onViewCreated` in a Fragment. Otherwise, multiple
55+
* repeating jobs doing the same could be registered and be executed at the same time.
56+
*
57+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
58+
* parameter will throw an [IllegalArgumentException].
59+
*
60+
* @see Lifecycle.repeatOnLifecycle for details
61+
*
62+
* @param state [Lifecycle.State] in which the coroutine running [block] starts. That coroutine
63+
* will cancel if the lifecycle falls below that state, and will restart if it's in that state
64+
* again.
65+
* @param coroutineContext [CoroutineContext] used to execute [block].
66+
* @param block The block to run when the lifecycle is at least in [state] state.
67+
* @return [Job] to manage the repeating work.
68+
*/
69+
public fun LifecycleOwner.addRepeatingJob(
70+
state: Lifecycle.State,
71+
coroutineContext: CoroutineContext = EmptyCoroutineContext,
72+
block: suspend CoroutineScope.() -> Unit
73+
): Job = lifecycleScope.launch(coroutineContext) {
74+
lifecycle.repeatOnLifecycle(state, block)
75+
}
76+
/**
77+
* Runs the given [block] in a new coroutine when `this` [Lifecycle] is at least at [state] and
78+
* suspends the execution until `this` [Lifecycle] is [Lifecycle.State.DESTROYED].
79+
*
80+
* The [block] will cancel and re-launch as the lifecycle moves in and out of the target state.
81+
*
82+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
83+
* parameter will throw an [IllegalArgumentException].
84+
*
85+
* @param state [Lifecycle.State] in which `block` runs in a new coroutine. That coroutine
86+
* will cancel if the lifecycle falls below that state, and will restart if it's in that state
87+
* again.
88+
* @param block The block to run when the lifecycle is at least in [state] state.
89+
*/
90+
public suspend fun Lifecycle.repeatOnLifecycle(
91+
state: Lifecycle.State,
92+
block: suspend CoroutineScope.() -> Unit
93+
) {
94+
require(state !== Lifecycle.State.INITIALIZED) {
95+
"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
96+
}
97+
if (currentState === Lifecycle.State.DESTROYED) {
98+
return
99+
}
100+
coroutineScope {
101+
withContext(Dispatchers.Main.immediate) {
102+
// Check the current state of the lifecycle as the previous check is not guaranteed
103+
// to be done on the main thread.
104+
if (currentState === Lifecycle.State.DESTROYED) return@withContext
105+
// Instance of the running repeating coroutine
106+
var launchedJob: Job? = null
107+
// Registered observer
108+
var observer: LifecycleEventObserver? = null
109+
try {
110+
// Suspend the coroutine until the lifecycle is destroyed or
111+
// the coroutine is cancelled
112+
suspendCancellableCoroutine<Unit> { cont ->
113+
// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
114+
// cancels when it moves falls below that state.
115+
val startWorkEvent = Lifecycle.Event.upTo(state)
116+
val cancelWorkEvent = Lifecycle.Event.downFrom(state)
117+
observer = LifecycleEventObserver { _, event ->
118+
if (event == startWorkEvent) {
119+
// Launch the repeating work preserving the calling context
120+
launchedJob = this@coroutineScope.launch(block = block)
121+
return@LifecycleEventObserver
122+
}
123+
if (event == cancelWorkEvent) {
124+
launchedJob?.cancel()
125+
launchedJob = null
126+
}
127+
if (event == Lifecycle.Event.ON_DESTROY) {
128+
cont.resume(Unit)
129+
}
130+
}
131+
this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
132+
}
133+
} finally {
134+
launchedJob?.cancel()
135+
observer?.let {
136+
this@repeatOnLifecycle.removeObserver(it)
137+
}
138+
}
139+
}
140+
}
141+
}

app/src/main/java/com/monstarlab/arch/extensions/ViewExtensions.kt

Lines changed: 45 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ package com.monstarlab.arch.extensions
33
import android.view.View
44
import androidx.core.view.isVisible
55
import androidx.fragment.app.Fragment
6-
import androidx.lifecycle.DefaultLifecycleObserver
7-
import androidx.lifecycle.LifecycleOwner
6+
import androidx.lifecycle.Lifecycle
87
import androidx.lifecycle.lifecycleScope
98
import com.google.android.material.snackbar.Snackbar
109
import com.monstarlab.core.sharedui.errorhandling.ViewError
1110
import kotlinx.coroutines.channels.awaitClose
1211
import kotlinx.coroutines.flow.*
1312

14-
fun Fragment.snackErrorFlow(targetFlow: SharedFlow<ViewError>, root: View, length: Int = Snackbar.LENGTH_SHORT) {
13+
fun Fragment.snackErrorFlow(
14+
targetFlow: SharedFlow<ViewError>,
15+
root: View,
16+
length: Int = Snackbar.LENGTH_SHORT
17+
) {
1518
collectFlow(targetFlow) { viewError ->
1619
Snackbar.make(root, viewError.message, length).show()
1720
}
@@ -23,75 +26,57 @@ fun Fragment.visibilityFlow(targetFlow: Flow<Boolean>, vararg view: View) {
2326
}
2427
}
2528

26-
fun <T> Fragment.collectFlow(targetFlow: Flow<T>, collectBlock: ((T) -> Unit)) {
27-
safeViewCollect {
28-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
29-
targetFlow.collect {
30-
collectBlock.invoke(it)
29+
fun <T> Fragment.collectFlow(
30+
targetFlow: Flow<T>,
31+
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
32+
collectBlock: ((T) -> Unit)
33+
) {
34+
lifecycleScope.launchWhenStarted {
35+
targetFlow.flowWithLifecycle(viewLifecycleOwner.lifecycle, minActiveState)
36+
.collect {
37+
collectBlock(it)
3138
}
32-
}
3339
}
3440
}
3541

36-
private inline fun Fragment.safeViewCollect(crossinline viewOwner: LifecycleOwner.() -> Unit) {
37-
lifecycle.addObserver(object : DefaultLifecycleObserver {
38-
override fun onCreate(owner: LifecycleOwner) {
39-
viewLifecycleOwnerLiveData.observe(
40-
this@safeViewCollect,
41-
{ viewLifecycleOwner ->
42-
viewLifecycleOwner.viewOwner()
43-
}
44-
)
45-
}
46-
})
47-
}
4842

49-
fun <T1, T2> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: ((T1, T2) -> Unit)) {
50-
safeViewCollect {
51-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
52-
flow1.combine(flow2) { v1, v2 ->
53-
collectBlock.invoke(v1, v2)
54-
}.collect {
55-
// Empty collect block to trigger ^
56-
}
57-
}
58-
}
43+
fun <T1, T2> Fragment.combineFlows(
44+
flow1: Flow<T1>,
45+
flow2: Flow<T2>,
46+
collectBlock: ((T1, T2) -> Unit)
47+
) {
48+
collectFlow(flow1.combine(flow2) { v1, v2 ->
49+
collectBlock.invoke(v1, v2)
50+
}) {}
5951
}
6052

61-
fun <T1, T2, T3> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, collectBlock: ((T1, T2, T3) -> Unit)) {
62-
safeViewCollect {
63-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
64-
combine(flow1, flow2, flow3) { v1, v2, v3 ->
65-
collectBlock.invoke(v1, v2, v3)
66-
}.collect {
67-
// Empty collect block to trigger ^
68-
}
69-
}
70-
}
53+
fun <T1, T2, T3> Fragment.combineFlows(
54+
flow1: Flow<T1>,
55+
flow2: Flow<T2>,
56+
flow3: Flow<T3>,
57+
collectBlock: ((T1, T2, T3) -> Unit)
58+
) {
59+
collectFlow(combine(flow1, flow2, flow3) { v1, v2, v3 ->
60+
collectBlock.invoke(v1, v2, v3)
61+
}) {}
7162
}
7263

73-
fun <T1, T2, T3, T4> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, flow4: Flow<T4>, collectBlock: ((T1, T2, T3, T4) -> Unit)) {
74-
safeViewCollect {
75-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
76-
combine(flow1, flow2, flow3, flow4) { v1, v2, v3, v4 ->
77-
collectBlock.invoke(v1, v2, v3, v4)
78-
}.collect {
79-
// Empty collect block to trigger ^
80-
}
81-
}
82-
}
64+
fun <T1, T2, T3, T4> Fragment.combineFlows(
65+
flow1: Flow<T1>,
66+
flow2: Flow<T2>,
67+
flow3: Flow<T3>,
68+
flow4: Flow<T4>,
69+
collectBlock: ((T1, T2, T3, T4) -> Unit)
70+
) {
71+
collectFlow(combine(flow1, flow2, flow3, flow4) { v1, v2, v3, v4 ->
72+
collectBlock.invoke(v1, v2, v3, v4)
73+
}) {}
8374
}
8475

8576
fun <T1, T2> Fragment.zipFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: ((T1, T2) -> Unit)) {
86-
safeViewCollect {
87-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
88-
flow1.zip(flow2) { v1, v2 ->
89-
collectBlock.invoke(v1, v2)
90-
}.collect {
91-
// Empty collect block to trigger ^
92-
}
93-
}
94-
}
77+
collectFlow(flow1.zip(flow2) { v1, v2 ->
78+
collectBlock.invoke(v1, v2)
79+
}) {}
9580
}
9681

9782
fun View.clicks(throttleTime: Long = 400): Flow<Unit> = callbackFlow {

0 commit comments

Comments
 (0)