Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ enum class WebSocketState {
ACTIVE,
CLOSED_AUTH,
CLOSED_OTHER,

/** Connection closed because the URL changed (e.g., switched networks). Reconnects immediately. */
CLOSED_URL_CHANGE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.homeassistant.companion.android.common.data.HomeAssistantApis.Companio
import io.homeassistant.companion.android.common.data.HomeAssistantVersion
import io.homeassistant.companion.android.common.data.authentication.AuthorizationException
import io.homeassistant.companion.android.common.data.servers.ServerManager
import io.homeassistant.companion.android.common.data.servers.firstUrlOrNull
import io.homeassistant.companion.android.common.data.servers.UrlState
import io.homeassistant.companion.android.common.data.websocket.WebSocketCore
import io.homeassistant.companion.android.common.data.websocket.WebSocketRequest
import io.homeassistant.companion.android.common.data.websocket.WebSocketState
Expand Down Expand Up @@ -153,6 +153,8 @@ internal class WebSocketCoreImpl(
private var connection: WebSocket? = null
private var connectionState: WebSocketState? = null
private var connectionHaVersion: HomeAssistantVersion? = null
private var connectedUrl: URL? = null
private var urlObserverJob: Job? = null
private val connectedMutex = Mutex()

/**
Expand Down Expand Up @@ -183,7 +185,7 @@ internal class WebSocketCoreImpl(
return !connected.isCancelled
}

val url = connectionStateProvider().urlFlow().firstUrlOrNull()
val url = startUrlObserverAndAwaitFirstUrl()
if (url == null) {
Timber.w("No URL available to open WebSocket connection")
return false
Expand All @@ -196,6 +198,7 @@ internal class WebSocketCoreImpl(
).also {
// Preemptively send auth
connectionState = WebSocketState.AUTHENTICATING
connectedUrl = url
val result = it.send(
kotlinJsonMapper.encodeToString(
mapOf(
Expand All @@ -209,12 +212,15 @@ internal class WebSocketCoreImpl(
if (!result) {
Timber.e("Unable to send auth message")
connectionState = null
connectedUrl = null
urlObserverJob?.cancel()
return false
}
}
} catch (e: Exception) {
Timber.e(e, "Unable to connect")
connectionState = null
urlObserverJob?.cancel()
return false
}

Expand Down Expand Up @@ -242,15 +248,55 @@ internal class WebSocketCoreImpl(
}
}
}
if (!didConnect) {
urlObserverJob?.cancel()
}
didConnect
} catch (e: Exception) {
Timber.e(e, "Unable to authenticate")
urlObserverJob?.cancel()
false
}
}
}
}

/**
* Starts observing URL changes using a single flow subscription.
* Returns the first available URL and continues observing for changes in the background.
* When the URL changes, disconnects immediately the WebSocket to trigger reconnection via [handleClosingSocket].
*
* @return the first URL if available, or `null` if not
*/
private suspend fun startUrlObserverAndAwaitFirstUrl(): URL? {
urlObserverJob?.cancel()

val firstUrlDeferred = CompletableDeferred<URL?>()

urlObserverJob = wsScope.launch {
var isFirstEmission = true
connectionStateProvider().urlFlow()
.collect { urlState ->
val url = (urlState as? UrlState.HasUrl)?.url
if (isFirstEmission) {
isFirstEmission = false
firstUrlDeferred.complete(url)
} else if (url != connectedUrl) {
if (urlState is UrlState.InsecureState) {
Timber.w("Insecure state, disconnecting immediately.")
} else {
Timber.w("URL changed, disconnecting immediately.")
}
// Set state before cancel() since cancel triggers onFailure -> handleClosingSocket
connectionState = WebSocketState.CLOSED_URL_CHANGE
connection?.cancel()
}
}
}

return firstUrlDeferred.await()
}

override fun getConnectionState(): WebSocketState? = connectionState

override suspend fun sendMessage(request: Map<String, Any?>): RawMessageSocketResponse? =
Expand Down Expand Up @@ -607,12 +653,22 @@ internal class WebSocketCoreImpl(
}

private fun handleClosingSocket() {
urlObserverJob?.cancel()
urlObserverJob = null
// Capture current state before it gets modified in the mutex block
val closingState = connectionState
val cancelPendingMessagesJob = wsScope.launch {
connectedMutex.withLock {
connected = CompletableDeferred()
connection = null
connectionHaVersion = null
if (connectionState != WebSocketState.CLOSED_AUTH) {
connectedUrl = null
// Preserve specific closure states, otherwise set to CLOSED_OTHER
if (connectionState !in listOf(
WebSocketState.CLOSED_AUTH,
WebSocketState.CLOSED_URL_CHANGE,
)
) {
connectionState = WebSocketState.CLOSED_OTHER
}
activeMessages
Expand All @@ -634,7 +690,10 @@ internal class WebSocketCoreImpl(
if (hasFlowMessages && wsScope.isActive) {
wsScope.launch {
cancelPendingMessagesJob.join()
delay(DELAY_BEFORE_RECONNECT)
// Try to reconnect immediately on URL change, otherwise use standard delay
if (closingState != WebSocketState.CLOSED_URL_CHANGE) {
delay(DELAY_BEFORE_RECONNECT)
}
if (connect()) {
Timber.d("Resubscribing to active subscriptions...")
activeMessages.filterValues { it.eventFlow != null }.entries
Expand Down
Loading
Loading