Skip to content

Commit 760488c

Browse files
committed
Use a direct dispatcher in Flow -> Rx conversions.
A direct dispatcher mimics the Rx always synchronous-by-default behavior (unless `subscribeOn`/`observeOn` is used), which is different to the `Dispatchers.Unconfined` behavior (default used by `Flow.asObservable`) which may form event loops for nested coroutines (asynchronous).
1 parent 00753bb commit 760488c

File tree

11 files changed

+141
-7
lines changed

11 files changed

+141
-7
lines changed

android/conventions/src/main/kotlin/ribs.spotless-convention.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ configure<com.diffplug.gradle.spotless.SpotlessExtension> {
2626
endWithNewline()
2727
}
2828
kotlin {
29+
toggleOffOn()
2930
target("**/*.kt")
3031
ktlint(libs.versions.ktlint.get()).editorConfigOverride(
3132
mapOf(

android/libraries/rib-android/src/main/kotlin/com/uber/rib/core/RibActivity.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ abstract class RibActivity :
6060

6161
@Volatile private var _lifecycleObservable: Observable<ActivityLifecycleEvent>? = null
6262
private val lifecycleObservable
63-
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable() }
63+
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable(DirectDispatcher) }
6464

6565
private val _callbacksFlow =
6666
MutableSharedFlow<ActivityCallbackEvent>(0, 1, BufferOverflow.DROP_OLDEST)
@@ -69,7 +69,7 @@ abstract class RibActivity :
6969

7070
@Volatile private var _callbacksObservable: Observable<ActivityCallbackEvent>? = null
7171
private val callbacksObservable
72-
get() = ::_callbacksObservable.setIfNullAndGet { callbacksFlow.asObservable() }
72+
get() = ::_callbacksObservable.setIfNullAndGet { callbacksFlow.asObservable(DirectDispatcher) }
7373

7474
/** @return an observable of this activity's lifecycle events. */
7575
final override fun lifecycle(): Observable<ActivityLifecycleEvent> = lifecycleObservable

android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/FlowAsScope.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
@file:JvmSynthetic
17+
@file:Suppress("invisible_reference", "invisible_member")
1718

1819
package com.uber.rib.core
1920

@@ -41,7 +42,7 @@ internal fun <T : Comparable<T>> SharedFlow<T>.asScopeCompletable(
4142
context: CoroutineContext = EmptyCoroutineContext,
4243
): CompletableSource {
4344
ensureAlive(range)
44-
return rxCompletable(RibDispatchers.Unconfined + context) {
45+
return rxCompletable(DirectDispatcher + context) {
4546
takeWhile { it < range.endInclusive }.collect()
4647
}
4748
}

android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/Interactor.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
@file:Suppress("invisible_reference", "invisible_member")
17+
1618
package com.uber.rib.core
1719

1820
import androidx.annotation.CallSuper
@@ -46,7 +48,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
4648

4749
@Volatile private var _lifecycleObservable: Observable<InteractorEvent>? = null
4850
private val lifecycleObservable
49-
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable() }
51+
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable(DirectDispatcher) }
5052

5153
private val routerDelegate = InitOnceProperty<R>()
5254

android/libraries/rib-base/src/main/kotlin/com/uber/rib/core/Presenter.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public abstract class Presenter : ScopeProvider, RibActionEmitter {
4040

4141
@Volatile private var _lifecycleObservable: Observable<PresenterEvent>? = null
4242
private val lifecycleObservable
43-
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable() }
43+
get() = ::_lifecycleObservable.setIfNullAndGet { lifecycleFlow.asObservable(DirectDispatcher) }
4444

