@@ -108,7 +108,6 @@ import org.webrtc.SessionDescription
108108import retrofit2.HttpException
109109import stream.video.sfu.event.JoinRequest
110110import stream.video.sfu.event.LeaveCallRequest
111- import stream.video.sfu.event.Migration
112111import stream.video.sfu.event.ReconnectDetails
113112import stream.video.sfu.event.SfuRequest
114113import stream.video.sfu.models.ClientDetails
@@ -492,6 +491,7 @@ public class RtcSession internal constructor(
492491 suspend fun connect (reconnectDetails : ReconnectDetails ? = null) {
493492 logger.i { " [connect] #sfu; #track; no args" }
494493 val request = JoinRequest (
494+ subscriber_sdp = getSubscriberSdp().description,
495495 session_id = sessionId,
496496 token = sfuToken,
497497 fast_reconnect = false ,
@@ -1823,13 +1823,14 @@ public class RtcSession internal constructor(
18231823 return Triple (previousSessionId, currentSubscriptions, publisherTracks)
18241824 }
18251825
1826- internal fun fastReconnect (reconnectDetails : ReconnectDetails ? ) {
1826+ internal suspend fun fastReconnect (reconnectDetails : ReconnectDetails ? ) {
18271827 // Fast reconnect, send a JOIN request on the same SFU
18281828 // and restart ICE on publisher
18291829 logger.d { " [fastReconnect] Starting fast reconnect." }
18301830 val (previousSessionId, currentSubscriptions, publisherTracks) = currentSfuInfo()
18311831 logger.d { " [fastReconnect] Published tracks: $publisherTracks " }
18321832 val request = JoinRequest (
1833+ subscriber_sdp = getSubscriberSdp().description,
18331834 session_id = sessionId,
18341835 token = sfuToken,
18351836 client_details = clientDetails,
@@ -1874,147 +1875,6 @@ public class RtcSession internal constructor(
18741875 errorJob?.cancel()
18751876 }
18761877
1877- suspend fun switchSfu (
1878- sfuName : String ,
1879- sfuUrl : String ,
1880- sfuToken : String ,
1881- remoteIceServers : List <IceServer >,
1882- failedToSwitch : () -> Unit ,
1883- ) {
1884- logger.i { " [switchSfu] #sfu; #track; from ${this .sfuUrl} to $sfuUrl " }
1885-
1886- // Prepare SDP
1887- val getSdp = suspend {
1888- getSubscriberSdp().description
1889- }
1890-
1891- // Prepare migration object for SFU socket
1892- val migration = suspend {
1893- Migration (
1894- from_sfu_id = sfuName,
1895- announced_tracks = getPublisherTracks(getSdp.invoke()),
1896- subscriptions = subscriptions.value,
1897- )
1898- }
1899- // Create a parallel SFU socket
1900- sfuConnectionMigrationModule = SfuConnectionModule (
1901- context = clientImpl.context,
1902- apiKey = apiKey,
1903- apiUrl = sfuUrl,
1904- wssUrl = sfuWsUrl,
1905- connectionTimeoutInMs = 10000L ,
1906- userToken = sfuToken,
1907- lifecycle = lifecycle,
1908- )
1909-
1910- // Connect to SFU socket
1911- val migrationData = migration.invoke()
1912- val request = JoinRequest (
1913- session_id = sessionId,
1914- token = sfuToken,
1915- subscriber_sdp = getSdp.invoke(),
1916- fast_reconnect = false ,
1917- migration = migrationData,
1918- client_details = clientDetails,
1919- )
1920-
1921- sfuConnectionModule.socketConnection.whenConnected {
1922- sfuConnectionMigrationModule!! .socketConnection.sendEvent(
1923- SfuDataRequest (SfuRequest (join_request = request)),
1924- )
1925- }
1926- // Wait until the socket connects - if it fails to connect then return to "Reconnecting"
1927- // state (to make sure that the full reconnect logic will kick in)
1928- coroutineScope.launch {
1929- sfuConnectionMigrationModule!! .socketConnection.state().collect { it ->
1930- when (it) {
1931- is SfuSocketState .Connected -> {
1932- logger.d { " [switchSfu] Migration SFU socket state changed to Connected" }
1933-
1934- // Disconnect the old SFU and stop listening to SFU stateflows
1935- eventJob?.cancel()
1936- errorJob?.cancel()
1937- // Cleanup called after the migration is successful
1938- // sfuConnectionModule.sfuSocket.cleanup()
1939-
1940- // Make the new SFU the currently used one
1941- setSfuConnectionModule(sfuConnectionMigrationModule!! )
1942- sfuConnectionMigrationModule = null
1943-
1944- // We are connected to the new SFU, change the RtcSession parameters to
1945- // match the new SFU
1946- this @RtcSession.sfuUrl = sfuUrl
1947- this @RtcSession.sfuToken = sfuToken
1948- this @RtcSession.remoteIceServers = remoteIceServers
1949- this @RtcSession.iceServers = buildRemoteIceServers(remoteIceServers)
1950-
1951- // reconnect socket listeners
1952- listenToSfuSocket()
1953-
1954- var tempSubscriber = subscriber
1955-
1956- // step 1 setup the peer connections
1957- subscriber = createSubscriber()
1958-
1959- // This makes sure that the new subscriber starts listening to the existing tracks
1960- // Without this the peer connection state would stay in NEW
1961- setVideoSubscriptions()
1962-
1963- // Start emiting the new subscriber connection state (used by CallHealthMonitor)
1964- listenToSubscriberConnection()
1965-
1966- // Necessary after SFU migration. This will trigger onNegotiationNeeded
1967- publisher?.connection?.restartIce()
1968-
1969- coroutineScope.launch {
1970- subscriber?.state?.collect {
1971- if (it == PeerConnectionState .CONNECTED ) {
1972- logger.d { " [switchSfu] Migration subscriber state changed to Connected" }
1973- tempSubscriber?.let { tempSubscriberValue ->
1974- tempSubscriberValue.connection.close()
1975- }
1976- cancel()
1977- } else if (it == PeerConnectionState .CLOSED ||
1978- it == PeerConnectionState .DISCONNECTED ||
1979- it == PeerConnectionState .FAILED
1980- ) {
1981- logger.d { " [switchSfu] Failed to migrate - subscriber didn't connect ($it )" }
1982- // Something when wrong with the new subscriber connection
1983- // We give up the migration and wait for full reconnect
1984- failedToSwitch()
1985- cancel()
1986- }
1987- }
1988- }
1989-
1990- updatePeerState()
1991-
1992- // Only listen for the connection event once
1993- cancel()
1994- }
1995-
1996- is SfuSocketState .Disconnected .DisconnectedPermanently -> {
1997- logger.d { " [switchSfu] Failed to migrate - SFU socket disconnected permanently ${it.error} " }
1998- failedToSwitch()
1999- cancel()
2000- }
2001-
2002- is SfuSocketState .Disconnected .DisconnectedTemporarily -> {
2003- logger.d { " [switchSfu] Failed to migrate - SFU socket disconnected temporarily ${it.error} " }
2004- // We don't wait for the socket to retry during migration
2005- // In this case we will fall back to full-reconnect
2006- failedToSwitch()
2007- cancel()
2008- }
2009-
2010- else -> {
2011- // Wait
2012- }
2013- }
2014- }
2015- }
2016- }
2017-
20181878 internal fun leaveWithReason (reason : String ) {
20191879 val leaveCallRequest = LeaveCallRequest (
20201880 session_id = sessionId,
0 commit comments