Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -631,29 +631,28 @@ public class Call(
val sfuUrl = result.value.credentials.server.url
val sfuWsUrl = result.value.credentials.server.wsEndpoint
val iceServers = result.value.credentials.iceServers.map { it.toIceServer() }
try {
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
sessionId = this.sessionId,
apiKey = clientImpl.apiKey,
lifecycle = clientImpl.coordinatorConnectionModule.lifecycle,
client = client,
call = this,
sfuUrl = sfuUrl,
sfuWsUrl = sfuWsUrl,
sfuToken = sfuToken,
remoteIceServers = iceServers,
powerManager = powerManager,
)
}

session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
sessionId = this.sessionId,
apiKey = clientImpl.apiKey,
lifecycle = clientImpl.coordinatorConnectionModule.lifecycle,
client = client,
call = this,
sfuUrl = sfuUrl,
sfuWsUrl = sfuWsUrl,
sfuToken = sfuToken,
remoteIceServers = iceServers,
powerManager = powerManager,
)
}

session?.let {
state._connection.value = RealtimeConnection.Joined(it)
}
session?.let {
state._connection.value = RealtimeConnection.Joined(it)
}

try {
session?.connect()
} catch (e: Exception) {
return Failure(Error.GenericError(e.message ?: "RtcSession error occurred."))
Expand Down Expand Up @@ -791,24 +790,31 @@ public class Call(
)
this.state.removeParticipant(prevSessionId)
session.prepareRejoin()
this.session = RtcSession(
clientImpl,
reconnectAttepmts,
powerManager,
this,
sessionId,
clientImpl.apiKey,
clientImpl.coordinatorConnectionModule.lifecycle,
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.iceServers.map { ice ->
ice.toIceServer()
},
)
this.session?.connect(reconnectDetails, currentOptions)
session.cleanup()
monitorSession(joinResponse.value)
try {
this.session = RtcSession(
clientImpl,
reconnectAttepmts,
powerManager,
this,
sessionId,
clientImpl.apiKey,
clientImpl.coordinatorConnectionModule.lifecycle,
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.iceServers.map { ice ->
ice.toIceServer()
},
)
this.session?.connect(reconnectDetails, currentOptions)
session.cleanup()
monitorSession(joinResponse.value)
} catch (ex: Exception) {
logger.e(ex) {
"[rejoin] Failed to join response with ex: ${ex.message}"
}
state._connection.value = RealtimeConnection.Failed(ex)
}
} else {
logger.e {
"[rejoin] Failed to get a join response ${joinResponse.errorOrNull()}"
Expand Down Expand Up @@ -847,27 +853,35 @@ public class Call(
reconnect_attempt = reconnectAttepmts,
)
session.prepareRejoin()
val newSession = RtcSession(
clientImpl,
reconnectAttepmts,
powerManager,
this,
sessionId,
clientImpl.apiKey,
clientImpl.coordinatorConnectionModule.lifecycle,
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.iceServers.map { ice ->
ice.toIceServer()
},
)
val oldSession = this.session
this.session = newSession
this.session?.connect(reconnectDetails, currentOptions)
monitorSession(joinResponse.value)
oldSession?.leaveWithReason("migrating")
oldSession?.cleanup()
try {
val newSession = RtcSession(
clientImpl,
reconnectAttepmts,
powerManager,
this,
sessionId,
clientImpl.apiKey,
clientImpl.coordinatorConnectionModule.lifecycle,
cred.server.url,
cred.server.wsEndpoint,
cred.token,
cred.iceServers.map { ice ->
ice.toIceServer()
},
)
val oldSession = this.session
this.session = newSession
this.session?.connect(reconnectDetails, currentOptions)
monitorSession(joinResponse.value)
oldSession?.leaveWithReason("migrating")
oldSession?.cleanup()
} catch (ex: Exception) {
logger.e(ex) {
"[switchSfu] Failed to join during " +
"migration - Error ${ex.message}"
}
state._connection.value = RealtimeConnection.Failed(ex)
}
} else {
logger.e {
"[switchSfu] Failed to get a join response during " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ data class TrackDimensions(
* * Or when the UI layout changes
* * The SFU tells us what resolution to publish using the ChangePublishQualityEvent event
*
* For developers: RtcSession throws [IllegalStateException] because its [coroutineScope] & [rtcSessionScope] throws it
*/
public class RtcSession internal constructor(
client: StreamVideo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.jvm.Throws

/**
* Implementation of [ScopeProvider] that manages coroutine scopes for RTC sessions.
Expand All @@ -41,7 +42,9 @@ internal class ScopeProviderImpl(
private var executor: ExecutorService? = null
private var isCleanedUp = false

@Throws(IllegalStateException::class)
override fun getCoroutineScope(supervisorJob: CompletableJob): CoroutineScope {
// Future: We should not throw exception. Instead return Result.Fail
check(!isCleanedUp) { "ScopeProvider has been cleaned up" }
logger.d { "Creating coroutine scope for RTC session main" }
return CoroutineScope(
Expand All @@ -51,7 +54,9 @@ internal class ScopeProviderImpl(
)
}

@Throws(IllegalStateException::class)
override fun getRtcSessionScope(supervisorJob: CompletableJob, callId: String): CoroutineScope {
// Future: We should not throw exception. Instead return Result.Fail
check(!isCleanedUp) { "ScopeProvider has been cleaned up" }
logger.d { "Creating RTC session scope for callId: $callId" }

Expand Down