4545
/** @return `true` if the presenter is loaded, `false` if not. */
4646
protected var isLoaded: Boolean = false
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (C) 2023. Uber Technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.uber.rib.core
17+
18+
import com.uber.rib.core.lifecycle.InteractorEvent
19+
import kotlinx.coroutines.ExperimentalCoroutinesApi
20+
import kotlinx.coroutines.test.runTest
21+
import org.junit.Test
22+
23+
@OptIn(ExperimentalCoroutinesApi::class)
24+
class InteractorTest {
25+
@Test
26+
fun assertInteractorEventEagerness() = runTest {
27+
assertLifecycleEventEagerness(InteractorEvent.ACTIVE) {
28+
val interactor = object : Interactor<Any, Router<*>>() {}
29+
interactor.apply {
30+
setPresenter(Unit)
31+
dispatchAttach(null)
32+
}
33+
}
34+
}
35+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (C) 2023. Uber Technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.uber.rib.core
17+
18+
import com.google.common.truth.Truth.assertThat
19+
import com.uber.autodispose.lifecycle.LifecycleScopeProvider
20+
import io.reactivex.disposables.Disposable
21+
import kotlinx.coroutines.Dispatchers
22+
import kotlinx.coroutines.ExperimentalCoroutinesApi
23+
import kotlinx.coroutines.launch
24+
import kotlinx.coroutines.test.TestScope
25+
import kotlinx.coroutines.test.advanceUntilIdle
26+
27+
@OptIn(ExperimentalCoroutinesApi::class)
28+
internal inline fun <T> TestScope.assertLifecycleEventEagerness(
29+
event: T,
30+
crossinline lifecycleScopeProviderProducer: () -> LifecycleScopeProvider<T>,
31+
) {
32+
val events = mutableListOf<Event<T>>()
33+
var disposable: Disposable? = null
34+
launch(Dispatchers.Unconfined) {
35+
disposable =
36+
lifecycleScopeProviderProducer().lifecycle().subscribe { lifecycleEvent ->
37+
events.add(Event.Lifecycle(lifecycleEvent))
38+
}
39+
events.add(Event.AfterSubscription)
40+
}
41+
advanceUntilIdle()
42+
assertThat(events).isEqualTo(listOf(Event.Lifecycle(event), Event.AfterSubscription))
43+
disposable?.dispose()
44+
}
45+
46+
@PublishedApi
47+
internal sealed interface Event<out T> {
48+
data class Lifecycle<T>(val event: T) : Event<T>
49+
object AfterSubscription : Event<Nothing>
50+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (C) 2023. Uber Technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.uber.rib.core
17+
18+
import kotlin.coroutines.CoroutineContext
19+
import kotlinx.coroutines.CoroutineDispatcher
20+
import kotlinx.coroutines.Runnable
21+
22+
/**
23+
* A dispatcher that immediately executes the [Runnable] on the same stack frame, without
24+
* potentially forming event loops like [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] or
25+
* [Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate] in case of nested
26+
* coroutines.
27+
*
28+
* For more context, see the following issues on `kotlinx.coroutines` GitHub repository:
29+
* <!-- spotless:off -->
30+
* 1. [Immediate dispatchers can cause spooky action at a distance](https://github.com/Kotlin/kotlinx.coroutines/issues/3760)
31+
* 2. [Chaining rxSingle calls that use Unconfined dispatcher and blockingGet results in deadlock #3458](https://github.com/Kotlin/kotlinx.coroutines/issues/3458)
32+
* 3. [Coroutines/Flow vs add/remove listener (synchronous execution) #3506](https://github.com/Kotlin/kotlinx.coroutines/issues/3506)
33+
* <!-- spotless:on -->
34+
*/
35+
internal object DirectDispatcher : CoroutineDispatcher() {
36+
override fun dispatch(context: CoroutineContext, block: Runnable) {
37+
block.run()
38+
}
39+
}

android/libraries/rib-router-navigator/src/main/kotlin/com/uber/rib/core/RouterNavigatorEvents.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
@file:Suppress("invisible_reference", "invisible_member")
17+
1618
package com.uber.rib.core
1719

1820
import io.reactivex.Observable
@@ -26,7 +28,7 @@ public class RouterNavigatorEvents private constructor() {
2628
private val _events = MutableSharedFlow<RouterNavigatorEvent>(0, 1, BufferOverflow.DROP_OLDEST)
2729

2830
/** @return the stream which can be subcribed to listen for [RouterNavigatorEvent] */
29-
public val events: Observable<RouterNavigatorEvent> = _events.asObservable()
31+
public val events: Observable<RouterNavigatorEvent> = _events.asObservable(DirectDispatcher)
3032

3133
@JvmSynthetic // Hide from Java consumers. In Java, `getEvents` resolves to the `events` property.
3234
@JvmName("_getEvents")

android/libraries/rib-screen-stack-base/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dependencies {
2727
api(libs.rxjava2)
2828
api(libs.rxrelay2)
2929
api(libs.rxbinding)
30+
implementation(project(":libraries:rib-coroutines"))
3031
implementation(libs.annotation)
3132
implementation(libs.autodispose.coroutines)
3233
implementation(libs.coroutines.android)

0 commit comments

Comments
 (0)