Skip to content

Commit 1a0367e

Browse files
committed
Handle failures during socket connection
1 parent 1350ef6 commit 1a0367e

File tree

2 files changed

+73
-1
lines changed

2 files changed

+73
-1
lines changed

stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,11 @@ internal class StreamSocketSession<T>(
368368
failure(it)
369369
}
370370
}
371+
372+
override fun onFailure(t: Throwable, response: Response?) {
373+
logger.e(t) { "[onFailure] Socket failure. ${t.message}" }
374+
failure(t)
375+
}
371376
}
372377

373378
val hsRes =

stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,15 @@ import io.getstream.android.core.internal.model.events.StreamHealthCheckEvent
3636
import io.getstream.android.core.internal.serialization.StreamCompositeEventSerializationImpl
3737
import io.getstream.android.core.internal.serialization.StreamCompositeSerializationEvent
3838
import io.getstream.android.core.internal.socket.model.ConnectUserData
39-
import io.mockk.*
39+
import io.mockk.MockKAnnotations
40+
import io.mockk.Runs
41+
import io.mockk.every
42+
import io.mockk.just
43+
import io.mockk.mockk
44+
import io.mockk.slot
45+
import io.mockk.verify
4046
import junit.framework.Assert.assertEquals
47+
import kotlin.time.Duration.Companion.seconds
4148
import kotlinx.coroutines.async
4249
import kotlinx.coroutines.cancelAndJoin
4350
import kotlinx.coroutines.test.advanceUntilIdle
@@ -1208,6 +1215,66 @@ class StreamSocketSessionTest {
12081215
job.cancel()
12091216
}
12101217

1218+
@Test
1219+
fun `handshake onFailure triggers connect failure`() =
1220+
runTest(timeout = 1.seconds) {
1221+
val seenStates = mutableListOf<StreamConnectionState>()
1222+
every { subs.forEach(any()) } answers
1223+
{
1224+
val consumer = arg<(StreamClientListener) -> Unit>(0)
1225+
val listener =
1226+
object : StreamClientListener {
1227+
override fun onState(state: StreamConnectionState) {
1228+
seenStates += state
1229+
}
1230+
1231+
override fun onEvent(event: Any) {}
1232+
}
1233+
consumer(listener)
1234+
Result.success(Unit)
1235+
}
1236+
1237+
val lifeSub = mockk<StreamSubscription>(relaxed = true)
1238+
val hsSub = mockk<StreamSubscription>(relaxed = true)
1239+
1240+
every { socket.subscribe(any<StreamWebSocketListener>()) } returns
1241+
Result.success(lifeSub)
1242+
1243+
var hsListener: StreamWebSocketListener? = null
1244+
every { socket.subscribe(any<StreamWebSocketListener>(), any()) } answers
1245+
{
1246+
hsListener = firstArg()
1247+
Result.success(hsSub)
1248+
}
1249+
1250+
val socketFailure = RuntimeException("WebSocket connection failed")
1251+
val mockResponse = mockk<Response>(relaxed = true)
1252+
1253+
every { socket.open(config) } answers
1254+
{
1255+
// Simulate socket failure during handshake
1256+
hsListener?.onFailure(socketFailure, mockResponse)
1257+
?: error("Handshake listener not installed")
1258+
Result.success(Unit)
1259+
}
1260+
1261+
every { socket.close() } returns Result.success(Unit)
1262+
1263+
val result = session.connect(connectUserData())
1264+
1265+
assertTrue(result.isFailure)
1266+
1267+
// Verify that the handshake subscription was cancelled
1268+
verify { hsSub.cancel() }
1269+
1270+
// Verify that proper connection states were emitted
1271+
assertTrue(seenStates.first() is StreamConnectionState.Connecting.Opening)
1272+
assertTrue(seenStates.any { it is StreamConnectionState.Disconnected })
1273+
1274+
// Verify no health monitoring started
1275+
verify(exactly = 0) { health.start() }
1276+
}
1277+
12111278
private fun connectUserData(): ConnectUserData =
12121279
ConnectUserData("u", "t", null, null, false, null, emptyMap())
12131280
}

0 commit comments

Comments
 (0)