@@ -22,17 +22,24 @@ import io.getstream.android.core.api.authentication.StreamTokenManager
2222import io.getstream.android.core.api.log.StreamLogger
2323import io.getstream.android.core.api.model.connection.StreamConnectedUser
2424import io.getstream.android.core.api.model.connection.StreamConnectionState
25+ import io.getstream.android.core.api.model.connection.lifecycle.StreamLifecycleState
26+ import io.getstream.android.core.api.model.connection.network.StreamNetworkInfo
2527import io.getstream.android.core.api.model.connection.network.StreamNetworkState
28+ import io.getstream.android.core.api.model.connection.recovery.Recovery
29+ import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
30+ import io.getstream.android.core.api.model.exceptions.StreamEndpointException
2631import io.getstream.android.core.api.model.event.StreamClientWsEvent
2732import io.getstream.android.core.api.model.value.StreamToken
2833import io.getstream.android.core.api.model.value.StreamUserId
2934import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
3035import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
36+ import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
3137import io.getstream.android.core.api.socket.StreamConnectionIdHolder
3238import io.getstream.android.core.api.socket.listeners.StreamClientListener
3339import io.getstream.android.core.api.subscribe.StreamSubscription
3440import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
3541import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor
42+ import io.getstream.android.core.internal.observers.StreamNetworkAndLifecycleMonitorListener
3643import io.getstream.android.core.internal.socket.StreamSocketSession
3744import io.mockk.*
3845import kotlin.time.ExperimentalTime
@@ -80,19 +87,25 @@ class StreamClientIImplTest {
8087 coEvery { singleFlight.run (any(), any< suspend () -> Any > ()) } coAnswers
8188 {
8289 val block = secondArg< suspend () -> Any > ()
83- Result .success(block())
90+ try {
91+ Result .success(block())
92+ } catch (t: Throwable ) {
93+ Result .failure(t)
94+ }
8495 }
8596
8697 // Mutable client state: expose real StateFlows that update() mutates
8798 connFlow = MutableStateFlow (StreamConnectionState .Disconnected ())
8899 networkFlow = MutableStateFlow (StreamNetworkState .Unknown )
89100
90101 every { connectionIdHolder.clear() } returns Result .success(Unit )
102+ every { subscriptionManager.forEach(any()) } returns Result .success(Unit )
91103 }
92104
93105 private fun createClient (
94106 scope : CoroutineScope ,
95107 networkAndLifeCycleMonitor : StreamNetworkAndLifeCycleMonitor = mockNetworkMonitor(),
108+ connectionRecoveryEvaluator : StreamConnectionRecoveryEvaluator = mockk(relaxed = true),
96109 ) =
97110 StreamClientImpl (
98111 userId = userId,
@@ -106,14 +119,28 @@ class StreamClientIImplTest {
106119 scope = scope,
107120 subscriptionManager = subscriptionManager,
108121 networkAndLifeCycleMonitor = networkAndLifeCycleMonitor,
109- connectionRecoveryEvaluator = mockk(relaxed = true ) ,
122+ connectionRecoveryEvaluator = connectionRecoveryEvaluator ,
110123 )
111124
112125 private fun mockNetworkMonitor (): StreamNetworkAndLifeCycleMonitor =
113126 mockk(relaxed = true ) {
114127 every { start() } returns Result .success(Unit )
115128 every { stop() } returns Result .success(Unit )
116- every { subscribe(any(), any()) } returns Result .success(mockk(relaxed = true ))
129+ every { subscribe(any(), any()) } returns
130+ Result .success(mockk<StreamSubscription >(relaxed = true ))
131+ }
132+
133+ private fun capturingNetworkMonitor (
134+ onListener : (StreamNetworkAndLifecycleMonitorListener ) -> Unit
135+ ): StreamNetworkAndLifeCycleMonitor =
136+ mockk(relaxed = true ) {
137+ every { start() } returns Result .success(Unit )
138+ every { stop() } returns Result .success(Unit )
139+ every { subscribe(any(), any()) } answers
140+ {
141+ onListener(firstArg())
142+ Result .success(mockk<StreamSubscription >(relaxed = true ))
143+ }
117144 }
118145
119146 @Test
@@ -350,6 +377,171 @@ class StreamClientIImplTest {
350377 verify(exactly = 0 ) { connectionIdHolder.setConnectionId(any()) }
351378 }
352379
380+ @Test
381+ fun `recovery connect triggers another connect attempt` () = runTest {
382+ var networkListener: StreamNetworkAndLifecycleMonitorListener ? = null
383+ val networkMonitor = capturingNetworkMonitor { networkListener = it }
384+ val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator >()
385+ coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns
386+ Result .success(Recovery .Connect (StreamNetworkInfo .Snapshot ()))
387+
388+ val error = RuntimeException (" no token" )
389+ coEvery { tokenManager.loadIfAbsent() } returnsMany
390+ listOf (Result .failure(error), Result .failure(error))
391+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
392+ Result .success(mockk(relaxed = true ))
393+
394+ val client = createClient(backgroundScope, networkMonitor, recoveryEvaluator)
395+
396+ client.connect().onFailure {}
397+ advanceUntilIdle()
398+
399+ val listener = networkListener ? : error(" Network listener not registered" )
400+ val networkState = StreamNetworkState .Available (StreamNetworkInfo .Snapshot ())
401+ listener.onNetworkAndLifecycleState(networkState, StreamLifecycleState .Foreground )
402+ advanceUntilIdle()
403+
404+ coVerify(exactly = 2 ) { tokenManager.loadIfAbsent() }
405+ coVerify(exactly = 1 ) { recoveryEvaluator.evaluate(any(), any(), any()) }
406+ }
407+
408+ @Test
409+ fun `recovery disconnect closes the socket session` () = runTest {
410+ var networkListener: StreamNetworkAndLifecycleMonitorListener ? = null
411+ val networkMonitor = capturingNetworkMonitor { networkListener = it }
412+ val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator >()
413+ coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns
414+ Result .success(Recovery .Disconnect (StreamNetworkState .Disconnected ))
415+
416+ val error = RuntimeException (" no token" )
417+ coEvery { tokenManager.loadIfAbsent() } returns Result .failure(error)
418+ every { socketSession.disconnect() } returns Result .success(Unit )
419+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
420+ Result .success(mockk(relaxed = true ))
421+
422+ val client = createClient(backgroundScope, networkMonitor, recoveryEvaluator)
423+
424+ client.connect().onFailure {}
425+ advanceUntilIdle()
426+
427+ val listener = networkListener ? : error(" Network listener not registered" )
428+ listener.onNetworkAndLifecycleState(
429+ StreamNetworkState .Disconnected ,
430+ StreamLifecycleState .Background ,
431+ )
432+ advanceUntilIdle()
433+
434+ coVerify(exactly = 1 ) { recoveryEvaluator.evaluate(any(), any(), any()) }
435+ verify(exactly = 1 ) { socketSession.disconnect() }
436+ }
437+
438+ @Test
439+ fun `recovery error notifies subscribers` () = runTest {
440+ var networkListener: StreamNetworkAndLifecycleMonitorListener ? = null
441+ val networkMonitor = capturingNetworkMonitor { networkListener = it }
442+ val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator >()
443+ val boom = RuntimeException (" recovery error" )
444+ coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns
445+ Result .success(Recovery .Error (boom))
446+
447+ val reported = mutableListOf<Throwable >()
448+ every { subscriptionManager.forEach(any()) } answers
449+ {
450+ val block = firstArg< (StreamClientListener ) -> Unit > ()
451+ val external = mockk<StreamClientListener >(relaxed = true )
452+ every { external.onError(any()) } answers { reported + = firstArg<Throwable >() }
453+ block(external)
454+ Result .success(Unit )
455+ }
456+
457+ val tokenError = RuntimeException (" token" )
458+ coEvery { tokenManager.loadIfAbsent() } returns Result .failure(tokenError)
459+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
460+ Result .success(mockk(relaxed = true ))
461+
462+ val client = createClient(backgroundScope, networkMonitor, recoveryEvaluator)
463+
464+ client.connect().onFailure {}
465+ advanceUntilIdle()
466+
467+ val listener = networkListener ? : error(" Network listener not registered" )
468+ listener.onNetworkAndLifecycleState(
469+ StreamNetworkState .Disconnected ,
470+ StreamLifecycleState .Background ,
471+ )
472+ advanceUntilIdle()
473+
474+ assertTrue(reported.contains(boom))
475+ every { subscriptionManager.forEach(any()) } returns Result .success(Unit )
476+ }
477+
478+ @Test
479+ fun `recovery null results in no action` () = runTest {
480+ var networkListener: StreamNetworkAndLifecycleMonitorListener ? = null
481+ val networkMonitor = capturingNetworkMonitor { networkListener = it }
482+ val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator >()
483+ coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns Result .success(null )
484+
485+ val tokenError = RuntimeException (" token" )
486+ coEvery { tokenManager.loadIfAbsent() } returns Result .failure(tokenError)
487+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
488+ Result .success(mockk(relaxed = true ))
489+
490+ val client = createClient(backgroundScope, networkMonitor, recoveryEvaluator)
491+
492+ client.connect().onFailure {}
493+ advanceUntilIdle()
494+
495+ val listener = networkListener ? : error(" Network listener not registered" )
496+ listener.onNetworkAndLifecycleState(
497+ StreamNetworkState .Disconnected ,
498+ StreamLifecycleState .Background ,
499+ )
500+ advanceUntilIdle()
501+
502+ coVerify(exactly = 1 ) { tokenManager.loadIfAbsent() }
503+ verify(exactly = 0 ) { socketSession.disconnect() }
504+ }
505+
506+ @Test
507+ fun `recovery failure notifies subscribers` () = runTest {
508+ var networkListener: StreamNetworkAndLifecycleMonitorListener ? = null
509+ val networkMonitor = capturingNetworkMonitor { networkListener = it }
510+ val recoveryEvaluator = mockk<StreamConnectionRecoveryEvaluator >()
511+ val boom = IllegalStateException (" recovery failure" )
512+ coEvery { recoveryEvaluator.evaluate(any(), any(), any()) } returns Result .failure(boom)
513+
514+ val reported = mutableListOf<Throwable >()
515+ every { subscriptionManager.forEach(any()) } answers
516+ {
517+ val block = firstArg< (StreamClientListener ) -> Unit > ()
518+ val external = mockk<StreamClientListener >(relaxed = true )
519+ every { external.onError(any()) } answers { reported + = firstArg<Throwable >() }
520+ block(external)
521+ Result .success(Unit )
522+ }
523+
524+ val tokenError = RuntimeException (" token" )
525+ coEvery { tokenManager.loadIfAbsent() } returns Result .failure(tokenError)
526+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
527+ Result .success(mockk(relaxed = true ))
528+
529+ val client = createClient(backgroundScope, networkMonitor, recoveryEvaluator)
530+
531+ client.connect().onFailure {}
532+ advanceUntilIdle()
533+
534+ val listener = networkListener ? : error(" Network listener not registered" )
535+ listener.onNetworkAndLifecycleState(
536+ StreamNetworkState .Disconnected ,
537+ StreamLifecycleState .Background ,
538+ )
539+ advanceUntilIdle()
540+
541+ assertTrue(reported.contains(boom))
542+ every { subscriptionManager.forEach(any()) } returns Result .success(Unit )
543+ }
544+
353545 @Test
354546 fun `subscription onState updates client state and forwards to subscribers` () = runTest {
355547 val client = createClient(backgroundScope)
@@ -457,4 +649,75 @@ class StreamClientIImplTest {
457649 assertTrue(forwardedEvents.contains(event))
458650 verify(atLeast = 1 ) { subscriptionManager.forEach(any()) }
459651 }
652+
653+ @Test
654+ fun `subscription onError forwards to subscribers` () = runTest {
655+ val client = createClient(backgroundScope)
656+ coEvery { singleFlight.run (any(), any< suspend () -> StreamConnectedUser > ()) } coAnswers
657+ {
658+ val block = secondArg< suspend () -> StreamConnectedUser > ()
659+ try {
660+ Result .success(block.invoke())
661+ } catch (t: Throwable ) {
662+ Result .failure(t)
663+ }
664+ }
665+
666+ var capturedListener: StreamClientListener ? = null
667+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } answers
668+ {
669+ capturedListener = firstArg()
670+ Result .success(mockk(relaxed = true ))
671+ }
672+
673+ val reported = mutableListOf<Throwable >()
674+ every { subscriptionManager.forEach(any()) } answers
675+ {
676+ val block = firstArg< (StreamClientListener ) -> Unit > ()
677+ val external = mockk<StreamClientListener >(relaxed = true )
678+ every { external.onError(any()) } answers { reported + = firstArg<Throwable >() }
679+ block(external)
680+ Result .success(Unit )
681+ }
682+
683+ coEvery { tokenManager.loadIfAbsent() } returns Result .failure(RuntimeException (" stop" ))
684+
685+ client.connect().onFailure {}
686+ advanceUntilIdle()
687+
688+ val error = RuntimeException (" socket failure" )
689+ capturedListener!! .onError(error)
690+
691+ assertTrue(reported.contains(error))
692+ every { subscriptionManager.forEach(any()) } returns Result .success(Unit )
693+ }
694+
695+ @Test
696+ fun `connect retries when token error occurs` () = runTest {
697+ val client = createClient(backgroundScope)
698+
699+ val token = StreamToken .fromString(" tok-1" )
700+ val refreshedToken = StreamToken .fromString(" tok-2" )
701+ coEvery { tokenManager.loadIfAbsent() } returns Result .success(token)
702+ justRun { tokenManager.invalidate() }
703+ coEvery { tokenManager.refresh() } returns Result .success(refreshedToken)
704+
705+ val endpointError =
706+ StreamEndpointException (apiError = StreamEndpointErrorData (code = 40 ))
707+ val connectedUser = mockk<StreamConnectedUser >(relaxed = true )
708+ val connectedState = StreamConnectionState .Connected (connectedUser, " conn-42" )
709+ coEvery { socketSession.connect(match { it.token == token.rawValue }) } returns
710+ Result .failure(endpointError)
711+ coEvery { socketSession.connect(match { it.token == refreshedToken.rawValue }) } returns
712+ Result .success(connectedState)
713+ every { socketSession.subscribe(any<StreamClientListener >(), any()) } returns
714+ Result .success(mockk(relaxed = true ))
715+ every { connectionIdHolder.setConnectionId(" conn-42" ) } returns Result .success(" conn-42" )
716+
717+ client.connect().onFailure {}
718+
719+ verify { tokenManager.invalidate() }
720+ coVerify { tokenManager.refresh() }
721+ coVerify(exactly = 2 ) { socketSession.connect(any()) }
722+ }
460723}
0 commit comments