diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index c8f4c1c0423..2c865fa4c5c 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -982,7 +982,6 @@ public final class io/getstream/video/android/core/call/RtcSession { public final fun setScreenShareTrack ()V public final fun setSubscriber (Lio/getstream/video/android/core/call/connection/StreamPeerConnection;)V public final fun setTracks (Ljava/util/Map;)V - public final fun switchSfu (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun updateTrackDimensions (Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLstream/video/sfu/models/VideoDimension;)V public static synthetic fun updateTrackDimensions$default (Lio/getstream/video/android/core/call/RtcSession;Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLstream/video/sfu/models/VideoDimension;ILjava/lang/Object;)V } diff --git a/stream-video-android-core/src/androidTest/kotlin/io/getstream/video/android/core/ReconnectTest.kt b/stream-video-android-core/src/androidTest/kotlin/io/getstream/video/android/core/ReconnectTest.kt index b321bc7ee57..a218d971a62 100644 --- a/stream-video-android-core/src/androidTest/kotlin/io/getstream/video/android/core/ReconnectTest.kt +++ b/stream-video-android-core/src/androidTest/kotlin/io/getstream/video/android/core/ReconnectTest.kt @@ -16,7 +16,6 @@ package io.getstream.video.android.core -import app.cash.turbine.test import app.cash.turbine.testIn import com.google.common.truth.Truth.assertThat import io.getstream.log.taggedLogger @@ -26,7 +25,6 @@ import org.junit.Ignore import org.junit.Test import org.webrtc.PeerConnection import java.util.UUID -import kotlin.time.Duration.Companion.seconds /** * Connection state shows if we've established a connection with the SFU @@ -147,44 +145,4 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) { // await until disconnect a call assertThat(connectionState.awaitItem()).isEqualTo(RealtimeConnection.Disconnected) } - - /** - * Switching an Sfu should be fast - */ - @Test - @Ignore - fun switchSfuQuickly() = runTest(timeout = 30.seconds) { - val call = client.call("default", UUID.randomUUID().toString()) - // join a call - val result = call.join(create = true) - // create a turbine connection state - val connectionState = call.state.connection.testIn(backgroundScope) - // asset that the connection state is connected - val connectionStateItem = connectionState.awaitItem() - assertThat(connectionStateItem).isAnyOf( - RealtimeConnection.Connected, - RealtimeConnection.Joined(result.getOrThrow()), - ) - if (connectionStateItem is RealtimeConnection.Joined) { - connectionState.awaitItem() - } - - // connect to the new socket - // do an ice restart - call.session?.let { - it.switchSfu(it.sfuUrl, it.sfuToken, it.sfuToken, it.remoteIceServers, {}) - } - - // assert the publisher is still connected - val publisher = call.session?.publisher?.state - publisher?.test(timeout = 30.seconds) { - val connection = awaitItem() - if (connection == PeerConnection.PeerConnectionState.CONNECTED) { - awaitComplete() - } - } - // leave and clean up a call - call.leave() - call.cleanup() - } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt index b474068637c..0287768d4f5 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt @@ -108,7 +108,6 @@ import org.webrtc.SessionDescription import retrofit2.HttpException import stream.video.sfu.event.JoinRequest import stream.video.sfu.event.LeaveCallRequest -import stream.video.sfu.event.Migration import stream.video.sfu.event.ReconnectDetails import stream.video.sfu.event.SfuRequest import stream.video.sfu.models.ClientDetails @@ -492,6 +491,7 @@ public class RtcSession internal constructor( suspend fun connect(reconnectDetails: ReconnectDetails? = null) { logger.i { "[connect] #sfu; #track; no args" } val request = JoinRequest( + subscriber_sdp = getSubscriberSdp().description, session_id = sessionId, token = sfuToken, fast_reconnect = false, @@ -1823,13 +1823,14 @@ public class RtcSession internal constructor( return Triple(previousSessionId, currentSubscriptions, publisherTracks) } - internal fun fastReconnect(reconnectDetails: ReconnectDetails?) { + internal suspend fun fastReconnect(reconnectDetails: ReconnectDetails?) { // Fast reconnect, send a JOIN request on the same SFU // and restart ICE on publisher logger.d { "[fastReconnect] Starting fast reconnect." } val (previousSessionId, currentSubscriptions, publisherTracks) = currentSfuInfo() logger.d { "[fastReconnect] Published tracks: $publisherTracks" } val request = JoinRequest( + subscriber_sdp = getSubscriberSdp().description, session_id = sessionId, token = sfuToken, client_details = clientDetails, @@ -1874,147 +1875,6 @@ public class RtcSession internal constructor( errorJob?.cancel() } - suspend fun switchSfu( - sfuName: String, - sfuUrl: String, - sfuToken: String, - remoteIceServers: List, - failedToSwitch: () -> Unit, - ) { - logger.i { "[switchSfu] #sfu; #track; from ${this.sfuUrl} to $sfuUrl" } - - // Prepare SDP - val getSdp = suspend { - getSubscriberSdp().description - } - - // Prepare migration object for SFU socket - val migration = suspend { - Migration( - from_sfu_id = sfuName, - announced_tracks = getPublisherTracks(getSdp.invoke()), - subscriptions = subscriptions.value, - ) - } - // Create a parallel SFU socket - sfuConnectionMigrationModule = SfuConnectionModule( - context = clientImpl.context, - apiKey = apiKey, - apiUrl = sfuUrl, - wssUrl = sfuWsUrl, - connectionTimeoutInMs = 10000L, - userToken = sfuToken, - lifecycle = lifecycle, - ) - - // Connect to SFU socket - val migrationData = migration.invoke() - val request = JoinRequest( - session_id = sessionId, - token = sfuToken, - subscriber_sdp = getSdp.invoke(), - fast_reconnect = false, - migration = migrationData, - client_details = clientDetails, - ) - - sfuConnectionModule.socketConnection.whenConnected { - sfuConnectionMigrationModule!!.socketConnection.sendEvent( - SfuDataRequest(SfuRequest(join_request = request)), - ) - } - // Wait until the socket connects - if it fails to connect then return to "Reconnecting" - // state (to make sure that the full reconnect logic will kick in) - coroutineScope.launch { - sfuConnectionMigrationModule!!.socketConnection.state().collect { it -> - when (it) { - is SfuSocketState.Connected -> { - logger.d { "[switchSfu] Migration SFU socket state changed to Connected" } - - // Disconnect the old SFU and stop listening to SFU stateflows - eventJob?.cancel() - errorJob?.cancel() - // Cleanup called after the migration is successful - // sfuConnectionModule.sfuSocket.cleanup() - - // Make the new SFU the currently used one - setSfuConnectionModule(sfuConnectionMigrationModule!!) - sfuConnectionMigrationModule = null - - // We are connected to the new SFU, change the RtcSession parameters to - // match the new SFU - this@RtcSession.sfuUrl = sfuUrl - this@RtcSession.sfuToken = sfuToken - this@RtcSession.remoteIceServers = remoteIceServers - this@RtcSession.iceServers = buildRemoteIceServers(remoteIceServers) - - // reconnect socket listeners - listenToSfuSocket() - - var tempSubscriber = subscriber - - // step 1 setup the peer connections - subscriber = createSubscriber() - - // This makes sure that the new subscriber starts listening to the existing tracks - // Without this the peer connection state would stay in NEW - setVideoSubscriptions() - - // Start emiting the new subscriber connection state (used by CallHealthMonitor) - listenToSubscriberConnection() - - // Necessary after SFU migration. This will trigger onNegotiationNeeded - publisher?.connection?.restartIce() - - coroutineScope.launch { - subscriber?.state?.collect { - if (it == PeerConnectionState.CONNECTED) { - logger.d { "[switchSfu] Migration subscriber state changed to Connected" } - tempSubscriber?.let { tempSubscriberValue -> - tempSubscriberValue.connection.close() - } - cancel() - } else if (it == PeerConnectionState.CLOSED || - it == PeerConnectionState.DISCONNECTED || - it == PeerConnectionState.FAILED - ) { - logger.d { "[switchSfu] Failed to migrate - subscriber didn't connect ($it)" } - // Something when wrong with the new subscriber connection - // We give up the migration and wait for full reconnect - failedToSwitch() - cancel() - } - } - } - - updatePeerState() - - // Only listen for the connection event once - cancel() - } - - is SfuSocketState.Disconnected.DisconnectedPermanently -> { - logger.d { "[switchSfu] Failed to migrate - SFU socket disconnected permanently ${it.error}" } - failedToSwitch() - cancel() - } - - is SfuSocketState.Disconnected.DisconnectedTemporarily -> { - logger.d { "[switchSfu] Failed to migrate - SFU socket disconnected temporarily ${it.error}" } - // We don't wait for the socket to retry during migration - // In this case we will fall back to full-reconnect - failedToSwitch() - cancel() - } - - else -> { - // Wait - } - } - } - } - } - internal fun leaveWithReason(reason: String) { val leaveCallRequest = LeaveCallRequest( session_id = sessionId,