Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ public final class io/getstream/video/android/core/call/connection/StreamPeerCon
public final fun getConnection ()Lorg/webrtc/PeerConnection;
public final fun getStats (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getVideoTransceiver ()Lorg/webrtc/RtpTransceiver;
public final fun handleNewIceCandidate (Lio/getstream/video/android/core/model/IceCandidate;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun initialize (Lorg/webrtc/PeerConnection;)V
public final fun isFailedOrClosed ()Z
public final fun isHealthy ()Z
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public class RtcSession internal constructor(
}
}

private fun initialiseAudioTransceiver() {
private fun initializeAudioTransceiver() {
if (!audioTransceiverInitialized) {
publisher?.let {
it.addAudioTransceiver(
Expand Down Expand Up @@ -588,7 +588,7 @@ public class RtcSession internal constructor(
setMuteState(isEnabled = it == DeviceStatus.Enabled, TrackType.TRACK_TYPE_AUDIO)

if (it == DeviceStatus.Enabled) {
initialiseAudioTransceiver()
initializeAudioTransceiver()
}
}
}
Expand Down Expand Up @@ -741,7 +741,7 @@ public class RtcSession internal constructor(
initializeVideoTransceiver()
}
if (call.mediaManager.microphone.status.value == DeviceStatus.Enabled) {
initialiseAudioTransceiver()
initializeAudioTransceiver()
}
if (call.mediaManager.screenShare.status.value == DeviceStatus.Enabled) {
initializeScreenshareTransceiver()
Expand Down Expand Up @@ -1249,9 +1249,9 @@ public class RtcSession internal constructor(
}
val iceCandidate: IceCandidate = Json.decodeFromString(event.candidate)
val result = if (event.peerType == PeerType.PEER_TYPE_PUBLISHER_UNSPECIFIED) {
publisher?.addIceCandidate(iceCandidate)
publisher?.handleNewIceCandidate(iceCandidate)
} else {
subscriber?.addIceCandidate(iceCandidate)
subscriber?.handleNewIceCandidate(iceCandidate)
}
logger.v { "[handleTrickle] #sfu; #${event.peerType.stringify()}; result: $result" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ public class StreamPeerConnection(
private val maxBitRate: Int,
) : PeerConnection.Observer {

private val setDescriptionMutex = Mutex()
private val localDescriptionMutex = Mutex()
private val remoteDescriptionMutex = Mutex()
private val iceCandidatesMutex = Mutex() // Not needed in current logic flow, but kept it for safety.

internal var localSdp: SessionDescription? = null
internal var remoteSdp: SessionDescription? = null
private val iceCandidates = mutableListOf<IceCandidate>()
private val typeTag = type.stringify()

// see https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceConnectionState
Expand Down Expand Up @@ -171,18 +174,34 @@ public class StreamPeerConnection(
* @return An empty [Result], if the operation has been successful or not.
*/
public suspend fun setRemoteDescription(sessionDescription: SessionDescription): Result<Unit> {
logger.d { "[setRemoteDescription] #sfu; #$typeTag; answerSdp: ${sessionDescription.stringify()}" }

remoteSdp = sessionDescription
val result: Result<Unit>

remoteDescriptionMutex.withLock {
result = setValue {
connection.setRemoteDescription(
it,
SessionDescription(
sessionDescription.type,
sessionDescription.description.mungeCodecs(),
),
)
}

return setValue {
connection.setRemoteDescription(
it,
SessionDescription(
sessionDescription.type,
sessionDescription.description.mungeCodecs(),
),
)
logger.d { "[setRemoteDescription] #ice; #sfu; #$typeTag; result: $result" }

if (result.isSuccess) processIceCandidates()
}

return result
}

private suspend fun processIceCandidates() {
logger.d { "[processIceCandidates] #ice; #sfu; #$typeTag; count: ${iceCandidates.count()}" }

iceCandidatesMutex.withLock {
iceCandidates.forEach { addIceCandidate(it) }
iceCandidates.clear()
}
}

Expand All @@ -204,14 +223,30 @@ public class StreamPeerConnection(
logger.d { "[setLocalDescription] #sfu; #$typeTag; offerSdp: ${sessionDescription.stringify()}" }
// This needs a mutex because parallel calls will result in:
// SfuSocketError: subscriber PC: negotiation failed
return setDescriptionMutex.withLock {
return localDescriptionMutex.withLock {
setValue {
// Never call this in parallel
connection.setLocalDescription(it, sdp)
}
}
}

public suspend fun handleNewIceCandidate(iceCandidate: IceCandidate) {
remoteDescriptionMutex.withLock {
if (connection.remoteDescription == null) {
logger.d {
"[handleNewIceCandidate] #ice; #sfu; #$typeTag; Remote desc is null, storing candidate: $iceCandidate"
}
iceCandidatesMutex.withLock { iceCandidates.add(iceCandidate) }
} else {
logger.d {
"[handleNewIceCandidate] #ice; #sfu; #$typeTag; Remote desc is set, adding candidate: $iceCandidate"
}
addIceCandidate(iceCandidate)
}
}
}

/**
* Adds an [IceCandidate] to the underlying [connection] if it's already been set up, or stores
* it for later consumption.
Expand Down Expand Up @@ -443,7 +478,7 @@ public class StreamPeerConnection(

// better to monitor onConnectionChange for the state
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
logger.i { "[onIceConnectionChange] #sfu; #$typeTag; newState: $newState" }
logger.i { "[onIceConnectionChange] #ice; #sfu; #$typeTag; newState: $newState" }
iceState.value = newState
when (newState) {
PeerConnection.IceConnectionState.CLOSED, PeerConnection.IceConnectionState.FAILED, PeerConnection.IceConnectionState.DISCONNECTED -> {
Expand Down
Loading