Skip to content

Commit 76303c1

Browse files
Allow for correctly reconnecting after initial connect() is interrupted (#34)
1 parent 50c3431 commit 76303c1

File tree

2 files changed

+255
-7
lines changed

2 files changed

+255
-7
lines changed

stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscription
3636
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
3737
import io.getstream.android.core.api.utils.flatMap
3838
import io.getstream.android.core.api.utils.onTokenError
39+
import io.getstream.android.core.api.utils.runCatchingCancellable
3940
import io.getstream.android.core.api.utils.update
4041
import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor
4142
import io.getstream.android.core.internal.observers.StreamNetworkAndLifecycleMonitorListener
@@ -133,8 +134,10 @@ internal class StreamClientImpl<T>(
133134
.subscribe(networkAndLifecycleMonitorListener, retentionOptions)
134135
.getOrThrow()
135136
}
136-
tokenManager
137-
.loadIfAbsent()
137+
// Network and Lifecycle manager must start first
138+
networkAndLifeCycleMonitor
139+
.start()
140+
.flatMap { tokenManager.loadIfAbsent() }
138141
.flatMap { token -> connectSocketSession(token) }
139142
.fold(
140143
onSuccess = { connected ->
@@ -150,9 +153,6 @@ internal class StreamClientImpl<T>(
150153
Result.failure(error)
151154
},
152155
)
153-
.flatMap { connectedUser ->
154-
networkAndLifeCycleMonitor.start().map { connectedUser }
155-
}
156156
.getOrThrow()
157157
}
158158

@@ -211,7 +211,13 @@ internal class StreamClientImpl<T>(
211211

212212
is Recovery.Disconnect<*> -> {
213213
logger.v { "[recovery] Disconnecting: $recovery" }
214-
socketSession.disconnect().notifyFailure(subscriptionManager)
214+
mutableConnectionState.update(StreamConnectionState.Disconnected())
215+
runCatchingCancellable {
216+
singleFlight.cancel(connectKey).getOrThrow()
217+
connectionIdHolder.clear().getOrThrow()
218+
socketSession.disconnect().getOrThrow()
219+
}
220+
.notifyFailure(subscriptionManager)
215221
}
216222

217223
is Recovery.Error -> {

stream-android-core/src/test/java/io/getstream/android/core/internal/client/StreamClientIImplTest.kt

Lines changed: 243 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package io.getstream.android.core.internal.client
2020

2121
import io.getstream.android.core.api.authentication.StreamTokenManager
2222
import io.getstream.android.core.api.log.StreamLogger
23+
import io.getstream.android.core.api.model.StreamTypedKey
2324
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2425
import io.getstream.android.core.api.model.connection.StreamConnectionState
2526
import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState
@@ -31,24 +32,35 @@ import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
3132
import io.getstream.android.core.api.model.exceptions.StreamEndpointException
3233
import io.getstream.android.core.api.model.value.StreamToken
3334
import io.getstream.android.core.api.model.value.StreamUserId
35+
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleListener
36+
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
37+
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
38+
import io.getstream.android.core.api.observers.network.StreamNetworkMonitorListener
3439
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
3540
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
3641
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
3742
import io.getstream.android.core.api.socket.StreamConnectionIdHolder
3843
import io.getstream.android.core.api.socket.listeners.StreamClientListener
3944
import io.getstream.android.core.api.subscribe.StreamSubscription
4045
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
46+
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager.Options
4147
import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor
4248
import io.getstream.android.core.internal.observers.StreamNetworkAndLifecycleMonitorListener
49+
import io.getstream.android.core.internal.recovery.StreamConnectionRecoveryEvaluatorImpl
4350
import io.getstream.android.core.internal.socket.StreamSocketSession
51+
import io.getstream.android.core.testing.TestLogger
4452
import io.mockk.*
4553
import kotlin.time.ExperimentalTime
54+
import kotlinx.coroutines.CompletableDeferred
4655
import kotlinx.coroutines.CoroutineScope
4756
import kotlinx.coroutines.flow.MutableStateFlow
4857
import kotlinx.coroutines.flow.update
58+
import kotlinx.coroutines.launch
59+
import kotlinx.coroutines.runBlocking
60+
import kotlinx.coroutines.suspendCancellableCoroutine
61+
import kotlinx.coroutines.test.advanceTimeBy
4962
import kotlinx.coroutines.test.advanceUntilIdle
5063
import kotlinx.coroutines.test.runTest
51-
import org.bouncycastle.util.test.SimpleTest.runTest
5264
import org.junit.Assert.*
5365
import org.junit.Before
5466
import org.junit.Test
@@ -156,6 +168,99 @@ class StreamClientIImplTest {
156168
}
157169
}
158170

171+
private class TestLifecycleMonitor : StreamLifecycleMonitor {
172+
private val listeners = mutableSetOf<StreamLifecycleListener>()
173+
private var started = false
174+
175+
override fun start(): Result<Unit> = Result.success(Unit).also { started = true }
176+
177+
override fun stop(): Result<Unit> =
178+
Result.success(Unit).also {
179+
started = false
180+
listeners.clear()
181+
}
182+
183+
override fun subscribe(
184+
listener: StreamLifecycleListener,
185+
options: Options,
186+
): Result<StreamSubscription> {
187+
listeners += listener
188+
return Result.success(
189+
object : StreamSubscription {
190+
override fun cancel() {
191+
listeners -= listener
192+
}
193+
}
194+
)
195+
}
196+
197+
override fun getCurrentState(): StreamLifecycleState = StreamLifecycleState.Unknown
198+
199+
fun emitBackground() {
200+
if (!started) return
201+
listeners.forEach { it.onBackground() }
202+
}
203+
204+
fun emitForeground() {
205+
if (!started) return
206+
listeners.forEach { it.onForeground() }
207+
}
208+
}
209+
210+
private class TestNetworkMonitor : StreamNetworkMonitor {
211+
private var listener: StreamNetworkMonitorListener? = null
212+
private var started = false
213+
214+
override fun start(): Result<Unit> = Result.success(Unit).also { started = true }
215+
216+
override fun stop(): Result<Unit> =
217+
Result.success(Unit).also {
218+
started = false
219+
listener = null
220+
}
221+
222+
override fun subscribe(
223+
listener: StreamNetworkMonitorListener,
224+
options: Options,
225+
): Result<StreamSubscription> {
226+
this.listener = listener
227+
return Result.success(
228+
object : StreamSubscription {
229+
override fun cancel() {
230+
if (this@TestNetworkMonitor.listener === listener) {
231+
this@TestNetworkMonitor.listener = null
232+
}
233+
}
234+
}
235+
)
236+
}
237+
238+
fun emitConnected(snapshot: StreamNetworkInfo.Snapshot?) {
239+
if (!started) return
240+
runBlocking { listener?.onNetworkConnected(snapshot) }
241+
}
242+
243+
fun emitLost(permanent: Boolean) {
244+
if (!started) return
245+
runBlocking { listener?.onNetworkLost(permanent) }
246+
}
247+
}
248+
249+
private class ImmediateSingleFlightProcessor : StreamSingleFlightProcessor {
250+
override suspend fun <T> run(key: StreamTypedKey<T>, block: suspend () -> T): Result<T> =
251+
runCatching {
252+
block()
253+
}
254+
255+
override fun <T> has(key: StreamTypedKey<T>): Boolean = false
256+
257+
override fun <T> cancel(key: StreamTypedKey<T>): Result<Unit> = Result.success(Unit)
258+
259+
override fun clear(cancelRunning: Boolean): Result<Unit> = Result.success(Unit)
260+
261+
override fun stop(): Result<Unit> = Result.success(Unit)
262+
}
263+
159264
@Test
160265
fun `connect short-circuits when already connected`() = runTest {
161266
val connectedUser = mockk<StreamConnectedUser>(relaxed = true)
@@ -457,6 +562,143 @@ class StreamClientIImplTest {
457562
assertTrue(recoveries.contains(expectedRecovery))
458563
}
459564

565+
@Test
566+
fun `recovery disconnects when backgrounding during long connect`() = runTest {
567+
var networkListener: StreamNetworkAndLifecycleMonitorListener? = null
568+
val networkMonitor = capturingNetworkMonitor { networkListener = it }
569+
val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator>()
570+
val expectedRecovery = Recovery.Disconnect("background")
571+
coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns
572+
Result.success(expectedRecovery)
573+
coEvery { socketSession.disconnect() } returns Result.success(Unit)
574+
coEvery { socketSession.subscribe(any<StreamClientListener>(), any()) } returns
575+
Result.success(mockk(relaxed = true))
576+
coEvery { tokenManager.loadIfAbsent() } returns
577+
Result.success(StreamToken.fromString("tok"))
578+
coEvery { socketSession.connect(any()) } coAnswers
579+
{
580+
suspendCancellableCoroutine<Result<StreamConnectionState.Connected>> {}
581+
}
582+
583+
val client = createClient(this, networkMonitor, recoveryEvaluator)
584+
585+
val connectJob = launch { client.connect().onFailure {} }
586+
advanceUntilIdle()
587+
588+
val listener = networkListener ?: error("Network listener not registered")
589+
listener.onNetworkAndLifecycleState(
590+
StreamNetworkState.Disconnected,
591+
StreamLifecycleState.Background,
592+
)
593+
advanceUntilIdle()
594+
595+
coVerify(exactly = 1) { recoveryEvaluator.evaluate(any(), any(), any()) }
596+
verify(exactly = 1) { socketSession.disconnect() }
597+
598+
connectJob.cancel()
599+
}
600+
601+
@Test
602+
fun `backgrounding while initial connect is pending cancels the session`() = runTest {
603+
val lifecycleMonitor = TestLifecycleMonitor()
604+
val networkMonitor = TestNetworkMonitor()
605+
val downstreamSubscriptionManager =
606+
StreamSubscriptionManager<StreamNetworkAndLifecycleMonitorListener>(TestLogger)
607+
val monitor =
608+
StreamNetworkAndLifeCycleMonitor(
609+
logger = TestLogger,
610+
lifecycleMonitor = lifecycleMonitor,
611+
networkMonitor = networkMonitor,
612+
mutableNetworkState = MutableStateFlow(StreamNetworkState.Unknown),
613+
mutableLifecycleState = MutableStateFlow(StreamLifecycleState.Unknown),
614+
subscriptionManager = downstreamSubscriptionManager,
615+
)
616+
val recoveryEvaluator =
617+
StreamConnectionRecoveryEvaluatorImpl(TestLogger, ImmediateSingleFlightProcessor())
618+
619+
val firstConnectDeferred = CompletableDeferred<Result<StreamConnectionState.Connected>>()
620+
val connectedUser = mockk<StreamConnectedUser>(relaxed = true)
621+
val connectedState = StreamConnectionState.Connected(connectedUser, "conn-2")
622+
coEvery { tokenManager.loadIfAbsent() } returns
623+
Result.success(StreamToken.fromString("tok"))
624+
every { socketSession.subscribe(any<StreamClientListener>(), any()) } returns
625+
Result.success(mockk(relaxed = true))
626+
every { socketSession.disconnect() } returns Result.success(Unit)
627+
var firstCall = true
628+
coEvery { socketSession.connect(any()) } coAnswers
629+
{
630+
if (firstCall) {
631+
firstCall = false
632+
connFlow.update { StreamConnectionState.Connecting.Opening(userId.rawValue) }
633+
firstConnectDeferred.await()
634+
} else {
635+
Result.success(connectedState)
636+
}
637+
}
638+
639+
val client = createClient(this, monitor, recoveryEvaluator)
640+
641+
val connectJob = launch { client.connect().onFailure {} }
642+
advanceUntilIdle()
643+
644+
lifecycleMonitor.emitBackground()
645+
advanceUntilIdle()
646+
647+
verify(exactly = 1) { socketSession.disconnect() }
648+
649+
connectJob.cancel()
650+
firstConnectDeferred.cancel()
651+
}
652+
653+
@Test
654+
fun `background disconnect followed by foreground reconnect succeeds unless client disconnects`() =
655+
runTest {
656+
val lifecycleMonitor = TestLifecycleMonitor()
657+
val networkMonitor = TestNetworkMonitor()
658+
val downstreamSubscriptionManager =
659+
StreamSubscriptionManager<StreamNetworkAndLifecycleMonitorListener>(TestLogger)
660+
val monitor =
661+
StreamNetworkAndLifeCycleMonitor(
662+
logger = TestLogger,
663+
lifecycleMonitor = lifecycleMonitor,
664+
networkMonitor = networkMonitor,
665+
mutableNetworkState = MutableStateFlow(StreamNetworkState.Unknown),
666+
mutableLifecycleState = MutableStateFlow(StreamLifecycleState.Unknown),
667+
subscriptionManager = downstreamSubscriptionManager,
668+
)
669+
val recoveryEvaluator =
670+
StreamConnectionRecoveryEvaluatorImpl(TestLogger, ImmediateSingleFlightProcessor())
671+
672+
val connectedUser = mockk<StreamConnectedUser>(relaxed = true)
673+
val connectedState = StreamConnectionState.Connected(connectedUser, "conn-42")
674+
coEvery { tokenManager.loadIfAbsent() } returns
675+
Result.success(StreamToken.fromString("tok"))
676+
every { socketSession.subscribe(any<StreamClientListener>(), any()) } returns
677+
Result.success(mockk(relaxed = true))
678+
every { socketSession.disconnect() } returns Result.success(Unit)
679+
coEvery { socketSession.connect(any()) } returnsMany
680+
listOf(Result.success(connectedState), Result.success(connectedState))
681+
every { connectionIdHolder.setConnectionId("conn-42") } returns
682+
Result.success("conn-42")
683+
684+
val client = createClient(this, monitor, recoveryEvaluator)
685+
686+
client.connect().onFailure {}
687+
advanceUntilIdle()
688+
689+
lifecycleMonitor.emitBackground()
690+
advanceUntilIdle()
691+
verify(exactly = 1) { socketSession.disconnect() }
692+
assertEquals(StreamConnectionState.Disconnected(), connFlow.value)
693+
694+
lifecycleMonitor.emitForeground()
695+
networkMonitor.emitConnected(StreamNetworkInfo.Snapshot())
696+
advanceTimeBy(1000)
697+
advanceUntilIdle()
698+
699+
coVerify(exactly = 2) { socketSession.connect(any()) }
700+
}
701+
460702
@Test
461703
fun `recovery error notifies subscribers`() = runTest {
462704
var networkListener: StreamNetworkAndLifecycleMonitorListener? = null

0 commit comments

Comments
 (0)