Skip to content

Commit e4ea0d9

Browse files
Introduce the ability to supply close code and reason when disconnecting (#16)
Co-authored-by: Copilot <[email protected]>
1 parent 65a88a2 commit e4ea0d9

File tree

9 files changed

+362
-70
lines changed

9 files changed

+362
-70
lines changed

stream-android-core/src/main/java/io/getstream/android/core/api/model/exceptions/Exceptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public open class StreamClientException(message: String = "", cause: Throwable?
4343
*/
4444
@StreamPublishedApi
4545
public class StreamEndpointException(
46-
message: String = "",
46+
message: String? = null,
4747
public val apiError: StreamEndpointErrorData? = null,
4848
cause: Throwable? = null,
4949
) : IOException(message, cause)

stream-android-core/src/main/java/io/getstream/android/core/api/socket/StreamWebSocket.kt

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import io.getstream.android.core.api.log.StreamLogger
2020
import io.getstream.android.core.api.model.config.StreamSocketConfig
2121
import io.getstream.android.core.api.socket.listeners.StreamWebSocketListener
2222
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
23+
import io.getstream.android.core.internal.socket.SocketConstants.CLOSE_SOCKET_CODE
24+
import io.getstream.android.core.internal.socket.SocketConstants.CLOSE_SOCKET_REASON
2325
import io.getstream.android.core.internal.socket.StreamWebSocketImpl
2426

2527
/**
@@ -42,13 +44,16 @@ public interface StreamWebSocket<T : StreamWebSocketListener> : StreamSubscripti
4244
public fun open(config: StreamSocketConfig): Result<Unit>
4345

4446
/**
45-
* Closes the WebSocket connection.
47+
* Closes the WebSocket connection with a custom code and reason.
4648
*
47-
* Once closed, no further messages can be sent or received.
48-
*
49-
* @return A [Result] indicating whether the connection was successfully closed.
49+
* @param code The closure status code
50+
* @param reason The reason message provided by the peer, if any.
51+
* @return A [Result] indicating whether the close operation was successful.
5052
*/
51-
public fun close(): Result<Unit>
53+
public fun close(
54+
code: Int = CLOSE_SOCKET_CODE,
55+
reason: String = CLOSE_SOCKET_REASON,
56+
): Result<Unit>
5257

5358
/**
5459
* Sends binary data through the WebSocket connection.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.getstream.android.core.api.utils
17+
18+
import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
19+
import io.getstream.android.core.api.serialization.StreamJsonSerialization
20+
import okhttp3.Response
21+
22+
/**
23+
* Extracts the API error from the response body.
24+
*
25+
* @param with The JSON parser to use for parsing the response body.
26+
* @return The API error, or a failure if the response body could not be parsed.
27+
* @receiver The response to extract the error from.
28+
*/
29+
public fun Response.toErrorData(with: StreamJsonSerialization): Result<StreamEndpointErrorData> =
30+
runCatching { peekBody(Long.MAX_VALUE).string() }
31+
.flatMap { with.fromJson(it, StreamEndpointErrorData::class.java) }

stream-android-core/src/main/java/io/getstream/android/core/internal/http/interceptor/StreamAuthInterceptor.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.getstream.android.core.api.authentication.StreamTokenManager
1919
import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
2020
import io.getstream.android.core.api.model.exceptions.StreamEndpointException
2121
import io.getstream.android.core.api.serialization.StreamJsonSerialization
22+
import io.getstream.android.core.api.utils.toErrorData
2223
import kotlinx.coroutines.runBlocking
2324
import okhttp3.Interceptor
2425
import okhttp3.Request
@@ -73,13 +74,12 @@ internal class StreamAuthInterceptor(
7374
}
7475

7576
// Peek only; do NOT consume
76-
val peeked = first.peekBody(PEEK_ERROR_BYTES_MAX).string()
77-
val parsed = jsonParser.fromJson(peeked, StreamEndpointErrorData::class.java)
77+
val errorData = first.toErrorData(jsonParser)
7878

7979
val alreadyRetried = original.header(HEADER_RETRIED_ON_AUTH) == "present"
8080

81-
if (parsed.isSuccess) {
82-
val error = parsed.getOrEndpointException("Failed to parse error body.")
81+
if (errorData.isSuccess) {
82+
val error = errorData.getOrEndpointException("Failed to parse error body.")
8383

8484
// Only handle token errors here
8585
if (isTokenInvalidErrorCode(error.code) && !alreadyRetried) {

stream-android-core/src/main/java/io/getstream/android/core/internal/http/interceptor/StreamEndpointErrorInterceptor.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
*/
1616
package io.getstream.android.core.internal.http.interceptor
1717

18-
import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
1918
import io.getstream.android.core.api.model.exceptions.StreamEndpointException
2019
import io.getstream.android.core.api.serialization.StreamJsonSerialization
20+
import io.getstream.android.core.api.utils.toErrorData
2121
import kotlin.fold
22-
import kotlin.jvm.java
2322
import okhttp3.Interceptor
2423
import okhttp3.Response
2524

@@ -37,9 +36,8 @@ internal class StreamEndpointErrorInterceptor(private val jsonParser: StreamJson
3736

3837
if (!response.isSuccessful) {
3938
// Try to parse a Stream API error from the response body
40-
val errorBody = response.peekBody(Long.MAX_VALUE).string()
41-
jsonParser
42-
.fromJson(errorBody, StreamEndpointErrorData::class.java)
39+
response
40+
.toErrorData(jsonParser)
4341
.fold(
4442
onSuccess = { apiError ->
4543
throw StreamEndpointException(

stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.getstream.android.core.api.socket.listeners.StreamWebSocketListener
3232
import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor
3333
import io.getstream.android.core.api.subscribe.StreamSubscription
3434
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
35+
import io.getstream.android.core.api.utils.toErrorData
3536
import io.getstream.android.core.internal.model.StreamConnectUserDetailsRequest
3637
import io.getstream.android.core.internal.model.authentication.StreamWSAuthMessageRequest
3738
import io.getstream.android.core.internal.model.events.StreamClientConnectedEvent
@@ -117,9 +118,31 @@ internal class StreamSocketSession<T>(
117118
}
118119
}
119120

120-
/** Disconnect the socket and synchronously notify listeners. */
121-
fun disconnect(error: Throwable? = null): Result<Unit> {
122-
logger.d { "[disconnect] Disconnecting socket, error: $error" }
121+
/**
122+
* Terminates the active socket session and performs a best-effort shutdown of all components.
123+
*
124+
* The method emits a `StreamConnectionState.Disconnected` state (embedding [error] when
125+
* provided), cancels the active socket subscription, closes the underlying [StreamWebSocket],
126+
* and stops the health monitor and batch processor. Subsequent invocations are idempotent:
127+
* listeners are only notified on the first call while the socket close is still attempted every
128+
* time.
129+
*
130+
* @param error Optional cause that is propagated to subscribers via
131+
* `StreamConnectionState.Disconnected(error)`.
132+
* @param code Close code forwarded to [StreamWebSocket.close]. Defaults to the standard manual
133+
* shutdown code used by the SDK.
134+
* @param reason Reason string forwarded to [StreamWebSocket.close]. Defaults to the standard
135+
* human-readable explanation used by the SDK.
136+
* @return The result returned by [StreamWebSocket.close] after cleanup completes.
137+
*/
138+
fun disconnect(
139+
error: Throwable? = null,
140+
code: Int = SocketConstants.CLOSE_SOCKET_CODE,
141+
reason: String = SocketConstants.CLOSE_SOCKET_REASON,
142+
): Result<Unit> {
143+
logger.d {
144+
"[disconnect] Disconnecting socket, code: $code, reason: $reason, error: $error"
145+
}
123146
if (!closingByUs.getAndSet(true)) {
124147
val newState =
125148
if (error == null) {
@@ -132,14 +155,29 @@ internal class StreamSocketSession<T>(
132155
// Cancel subscriptions before closing to avoid duplicate callbacks.
133156
socketSubscription?.cancel()
134157
val closeRes =
135-
internalSocket.close().onFailure { throwable ->
136-
logger.e { "[disconnect] Failed to close socket. ${throwable.message}" }
137-
}
158+
internalSocket
159+
.close(code, reason)
160+
.onSuccess { logger.d { "[disconnect] Socket closed" } }
161+
.onFailure { throwable ->
162+
logger.e { "[disconnect] Failed to close socket. ${throwable.message}" }
163+
}
138164
cleanup()
139165
return closeRes
140166
}
141167

142-
/** Connects the user to the socket. */
168+
/**
169+
* Opens the socket and completes the Stream authentication handshake for the provided user.
170+
*
171+
* The call subscribes to lifecycle events, opens the underlying [StreamWebSocket], performs the
172+
* auth handshake, starts the health monitor once connected, and emits all intermediate
173+
* [StreamConnectionState] updates to registered [StreamClientListener] instances. When the
174+
* coroutine is cancelled or any step fails, all temporary subscriptions are cleaned up and the
175+
* failure is propagated via the returned [Result].
176+
*
177+
* @param data Payload describing the user being connected and the products being authorised.
178+
* @return `Result.success` with the established [StreamConnectionState.Connected] when the
179+
* handshake finishes, or `Result.failure` containing the encountered error.
180+
*/
143181
suspend fun connect(data: ConnectUserData): Result<StreamConnectionState.Connected> =
144182
suspendCancellableCoroutine { continuation ->
145183
var handshakeSubscription: StreamSubscription? = null
@@ -149,7 +187,10 @@ internal class StreamSocketSession<T>(
149187
logger.d { "[connect] Cancelled: ${cause?.message}" }
150188
socketSubscription?.cancel()
151189
handshakeSubscription?.cancel()
152-
internalSocket.close()
190+
internalSocket.close(
191+
SocketConstants.CLOSE_SOCKET_CODE,
192+
SocketConstants.CLOSE_SOCKET_REASON,
193+
)
153194
cleanup()
154195
}
155196

@@ -248,17 +289,20 @@ internal class StreamSocketSession<T>(
248289
notifyState(connected) // emit state before completing
249290
continuation.resume(Result.success(connected))
250291
}
292+
logger.v { "[success] Connection successful" }
251293
}
252294

253295
val failure: (Throwable) -> Unit = { throwable ->
296+
logger.e(throwable) { "[failure] Connection failed. ${throwable.message}" }
254297
handshakeSubscription?.cancel()
255298
socketSubscription?.cancel()
256299
completeFailure(throwable)
257300
}
258301

259302
val apiFailure: (StreamEndpointErrorData) -> Unit = { apiError ->
260-
handshakeSubscription?.cancel()
261303
val error = StreamEndpointException("Connection error", apiError)
304+
logger.e(error) { "[apiFailure] Connection error: $apiError" }
305+
handshakeSubscription?.cancel()
262306
completeFailure(error)
263307
}
264308

@@ -370,13 +414,28 @@ internal class StreamSocketSession<T>(
370414
}
371415

372416
override fun onFailure(t: Throwable, response: Response?) {
373-
logger.e(t) { "[onFailure] Socket failure. ${t.message}" }
374-
failure(t)
417+
val apiError = response?.toErrorData(jsonSerialization)?.getOrNull()
418+
val exception =
419+
StreamEndpointException(
420+
message = "Socket failed during connection",
421+
cause = t,
422+
apiError = apiError,
423+
)
424+
logger.e(exception) {
425+
"[onFailure] Socket failure during connection: ${exception.message}"
426+
}
427+
failure(exception)
375428
}
376429

377430
override fun onClosed(code: Int, reason: String) {
378-
logger.e { "[onClosed] Socket closed. Code: $code, Reason: $reason" }
379-
failure(IOException("Socket closed. Code: $code, Reason: $reason"))
431+
val exception =
432+
IOException(
433+
"Socket closed during connection. Code: $code, Reason: $reason"
434+
)
435+
logger.e(exception) {
436+
"[onClosed] Socket closed during connection. Code: $code, Reason: $reason"
437+
}
438+
failure(exception)
380439
}
381440
}
382441

stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamWebSocketImpl.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import io.getstream.android.core.api.socket.StreamWebSocketFactory
2323
import io.getstream.android.core.api.socket.listeners.StreamWebSocketListener
2424
import io.getstream.android.core.api.subscribe.StreamSubscription
2525
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
26-
import io.getstream.android.core.internal.socket.SocketConstants.CLOSE_SOCKET_CODE
27-
import io.getstream.android.core.internal.socket.SocketConstants.CLOSE_SOCKET_REASON
2826
import java.io.IOException
2927
import okhttp3.Response
3028
import okhttp3.WebSocket
@@ -49,9 +47,9 @@ internal open class StreamWebSocketImpl<T : StreamWebSocketListener>(
4947
.getOrThrow()
5048
}
5149

52-
override fun close(): Result<Unit> = withSocket {
53-
logger.d { "[close] Closing socket" }
54-
socket.close(CLOSE_SOCKET_CODE, CLOSE_SOCKET_REASON)
50+
override fun close(code: Int, reason: String): Result<Unit> = withSocket {
51+
logger.d { "[close#withReason] Closing socket. Code: $code, Reason: $reason" }
52+
socket.close(code, reason)
5553
}
5654

5755
override fun send(data: ByteArray): Result<ByteArray> = withSocket {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.getstream.android.core.api.utils
17+
18+
import io.getstream.android.core.api.model.exceptions.StreamEndpointErrorData
19+
import io.getstream.android.core.api.serialization.StreamJsonSerialization
20+
import io.mockk.every
21+
import io.mockk.mockk
22+
import okhttp3.MediaType.Companion.toMediaTypeOrNull
23+
import okhttp3.Response
24+
import okhttp3.ResponseBody
25+
import org.junit.Assert.*
26+
import org.junit.Test
27+
28+
class ResponseTest {
29+
30+
@Test
31+
fun `toErrorData returns success when parsing is successful`() {
32+
val jsonSerialization = mockk<StreamJsonSerialization>()
33+
val errorData = StreamEndpointErrorData(1000, "detail", "field")
34+
val responseBody = "{\"code\":\"code\",\"detail\":\"detail\",\"field\":\"field\"}"
35+
val response =
36+
Response.Builder()
37+
.code(400)
38+
.message("Bad Request")
39+
.request(okhttp3.Request.Builder().url("http://localhost").build())
40+
.protocol(okhttp3.Protocol.HTTP_1_1)
41+
.body(ResponseBody.create("application/json".toMediaTypeOrNull(), responseBody))
42+
.build()
43+
every { jsonSerialization.fromJson(any(), StreamEndpointErrorData::class.java) } returns
44+
Result.success(errorData)
45+
val result = response.toErrorData(jsonSerialization)
46+
assertTrue(result.isSuccess)
47+
assertEquals(errorData, result.getOrNull())
48+
}
49+
50+
@Test
51+
fun `toErrorData returns failure when parsing throws`() {
52+
val jsonSerialization = mockk<StreamJsonSerialization>()
53+
val responseBody = "invalid json"
54+
val response =
55+
Response.Builder()
56+
.code(400)
57+
.message("Bad Request")
58+
.request(okhttp3.Request.Builder().url("http://localhost").build())
59+
.protocol(okhttp3.Protocol.HTTP_1_1)
60+
.body(ResponseBody.create("application/json".toMediaTypeOrNull(), responseBody))
61+
.build()
62+
every { jsonSerialization.fromJson(any(), StreamEndpointErrorData::class.java) } returns
63+
Result.failure(IllegalArgumentException("Parse error"))
64+
val result = response.toErrorData(jsonSerialization)
65+
assertTrue(result.isFailure)
66+
assertTrue(result.exceptionOrNull() is IllegalArgumentException)
67+
assertEquals("Parse error", result.exceptionOrNull()?.message)
68+
}
69+
}

0 commit comments

Comments
 (0)