Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,6 @@ public final class io/getstream/video/android/core/call/RtcSession {
public final fun setScreenShareTrack ()V
public final fun setSubscriber (Lio/getstream/video/android/core/call/connection/StreamPeerConnection;)V
public final fun setTracks (Ljava/util/Map;)V
public final fun switchSfu (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun updateTrackDimensions (Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLstream/video/sfu/models/VideoDimension;)V
public static synthetic fun updateTrackDimensions$default (Lio/getstream/video/android/core/call/RtcSession;Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLstream/video/sfu/models/VideoDimension;ILjava/lang/Object;)V
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.getstream.video.android.core

import app.cash.turbine.test
import app.cash.turbine.testIn
import com.google.common.truth.Truth.assertThat
import io.getstream.log.taggedLogger
Expand All @@ -26,7 +25,6 @@ import org.junit.Ignore
import org.junit.Test
import org.webrtc.PeerConnection
import java.util.UUID
import kotlin.time.Duration.Companion.seconds

/**
* Connection state shows if we've established a connection with the SFU
Expand Down Expand Up @@ -147,44 +145,4 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) {
// await until disconnect a call
assertThat(connectionState.awaitItem()).isEqualTo(RealtimeConnection.Disconnected)
}

/**
* Switching an Sfu should be fast
*/
@Test
@Ignore
fun switchSfuQuickly() = runTest(timeout = 30.seconds) {
val call = client.call("default", UUID.randomUUID().toString())
// join a call
val result = call.join(create = true)
// create a turbine connection state
val connectionState = call.state.connection.testIn(backgroundScope)
// asset that the connection state is connected
val connectionStateItem = connectionState.awaitItem()
assertThat(connectionStateItem).isAnyOf(
RealtimeConnection.Connected,
RealtimeConnection.Joined(result.getOrThrow()),
)
if (connectionStateItem is RealtimeConnection.Joined) {
connectionState.awaitItem()
}

// connect to the new socket
// do an ice restart
call.session?.let {
it.switchSfu(it.sfuUrl, it.sfuToken, it.sfuToken, it.remoteIceServers, {})
}

// assert the publisher is still connected
val publisher = call.session?.publisher?.state
publisher?.test(timeout = 30.seconds) {
val connection = awaitItem()
if (connection == PeerConnection.PeerConnectionState.CONNECTED) {
awaitComplete()
}
}
// leave and clean up a call
call.leave()
call.cleanup()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ import org.webrtc.SessionDescription
import retrofit2.HttpException
import stream.video.sfu.event.JoinRequest
import stream.video.sfu.event.LeaveCallRequest
import stream.video.sfu.event.Migration
import stream.video.sfu.event.ReconnectDetails
import stream.video.sfu.event.SfuRequest
import stream.video.sfu.models.ClientDetails
Expand Down Expand Up @@ -492,6 +491,7 @@ public class RtcSession internal constructor(
suspend fun connect(reconnectDetails: ReconnectDetails? = null) {
logger.i { "[connect] #sfu; #track; no args" }
val request = JoinRequest(
subscriber_sdp = getSubscriberSdp().description,
session_id = sessionId,
token = sfuToken,
fast_reconnect = false,
Expand Down Expand Up @@ -1823,13 +1823,14 @@ public class RtcSession internal constructor(
return Triple(previousSessionId, currentSubscriptions, publisherTracks)
}

internal fun fastReconnect(reconnectDetails: ReconnectDetails?) {
internal suspend fun fastReconnect(reconnectDetails: ReconnectDetails?) {
// Fast reconnect, send a JOIN request on the same SFU
// and restart ICE on publisher
logger.d { "[fastReconnect] Starting fast reconnect." }
val (previousSessionId, currentSubscriptions, publisherTracks) = currentSfuInfo()
logger.d { "[fastReconnect] Published tracks: $publisherTracks" }
val request = JoinRequest(
subscriber_sdp = getSubscriberSdp().description,
session_id = sessionId,
token = sfuToken,
client_details = clientDetails,
Expand Down Expand Up @@ -1874,147 +1875,6 @@ public class RtcSession internal constructor(
errorJob?.cancel()
}

suspend fun switchSfu(
sfuName: String,
sfuUrl: String,
sfuToken: String,
remoteIceServers: List<IceServer>,
failedToSwitch: () -> Unit,
) {
logger.i { "[switchSfu] #sfu; #track; from ${this.sfuUrl} to $sfuUrl" }

// Prepare SDP
val getSdp = suspend {
getSubscriberSdp().description
}

// Prepare migration object for SFU socket
val migration = suspend {
Migration(
from_sfu_id = sfuName,
announced_tracks = getPublisherTracks(getSdp.invoke()),
subscriptions = subscriptions.value,
)
}
// Create a parallel SFU socket
sfuConnectionMigrationModule = SfuConnectionModule(
context = clientImpl.context,
apiKey = apiKey,
apiUrl = sfuUrl,
wssUrl = sfuWsUrl,
connectionTimeoutInMs = 10000L,
userToken = sfuToken,
lifecycle = lifecycle,
)

// Connect to SFU socket
val migrationData = migration.invoke()
val request = JoinRequest(
session_id = sessionId,
token = sfuToken,
subscriber_sdp = getSdp.invoke(),
fast_reconnect = false,
migration = migrationData,
client_details = clientDetails,
)

sfuConnectionModule.socketConnection.whenConnected {
sfuConnectionMigrationModule!!.socketConnection.sendEvent(
SfuDataRequest(SfuRequest(join_request = request)),
)
}
// Wait until the socket connects - if it fails to connect then return to "Reconnecting"
// state (to make sure that the full reconnect logic will kick in)
coroutineScope.launch {
sfuConnectionMigrationModule!!.socketConnection.state().collect { it ->
when (it) {
is SfuSocketState.Connected -> {
logger.d { "[switchSfu] Migration SFU socket state changed to Connected" }

// Disconnect the old SFU and stop listening to SFU stateflows
eventJob?.cancel()
errorJob?.cancel()
// Cleanup called after the migration is successful
// sfuConnectionModule.sfuSocket.cleanup()

// Make the new SFU the currently used one
setSfuConnectionModule(sfuConnectionMigrationModule!!)
sfuConnectionMigrationModule = null

// We are connected to the new SFU, change the RtcSession parameters to
// match the new SFU
[email protected] = sfuUrl
[email protected] = sfuToken
[email protected] = remoteIceServers
[email protected] = buildRemoteIceServers(remoteIceServers)

// reconnect socket listeners
listenToSfuSocket()

var tempSubscriber = subscriber

// step 1 setup the peer connections
subscriber = createSubscriber()

// This makes sure that the new subscriber starts listening to the existing tracks
// Without this the peer connection state would stay in NEW
setVideoSubscriptions()

// Start emiting the new subscriber connection state (used by CallHealthMonitor)
listenToSubscriberConnection()

// Necessary after SFU migration. This will trigger onNegotiationNeeded
publisher?.connection?.restartIce()

coroutineScope.launch {
subscriber?.state?.collect {
if (it == PeerConnectionState.CONNECTED) {
logger.d { "[switchSfu] Migration subscriber state changed to Connected" }
tempSubscriber?.let { tempSubscriberValue ->
tempSubscriberValue.connection.close()
}
cancel()
} else if (it == PeerConnectionState.CLOSED ||
it == PeerConnectionState.DISCONNECTED ||
it == PeerConnectionState.FAILED
) {
logger.d { "[switchSfu] Failed to migrate - subscriber didn't connect ($it)" }
// Something when wrong with the new subscriber connection
// We give up the migration and wait for full reconnect
failedToSwitch()
cancel()
}
}
}

updatePeerState()

// Only listen for the connection event once
cancel()
}

is SfuSocketState.Disconnected.DisconnectedPermanently -> {
logger.d { "[switchSfu] Failed to migrate - SFU socket disconnected permanently ${it.error}" }
failedToSwitch()
cancel()
}

is SfuSocketState.Disconnected.DisconnectedTemporarily -> {
logger.d { "[switchSfu] Failed to migrate - SFU socket disconnected temporarily ${it.error}" }
// We don't wait for the socket to retry during migration
// In this case we will fall back to full-reconnect
failedToSwitch()
cancel()
}

else -> {
// Wait
}
}
}
}
}

internal fun leaveWithReason(reason: String) {
val leaveCallRequest = LeaveCallRequest(
session_id = sessionId,
Expand Down
Loading