Skip to content

Commit 7b56408

Browse files
committed
Observe URL change in websocket core
1 parent b0afac6 commit 7b56408

File tree

2 files changed

+143
-9
lines changed

2 files changed

+143
-9
lines changed

common/src/main/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImpl.kt

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import io.homeassistant.companion.android.common.data.HomeAssistantApis.Companio
77
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
88
import io.homeassistant.companion.android.common.data.authentication.AuthorizationException
99
import io.homeassistant.companion.android.common.data.servers.ServerManager
10-
import io.homeassistant.companion.android.common.data.servers.firstUrlOrNull
10+
import io.homeassistant.companion.android.common.data.servers.UrlState
1111
import io.homeassistant.companion.android.common.data.websocket.WebSocketCore
1212
import io.homeassistant.companion.android.common.data.websocket.WebSocketRequest
1313
import io.homeassistant.companion.android.common.data.websocket.WebSocketState
@@ -153,6 +153,8 @@ internal class WebSocketCoreImpl(
153153
private var connection: WebSocket? = null
154154
private var connectionState: WebSocketState? = null
155155
private var connectionHaVersion: HomeAssistantVersion? = null
156+
private var connectedUrl: URL? = null
157+
private var urlObserverJob: Job? = null
156158
private val connectedMutex = Mutex()
157159

158160
/**
@@ -183,7 +185,7 @@ internal class WebSocketCoreImpl(
183185
return !connected.isCancelled
184186
}
185187

186-
val url = connectionStateProvider().urlFlow().firstUrlOrNull()
188+
val url = startUrlObserverAndAwaitFirstUrl()
187189
if (url == null) {
188190
Timber.w("No URL available to open WebSocket connection")
189191
return false
@@ -196,6 +198,7 @@ internal class WebSocketCoreImpl(
196198
).also {
197199
// Preemptively send auth
198200
connectionState = WebSocketState.AUTHENTICATING
201+
connectedUrl = url
199202
val result = it.send(
200203
kotlinJsonMapper.encodeToString(
201204
mapOf(
@@ -209,12 +212,15 @@ internal class WebSocketCoreImpl(
209212
if (!result) {
210213
Timber.e("Unable to send auth message")
211214
connectionState = null
215+
connectedUrl = null
216+
urlObserverJob?.cancel()
212217
return false
213218
}
214219
}
215220
} catch (e: Exception) {
216221
Timber.e(e, "Unable to connect")
217222
connectionState = null
223+
urlObserverJob?.cancel()
218224
return false
219225
}
220226

@@ -242,15 +248,54 @@ internal class WebSocketCoreImpl(
242248
}
243249
}
244250
}
251+
if (!didConnect) {
252+
urlObserverJob?.cancel()
253+
}
245254
didConnect
246255
} catch (e: Exception) {
247256
Timber.e(e, "Unable to authenticate")
257+
urlObserverJob?.cancel()
248258
false
249259
}
250260
}
251261
}
252262
}
253263

264+
/**
265+
* Starts observing URL changes using a single flow subscription.
266+
* Returns the first available URL and continues observing for changes in the background.
267+
* When the URL changes, disconnects immediately the WebSocket to trigger reconnection via [handleClosingSocket].
268+
*
269+
* @return the first URL if available, or `null` if not
270+
*/
271+
private suspend fun startUrlObserverAndAwaitFirstUrl(): URL? {
272+
urlObserverJob?.cancel()
273+
274+
val firstUrlDeferred = CompletableDeferred<URL?>()
275+
276+
urlObserverJob = wsScope.launch {
277+
var isFirstEmission = true
278+
connectionStateProvider().urlFlow()
279+
.collect { urlState ->
280+
val url = (urlState as? UrlState.HasUrl)?.url
281+
if (isFirstEmission) {
282+
isFirstEmission = false
283+
firstUrlDeferred.complete(url)
284+
} else if (url != connectedUrl) {
285+
if (urlState is UrlState.InsecureState) {
286+
Timber.w("Insecure state, disconnecting immediately.")
287+
} else {
288+
Timber.w("URL changed, disconnecting immediately.")
289+
}
290+
// we abruptly cancel the connection since we might be in an insecure state
291+
connection?.cancel()
292+
}
293+
}
294+
}
295+
296+
return firstUrlDeferred.await()
297+
}
298+
254299
override fun getConnectionState(): WebSocketState? = connectionState
255300

256301
override suspend fun sendMessage(request: Map<String, Any?>): RawMessageSocketResponse? =
@@ -607,11 +652,14 @@ internal class WebSocketCoreImpl(
607652
}
608653

609654
private fun handleClosingSocket() {
655+
urlObserverJob?.cancel()
656+
urlObserverJob = null
610657
val cancelPendingMessagesJob = wsScope.launch {
611658
connectedMutex.withLock {
612659
connected = CompletableDeferred()
613660
connection = null
614661
connectionHaVersion = null
662+
connectedUrl = null
615663
if (connectionState != WebSocketState.CLOSED_AUTH) {
616664
connectionState = WebSocketState.CLOSED_OTHER
617665
}

common/src/test/kotlin/io/homeassistant/companion/android/common/data/websocket/impl/WebSocketCoreImplTest.kt

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ import io.mockk.slot
2626
import io.mockk.unmockkAll
2727
import io.mockk.verify
2828
import io.mockk.verifyOrder
29+
import java.io.IOException
2930
import kotlinx.coroutines.CoroutineScope
3031
import kotlinx.coroutines.ExperimentalCoroutinesApi
32+
import kotlinx.coroutines.flow.Flow
33+
import kotlinx.coroutines.flow.MutableStateFlow
3134
import kotlinx.coroutines.flow.flowOf
3235
import kotlinx.coroutines.test.TestScope
3336
import kotlinx.coroutines.test.UnconfinedTestDispatcher
@@ -41,7 +44,6 @@ import okio.ByteString
4144
import org.junit.jupiter.api.AfterEach
4245
import org.junit.jupiter.api.Assertions.assertEquals
4346
import org.junit.jupiter.api.Assertions.assertFalse
44-
import org.junit.jupiter.api.Assertions.assertSame
4547
import org.junit.jupiter.api.Assertions.assertTrue
4648
import org.junit.jupiter.api.Test
4749
import org.junit.jupiter.api.assertNotNull
@@ -66,6 +68,7 @@ class WebSocketCoreImplTest {
6668
private fun TestScope.setupServer(
6769
url: String = "https://io.ha",
6870
backgroundScope: CoroutineScope = this.backgroundScope,
71+
urlFlow: Flow<UrlState>? = null,
6972
) {
7073
mockOkHttpClient = mockk<OkHttpClient>(relaxed = true)
7174
val mockServerManager = mockk<ServerManager>(relaxed = true)
@@ -90,7 +93,7 @@ class WebSocketCoreImplTest {
9093
coEvery { mockAuthenticationRepository.retrieveAccessToken() } returns "mock_access_token"
9194
// Use OkHttp's URL parsing to normalize URLs (adds trailing slash) like the real implementation
9295
val parsedUrl = url.takeIf { it.startsWith("http") }?.toHttpUrlOrNull()?.toUrl()
93-
every { mockConnectionStateProvider.urlFlow() } returns flowOf(UrlState.HasUrl(parsedUrl))
96+
every { mockConnectionStateProvider.urlFlow() } returns (urlFlow ?: flowOf(UrlState.HasUrl(parsedUrl)))
9497
// The implementation use a background scope to properly handle async messages, to not block the test
9598
// we are injecting a background scope to properly control it within the tests, the scope will close itself at the end of the test
9699
webSocketCore = WebSocketCoreImpl(
@@ -122,7 +125,7 @@ class WebSocketCoreImplTest {
122125
},
123126
)
124127
} answers {
125-
assertSame(WebSocketState.AUTHENTICATING, webSocketCore.getConnectionState())
128+
assertEquals(WebSocketState.AUTHENTICATING, webSocketCore.getConnectionState())
126129
webSocketListener.onMessage(
127130
mockConnection,
128131
"""{"type":"${if (successfulAuth) "auth_ok" else "auth_invalid"}","ha_version":"$haVersion"}""",
@@ -181,6 +184,25 @@ connect()
181184
assertNull(webSocketCore.getConnectionState())
182185
}
183186

187+
@Test
188+
fun `Given InsecureState When connect is invoked Then it returns false and connection state is null`() = runTest {
189+
val serverManager = mockk<ServerManager>(relaxed = true)
190+
val mockConnectionStateProvider = mockk<ServerConnectionStateProvider>(relaxed = true)
191+
coEvery { serverManager.connectionStateProvider(1) } returns mockConnectionStateProvider
192+
coEvery { mockConnectionStateProvider.urlFlow() } returns flowOf(UrlState.InsecureState)
193+
194+
val webSocketCore = WebSocketCoreImpl(
195+
okHttpClient = mockk(),
196+
serverManager = serverManager,
197+
serverId = 1,
198+
)
199+
200+
val result = webSocketCore.connect()
201+
202+
assertFalse(result)
203+
assertNull(webSocketCore.getConnectionState())
204+
}
205+
184206
@Test
185207
fun `Given failure to send auth message after socket creation When connect is invoked Then it returns false and connection state is null`() = runTest {
186208
setupServer()
@@ -214,7 +236,7 @@ connect()
214236

215237
val result = webSocketCore.connect()
216238
assertTrue(result)
217-
assertSame(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
239+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
218240
}
219241

220242
@Test
@@ -225,7 +247,7 @@ connect()
225247
val result = webSocketCore.connect()
226248

227249
assertFalse(result)
228-
assertSame(WebSocketState.CLOSED_AUTH, webSocketCore.getConnectionState())
250+
assertEquals(WebSocketState.CLOSED_AUTH, webSocketCore.getConnectionState())
229251
}
230252

231253
@Test
@@ -253,7 +275,7 @@ connect()
253275
}
254276
// auth and supported_features
255277
coVerify(exactly = 2) { mockConnection.send(any<String>()) }
256-
assertSame(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
278+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
257279
}
258280

259281
@Test
@@ -266,7 +288,71 @@ connect()
266288
assertTrue(result)
267289
// auth
268290
coVerify(exactly = 1) { mockConnection.send(any<String>()) }
269-
assertSame(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
291+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
292+
}
293+
294+
@Test
295+
fun `Given an active connection When URL changes Then it cancels the connection`() = runTest {
296+
val urlStateFlow = MutableStateFlow<UrlState>(UrlState.HasUrl("https://io.ha".toHttpUrlOrNull()?.toUrl()))
297+
setupServer(urlFlow = urlStateFlow, backgroundScope = backgroundScope)
298+
prepareAuthenticationAnswer()
299+
300+
// Simulate WebSocket behavior: cancel() triggers onFailure callback
301+
every { mockConnection.cancel() } answers {
302+
webSocketListener.onFailure(mockConnection, IOException("Canceled"), null)
303+
}
304+
305+
assertTrue(webSocketCore.connect())
306+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
307+
308+
urlStateFlow.value = UrlState.HasUrl("https://new.io.ha".toHttpUrlOrNull()?.toUrl())
309+
advanceUntilIdle()
310+
311+
verify { mockConnection.cancel() }
312+
assertEquals(WebSocketState.CLOSED_OTHER, webSocketCore.getConnectionState())
313+
}
314+
315+
@Test
316+
fun `Given an active connection When state changes to InsecureState Then it cancels the connection`() = runTest {
317+
val urlStateFlow = MutableStateFlow<UrlState>(UrlState.HasUrl("https://io.ha".toHttpUrlOrNull()?.toUrl()))
318+
setupServer(urlFlow = urlStateFlow, backgroundScope = backgroundScope)
319+
prepareAuthenticationAnswer()
320+
321+
// Simulate WebSocket behavior: cancel() triggers onFailure callback
322+
every { mockConnection.cancel() } answers {
323+
webSocketListener.onFailure(mockConnection, IOException("Canceled"), null)
324+
}
325+
326+
assertTrue(webSocketCore.connect())
327+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
328+
329+
urlStateFlow.value = UrlState.InsecureState
330+
advanceUntilIdle()
331+
332+
verify { mockConnection.cancel() }
333+
assertEquals(WebSocketState.CLOSED_OTHER, webSocketCore.getConnectionState())
334+
}
335+
336+
@Test
337+
fun `Given an active connection When URL emits the same value Then it does not cancel the connection`() = runTest {
338+
val initialUrl = "https://io.ha".toHttpUrlOrNull()?.toUrl()
339+
val urlStateFlow = MutableStateFlow<UrlState>(UrlState.HasUrl(initialUrl))
340+
setupServer(urlFlow = urlStateFlow, backgroundScope = backgroundScope)
341+
prepareAuthenticationAnswer()
342+
343+
assertTrue(webSocketCore.connect())
344+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
345+
346+
// Emit the same URL again
347+
urlStateFlow.value = UrlState.HasUrl(initialUrl)
348+
advanceUntilIdle()
349+
350+
verify(exactly = 0) { mockConnection.cancel() }
351+
assertEquals(WebSocketState.ACTIVE, webSocketCore.getConnectionState())
352+
353+
// Clean up by closing the connection to stop the URL observer
354+
closeConnection()
355+
advanceUntilIdle()
270356
}
271357

272358
/*

0 commit comments

Comments
 (0)