Skip to content

Commit 3820a06

Browse files
authored
Use Flows for pending ICE candidates (#884)
1 parent 719b14e commit 3820a06

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ import kotlinx.coroutines.CoroutineScope
6464
import kotlinx.coroutines.Job
6565
import kotlinx.coroutines.SupervisorJob
6666
import kotlinx.coroutines.cancel
67-
import kotlinx.coroutines.channels.consumeEach
6867
import kotlinx.coroutines.delay
6968
import kotlinx.coroutines.flow.MutableStateFlow
7069
import kotlinx.coroutines.flow.StateFlow
@@ -1192,7 +1191,7 @@ public class RtcSession internal constructor(
11921191

11931192
syncSubscriberCandidates?.cancel()
11941193
syncSubscriberCandidates = coroutineScope.launch {
1195-
sfuConnectionModule.sfuSocket.pendingSubscriberIceCandidates.consumeEach { iceCandidates ->
1194+
sfuConnectionModule.sfuSocket.pendingSubscriberIceCandidates.collect { iceCandidates ->
11961195
subscriber.addIceCandidate(iceCandidates)
11971196
}
11981197
}
@@ -1339,7 +1338,7 @@ public class RtcSession internal constructor(
13391338

13401339
// start listening to ICE candidates
13411340
launch {
1342-
sfuConnectionModule.sfuSocket.pendingPublisherIceCandidates.consumeEach { iceCandidates ->
1341+
sfuConnectionModule.sfuSocket.pendingPublisherIceCandidates.collect { iceCandidates ->
13431342
publisher?.addIceCandidate(iceCandidates)
13441343
}
13451344
}

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/SfuSocket.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import io.getstream.video.android.core.model.IceCandidate
3131
import kotlinx.coroutines.CancellableContinuation
3232
import kotlinx.coroutines.CoroutineScope
3333
import kotlinx.coroutines.channels.Channel
34+
import kotlinx.coroutines.ensureActive
35+
import kotlinx.coroutines.flow.receiveAsFlow
3436
import kotlinx.coroutines.launch
3537
import kotlinx.serialization.json.Json
3638
import okhttp3.OkHttpClient
@@ -79,8 +81,11 @@ public class SfuSocket(
7981
// Only set during SFU migration
8082
private var migrationData: (suspend () -> Migration)? = null
8183

82-
internal val pendingPublisherIceCandidates = Channel<IceCandidate>(capacity = 99)
83-
internal val pendingSubscriberIceCandidates = Channel<IceCandidate>(capacity = 99)
84+
private val _pendingPublisherIceCandidates = Channel<IceCandidate>(capacity = 99)
85+
internal val pendingPublisherIceCandidates = _pendingPublisherIceCandidates.receiveAsFlow()
86+
87+
private val _pendingSubscriberIceCandidates = Channel<IceCandidate>(capacity = 99)
88+
internal val pendingSubscriberIceCandidates = _pendingSubscriberIceCandidates.receiveAsFlow()
8489

8590
private val clientDetails
8691
get() = ClientDetails(
@@ -206,6 +211,7 @@ public class SfuSocket(
206211
handleIceTrickle(message)
207212
}
208213
} catch (error: Throwable) {
214+
coroutineContext.ensureActive()
209215
logger.e { "[onMessage] failed: $error" }
210216
handleError(error)
211217
}
@@ -218,9 +224,9 @@ public class SfuSocket(
218224
}
219225
val iceCandidate: IceCandidate = Json.decodeFromString(event.candidate)
220226
val result = if (event.peerType == PeerType.PEER_TYPE_PUBLISHER_UNSPECIFIED) {
221-
pendingPublisherIceCandidates.send(iceCandidate)
227+
_pendingPublisherIceCandidates.send(iceCandidate)
222228
} else {
223-
pendingSubscriberIceCandidates.send(iceCandidate)
229+
_pendingSubscriberIceCandidates.send(iceCandidate)
224230
}
225231
logger.v { "[handleTrickle] #sfu; #${event.peerType.stringify()}; result: $result" }
226232
}

0 commit comments

Comments
 (0)