Skip to content

Commit e7aea3f

Browse files
Do not update the video subscriptions if unchanged. Throttle dimension based on subscriptions size (#1468)
* Update video track dimensions based on subscriptions and don't spam the request if the subscriptions are the same * Fix unneeded tests * Remove unused code * Fixed type and remove unused code * Remove unneded tests * Spotless
1 parent 2c43586 commit e7aea3f

File tree

6 files changed

+126
-130
lines changed

6 files changed

+126
-130
lines changed

stream-video-android-core/api/stream-video-android-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5513,6 +5513,8 @@ public final class io/getstream/video/android/core/Call {
55135513
public final fun setSessionId (Ljava/lang/String;)V
55145514
public final fun setVideoFilter (Lio/getstream/video/android/core/call/video/VideoFilter;)V
55155515
public final fun setVisibility (Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLjava/lang/String;)V
5516+
public final fun setVisibility (Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLjava/lang/String;II)V
5517+
public static synthetic fun setVisibility$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLjava/lang/String;IIILjava/lang/Object;)V
55165518
public static synthetic fun setVisibility$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLjava/lang/String;ILjava/lang/Object;)V
55175519
public final fun startClosedCaptions (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
55185520
public final fun startHLS (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,25 @@ public class Call(
856856
viewportId,
857857
)
858858
}
859+
fun setVisibility(
860+
sessionId: String,
861+
trackType: TrackType,
862+
visible: Boolean,
863+
viewportId: String = sessionId,
864+
width: Int,
865+
height: Int,
866+
) {
867+
logger.i {
868+
"[setVisibility] #track; #sfu; viewportId: $viewportId, sessionId: $sessionId, trackType: $trackType, visible: $visible"
869+
}
870+
session?.updateTrackDimensions(
871+
sessionId,
872+
trackType,
873+
visible,
874+
VideoDimension(width, height),
875+
viewportId,
876+
)
877+
}
859878

860879
fun handleEvent(event: VideoEvent) {
861880
logger.v { "[call handleEvent] #sfu; event.type: ${event.getEventType()}" }

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,16 @@ public class RtcSession internal constructor(
260260
logger = logger,
261261
)
262262

263-
private fun getTrack(sessionId: String, type: TrackType): MediaTrack? = subscriber?.getTrack(
264-
sessionId,
265-
type,
266-
)
263+
private fun getTrack(sessionId: String, type: TrackType): MediaTrack? {
264+
val track = subscriber?.getTrack(
265+
sessionId,
266+
type,
267+
)
268+
if (track == null) {
269+
logger.w { "[getTrack] #sfu; #track; track is null for sessionId: $sessionId, type: $type" }
270+
}
271+
return track
272+
}
267273

268274
private fun setTrack(sessionId: String, type: TrackType, track: MediaTrack) {
269275
when (type) {
@@ -1396,11 +1402,14 @@ public class RtcSession internal constructor(
13961402
}
13971403
subscriber?.setTrackDimension(viewportId, sessionId, trackType, visible, dimensions)
13981404
coroutineScope.launch {
1399-
subscriber?.setVideoSubscriptions(
1400-
trackOverridesHandler,
1401-
call.state.participants.value,
1402-
call.state.remoteParticipants.value,
1403-
)
1405+
if (sessionId != call.sessionId) {
1406+
// dimension updated for another participant
1407+
subscriber?.setVideoSubscriptions(
1408+
trackOverridesHandler,
1409+
call.state.participants.value,
1410+
call.state.remoteParticipants.value,
1411+
)
1412+
}
14041413
}
14051414
}
14061415

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,15 @@ import io.getstream.video.android.core.utils.enableStereo
3939
import io.getstream.video.android.core.utils.safeCall
4040
import io.getstream.video.android.core.utils.safeCallWithDefault
4141
import io.getstream.video.android.core.utils.safeCallWithResult
42-
import io.getstream.video.android.core.utils.safeSuspendingCallWithResult
42+
import io.getstream.video.android.core.utils.safeSuspendingCall
4343
import kotlinx.coroutines.CoroutineScope
44+
import kotlinx.coroutines.Job
4445
import kotlinx.coroutines.channels.BufferOverflow
46+
import kotlinx.coroutines.delay
4547
import kotlinx.coroutines.flow.Flow
4648
import kotlinx.coroutines.flow.MutableSharedFlow
4749
import kotlinx.coroutines.flow.MutableStateFlow
50+
import kotlinx.coroutines.launch
4851
import org.webrtc.MediaConstraints
4952
import org.webrtc.MediaStream
5053
import org.webrtc.MediaStreamTrack
@@ -59,7 +62,6 @@ import stream.video.sfu.signal.SendAnswerRequest
5962
import stream.video.sfu.signal.SendAnswerResponse
6063
import stream.video.sfu.signal.TrackSubscriptionDetails
6164
import stream.video.sfu.signal.UpdateSubscriptionsRequest
62-
import stream.video.sfu.signal.UpdateSubscriptionsResponse
6365
import java.util.concurrent.ConcurrentHashMap
6466

6567
internal class Subscriber(
@@ -110,14 +112,19 @@ internal class Subscriber(
110112
* Default video dimension.
111113
*/
112114
val defaultVideoDimension = VideoDimension(720, 1280)
115+
val throttledVideoDimension = VideoDimension(180, 320)
116+
val unknownVideoDimension = VideoDimension(0, 0)
113117
}
114118

115119
private var enabled = MutableStateFlow(true)
120+
private var subscriptionsJob: Job? = null
116121

117122
// Track dimensions and viewport visibility state for this subscriber
118123
private val trackDimensions = ConcurrentHashMap<ViewportCompositeKey, TrackDimensions>()
119124
private val subscriptions =
120125
ConcurrentHashMap<Pair<String, TrackType>, TrackSubscriptionDetails>()
126+
private val previousSubscriptions =
127+
ConcurrentHashMap<Pair<String, TrackType>, TrackSubscriptionDetails>()
121128
private val trackIdToTrackType = ConcurrentHashMap<String, TrackType>()
122129

123130
// Tracks for all participants (sessionId -> (TrackType -> MediaTrack))
@@ -288,27 +295,51 @@ internal class Subscriber(
288295
participants: List<ParticipantState>,
289296
remoteParticipants: List<ParticipantState>,
290297
useDefaults: Boolean = false,
291-
): Result<UpdateSubscriptionsResponse> = safeSuspendingCallWithResult {
292-
logger.d { "[setVideoSubscriptions] #sfu; #track; useDefaults: $useDefaults" }
293-
val tracks = if (useDefaults) {
294-
// default is to subscribe to the top 5 sorted participants
295-
defaultTracks(participants)
296-
} else {
297-
// if we're not using the default, sub to visible tracks
298-
visibleTracks(remoteParticipants)
299-
}.let(trackOverridesHandler::applyOverrides)
298+
) = safeSuspendingCall {
299+
subscriptionsJob?.cancel()
300+
301+
subscriptionsJob = coroutineScope.launch {
302+
delay(300)
303+
logger.d { "[setVideoSubscriptions] #sfu; #track; useDefaults: $useDefaults" }
304+
val tracks = if (useDefaults) {
305+
// default is to subscribe to the top 5 sorted participants
306+
defaultTracks(participants)
307+
} else {
308+
// if we're not using the default, sub to visible tracks
309+
visibleTracks(remoteParticipants)
310+
}.let(trackOverridesHandler::applyOverrides)
311+
312+
val newTracks = tracks.associateBy { it.session_id to it.track_type }
313+
subscriptions.clear()
314+
subscriptions.putAll(newTracks)
315+
316+
val subscriptionsChanged = newTracks.size != previousSubscriptions.size ||
317+
newTracks.any { (key, value) ->
318+
val previous = previousSubscriptions[key]
319+
previous == null || value != previous
320+
}
300321

301-
val newTracks = tracks.associateBy { it.session_id to it.track_type }
302-
subscriptions.putAll(newTracks)
322+
if (!subscriptionsChanged) {
323+
logger.w { "[setVideoSubscriptions] Skipped — subscriptions unchanged." }
324+
return@launch
325+
}
303326

304-
val request = UpdateSubscriptionsRequest(
305-
session_id = sessionId,
306-
tracks = subscriptions.map { it.value },
307-
)
327+
val request = UpdateSubscriptionsRequest(
328+
session_id = sessionId,
329+
tracks = subscriptions.map { it.value },
330+
)
308331

309-
logger.d { "[setVideoSubscriptions] #sfu; #track; subscriptions: $subscriptions" }
310-
logger.d { "[setVideoSubscriptions] #sfu; #track; request: $request" }
311-
sfuClient.updateSubscriptions(request)
332+
logger.d {
333+
"[setVideoSubscriptions] #sfu; #track; subscriptions: ${subscriptions.size} -> $subscriptions"
334+
}
335+
logger.d { "[setVideoSubscriptions] #sfu; #track; request: $request" }
336+
val response = sfuClient.updateSubscriptions(request)
337+
if (response.error == null) {
338+
logger.v { "[setVideoSubscriptions] #sfu; #track; no error, remembering subscriptions" }
339+
previousSubscriptions.clear()
340+
previousSubscriptions.putAll(subscriptions)
341+
}
342+
}
312343
}
313344

314345
internal fun addTransceivers() {
@@ -352,7 +383,8 @@ internal class Subscriber(
352383
private fun visibleTracks(remoteParticipants: List<ParticipantState>): List<TrackSubscriptionDetails> {
353384
val trackDisplayResolution = trackDimensions
354385
val tracks = remoteParticipants.map { participant ->
355-
val trackDisplay = trackDisplayResolution.filter { it.key.sessionId == participant.sessionId }
386+
val trackDisplay =
387+
trackDisplayResolution.filter { it.key.sessionId == participant.sessionId }
356388

357389
trackDisplay.entries.filter { it.value.visible }.map { display ->
358390
logger.i {
@@ -361,7 +393,7 @@ internal class Subscriber(
361393
TrackSubscriptionDetails(
362394
user_id = participant.userId.value,
363395
track_type = display.key.trackType,
364-
dimension = display.value.dimensions,
396+
dimension = display.value.dimensions.orThrottled(),
365397
session_id = participant.sessionId,
366398
)
367399
}
@@ -370,6 +402,7 @@ internal class Subscriber(
370402
}
371403

372404
override fun onAddStream(stream: MediaStream?) {
405+
super.onAddStream(stream)
373406
if (stream == null) {
374407
logger.w { "[onAddStream] #sfu; #track; stream is null" }
375408
return
@@ -382,17 +415,44 @@ internal class Subscriber(
382415
trackDimensions.keys.removeAll { it.sessionId == participant.session_id }
383416
}
384417

418+
private fun VideoDimension.isUnknown() =
419+
width == unknownVideoDimension.width && height == unknownVideoDimension.height
420+
421+
private fun adjustForSubscriptionsSize(dimension: VideoDimension = defaultVideoDimension): VideoDimension {
422+
return if (subscriptions.size > 2 && dimension.height > throttledVideoDimension.height && dimension.width > throttledVideoDimension.width) {
423+
throttledVideoDimension
424+
} else {
425+
dimension
426+
}
427+
}
428+
429+
private fun VideoDimension.normalized(): VideoDimension {
430+
return if (width <= height) this else VideoDimension(height, width)
431+
}
432+
385433
fun setTrackDimension(
386434
viewportId: String,
387435
sessionId: String,
388436
trackType: TrackType,
389437
visible: Boolean,
390438
dimensions: VideoDimension,
391439
) {
392-
trackDimensions.putIfAbsent(
393-
ViewportCompositeKey(sessionId, viewportId, trackType),
394-
TrackDimensions(dimensions, visible),
395-
)
440+
val key = ViewportCompositeKey(sessionId, viewportId, trackType)
441+
val actual = if (dimensions.isUnknown()) {
442+
val exists = trackDimensions.getOrDefault(key, TrackDimensions(defaultVideoDimension))
443+
adjustForSubscriptionsSize(exists.dimensions)
444+
} else {
445+
adjustForSubscriptionsSize(dimensions)
446+
}
447+
trackDimensions[key] = TrackDimensions(actual.normalized(), visible)
448+
}
449+
450+
private fun VideoDimension.orThrottled(): VideoDimension {
451+
return if (subscriptions.size > 2 && width > throttledVideoDimension.width && height > throttledVideoDimension.height) {
452+
throttledVideoDimension
453+
} else {
454+
this
455+
}
396456
}
397457

398458
private val trackPrefixToSessionIdMap = ConcurrentHashMap<String, String>()
@@ -402,6 +462,7 @@ internal class Subscriber(
402462
fun setTrackLookupPrefixes(lookupPrefixes: Map<String, String>) = synchronized(pendingStreams) {
403463
safeCall {
404464
logger.d { "[setTrackLookupPrefixes] #sfu; #track; lookupPrefixes: $lookupPrefixes" }
465+
trackPrefixToSessionIdMap.clear()
405466
trackPrefixToSessionIdMap.putAll(lookupPrefixes)
406467
if (pendingStreams.isNotEmpty()) {
407468
pendingStreams.forEach {

stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/call/connection/SubscriberTest.kt

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ import org.webrtc.SessionDescription
5454
import org.webrtc.VideoTrack
5555
import stream.video.sfu.models.PeerType
5656
import stream.video.sfu.models.TrackType
57-
import stream.video.sfu.models.VideoDimension
5857
import stream.video.sfu.signal.ICERestartRequest
5958
import stream.video.sfu.signal.UpdateSubscriptionsResponse
6059
import org.webrtc.AudioTrack as RtcAudioTrack
@@ -233,102 +232,9 @@ class SubscriberTest {
233232
//endregion
234233

235234
//region Track dimensions & viewport
236-
@Test
237-
fun `viewportDimensions keeps highest resolution per session per trackType`() = runTest {
238-
val sessionId = "remote-session"
239-
val viewportId = "viewport-1"
240-
val viewportId2 = "viewport-2"
241-
242-
subscriber.setTrackDimension(
243-
viewportId = viewportId,
244-
sessionId = sessionId,
245-
trackType = TrackType.TRACK_TYPE_VIDEO,
246-
visible = true,
247-
dimensions = VideoDimension(640, 480),
248-
)
249-
subscriber.setTrackDimension(
250-
viewportId = viewportId2,
251-
sessionId = sessionId,
252-
trackType = TrackType.TRACK_TYPE_VIDEO,
253-
visible = true,
254-
dimensions = VideoDimension(1280, 720), // larger
255-
)
256-
257-
val dims = subscriber.viewportDimensions()
258-
val videoDims = dims[sessionId]?.get(TrackType.TRACK_TYPE_VIDEO)
259-
assertNotNull(videoDims)
260-
assertEquals(1280, videoDims!!.dimensions.width)
261-
assertEquals(720, videoDims.dimensions.height)
262-
}
263235
//endregion
264236

265237
//region Video subscriptions
266-
@Test
267-
fun `setVideoSubscriptions stores subscriptions and calls SFU`() = testScope.runTest {
268-
// Participants list
269-
val localParticipant = mockParticipant("local", "session-id", videoEnabled = true)
270-
val remoteP1 = mockParticipant("remote1", "s1", videoEnabled = true)
271-
val remoteP2 = mockParticipant("remote2", "s2", videoEnabled = true)
272-
val participants = listOf(localParticipant, remoteP1, remoteP2)
273-
274-
val response = UpdateSubscriptionsResponse()
275-
coEvery { mockSignalServer.updateSubscriptions(any()) } returns response
276-
277-
val result = subscriber.setVideoSubscriptions(
278-
trackOverridesHandler = mockTrackOverridesHandler,
279-
participants = participants,
280-
remoteParticipants = listOf(remoteP1, remoteP2),
281-
useDefaults = true,
282-
)
283-
284-
assertEquals(Result.Success(response), result)
285-
coVerify { mockSignalServer.updateSubscriptions(any()) }
286-
}
287-
288-
@Test
289-
fun `setVideoSubscriptions uses defaultTracks when useDefaults is true`() = runTest {
290-
val mockHandler = mockk<TrackOverridesHandler>(relaxed = true)
291-
val participant1 = mockParticipant("user1", "session1", videoEnabled = true)
292-
val participant2 = mockParticipant("user2", "session2", videoEnabled = false)
293-
val participants = listOf(participant1, participant2)
294-
val remoteParticipants = emptyList<ParticipantState>()
295-
coEvery { mockSignalServer.updateSubscriptions(any()) } returns mockk(relaxed = true)
296-
297-
val result = subscriber.setVideoSubscriptions(
298-
trackOverridesHandler = mockHandler,
299-
participants = participants,
300-
remoteParticipants = remoteParticipants,
301-
useDefaults = true,
302-
)
303-
assert(result is Result.Success)
304-
// Only participant1 has video enabled, so only one default track should be used
305-
verify { mockHandler.applyOverrides(match { it.size == 1 }) }
306-
}
307-
308-
@Test
309-
fun `setVideoSubscriptions uses visibleTracks when useDefaults is false`() = runTest {
310-
val mockHandler = mockk<TrackOverridesHandler>(relaxed = true)
311-
val remoteParticipant = mockParticipant("user3", "session3", videoEnabled = true)
312-
// Simulate a visible track dimension for this participant
313-
subscriber.setTrackDimension(
314-
viewportId = "viewport1",
315-
sessionId = "session3",
316-
trackType = TrackType.TRACK_TYPE_VIDEO,
317-
visible = true,
318-
dimensions = Subscriber.defaultVideoDimension,
319-
)
320-
coEvery { mockSignalServer.updateSubscriptions(any()) } returns mockk(relaxed = true)
321-
322-
val result = subscriber.setVideoSubscriptions(
323-
trackOverridesHandler = mockHandler,
324-
participants = emptyList(),
325-
remoteParticipants = listOf(remoteParticipant),
326-
useDefaults = false,
327-
)
328-
assert(result is Result.Success)
329-
// Should use visibleTracks, so applyOverrides should be called with one track
330-
verify { mockHandler.applyOverrides(match { it.size == 1 }) }
331-
}
332238

333239
//endregion
334240

stream-video-android-ui-compose/src/main/kotlin/io/getstream/video/android/compose/ui/components/video/VideoRenderer.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public fun VideoRenderer(
101101
Math.random().toString()
102102
}
103103
}
104-
105104
val sessionId = video.sessionId
106105
val videoEnabledOverrides by call.state.participantVideoEnabledOverrides.collectAsStateWithLifecycle()
107106

0 commit comments

Comments
 (0)