Skip to content

Commit 8da851a

Browse files
authored
Fix publish deadlocks (#618)
* Fast fail attempts to publish without permissions * Fix publish deadlock when no response from server
1 parent ac6b7a6 commit 8da851a

File tree

7 files changed

+156
-32
lines changed

7 files changed

+156
-32
lines changed

.changeset/chilly-kiwis-travel.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fix publish deadlock when no response from server

.changeset/twelve-doors-wash.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fast fail attempts to publish without permissions

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ import kotlinx.coroutines.ensureActive
5454
import kotlinx.coroutines.joinAll
5555
import kotlinx.coroutines.launch
5656
import kotlinx.coroutines.runBlocking
57+
import kotlinx.coroutines.suspendCancellableCoroutine
5758
import kotlinx.coroutines.sync.Mutex
5859
import kotlinx.coroutines.sync.withLock
60+
import kotlinx.coroutines.withTimeout
5961
import kotlinx.coroutines.withTimeoutOrNull
6062
import kotlinx.coroutines.yield
6163
import livekit.LivekitModels
@@ -87,7 +89,7 @@ import javax.inject.Singleton
8789
import kotlin.coroutines.Continuation
8890
import kotlin.coroutines.resume
8991
import kotlin.coroutines.resumeWithException
90-
import kotlin.coroutines.suspendCoroutine
92+
import kotlin.time.Duration.Companion.seconds
9193

9294
/**
9395
* @suppress
@@ -332,17 +334,19 @@ internal constructor(
332334
}
333335

334336
// Suspend until signal client receives message confirming track publication.
335-
return suspendCoroutine { cont ->
336-
synchronized(pendingTrackResolvers) {
337-
pendingTrackResolvers[cid] = cont
337+
return withTimeout(20.seconds) {
338+
suspendCancellableCoroutine { cont ->
339+
synchronized(pendingTrackResolvers) {
340+
pendingTrackResolvers[cid] = cont
341+
}
342+
client.sendAddTrack(
343+
cid = cid,
344+
name = name,
345+
type = kind,
346+
stream = stream,
347+
builder = builder,
348+
)
338349
}
339-
client.sendAddTrack(
340-
cid = cid,
341-
name = name,
342-
type = kind,
343-
stream = stream,
344-
builder = builder,
345-
)
346350
}
347351
}
348352

livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ internal constructor(
252252
* @see Room.videoTrackCaptureDefaults
253253
* @see Room.videoTrackPublishDefaults
254254
*/
255+
@Throws(TrackException.PublishException::class)
255256
suspend fun setCameraEnabled(enabled: Boolean) {
256257
setTrackEnabled(Track.Source.CAMERA, enabled)
257258
}
@@ -266,6 +267,7 @@ internal constructor(
266267
* @see Room.audioTrackCaptureDefaults
267268
* @see Room.audioTrackPublishDefaults
268269
*/
270+
@Throws(TrackException.PublishException::class)
269271
suspend fun setMicrophoneEnabled(enabled: Boolean) {
270272
setTrackEnabled(Track.Source.MICROPHONE, enabled)
271273
}
@@ -286,6 +288,7 @@ internal constructor(
286288
* @see Room.screenShareTrackPublishDefaults
287289
* @see ScreenAudioCapturer
288290
*/
291+
@Throws(TrackException.PublishException::class)
289292
suspend fun setScreenShareEnabled(
290293
enabled: Boolean,
291294
mediaProjectionPermissionResultData: Intent? = null,
@@ -481,7 +484,27 @@ internal constructor(
481484
)
482485
}
483486

487+
private fun hasPermissionsToPublish(source: Track.Source): Boolean {
488+
val permissions = this.permissions
489+
if (permissions == null) {
490+
LKLog.w { "No permissions present for publishing track." }
491+
return false
492+
}
493+
val canPublish = permissions.canPublish
494+
val canPublishSources = permissions.canPublishSources
495+
496+
val sourceAllowed = canPublishSources.contains(source)
497+
498+
if (canPublish && (canPublishSources.isEmpty() || sourceAllowed)) {
499+
return true
500+
}
501+
502+
LKLog.w { "insufficient permissions to publish" }
503+
return false
504+
}
505+
484506
/**
507+
* @throws TrackException.PublishException thrown when the publish fails. see [TrackException.PublishException.message] for details.
485508
* @return true if the track publish was successful.
486509
*/
487510
private suspend fun publishTrackImpl(
@@ -491,6 +514,15 @@ internal constructor(
491514
encodings: List<RtpParameters.Encoding> = emptyList(),
492515
publishListener: PublishListener? = null,
493516
): LocalTrackPublication? {
517+
val addTrackRequestBuilder = AddTrackRequest.newBuilder().apply {
518+
this.requestConfig()
519+
}
520+
521+
val trackSource = Track.Source.fromProto(addTrackRequestBuilder.source ?: LivekitModels.TrackSource.UNRECOGNIZED)
522+
if (!hasPermissionsToPublish(trackSource)) {
523+
throw TrackException.PublishException("Failed to publish track, insufficient permissions")
524+
}
525+
494526
@Suppress("NAME_SHADOWING") var options = options
495527

496528
@Suppress("NAME_SHADOWING") var encodings = encodings
@@ -564,17 +596,13 @@ internal constructor(
564596
}
565597

566598
suspend fun requestAddTrack(): TrackInfo {
567-
val builder = AddTrackRequest.newBuilder().apply {
568-
this.requestConfig()
569-
}
570-
571599
return try {
572600
engine.addTrack(
573601
cid = cid,
574602
name = options.name ?: track.name,
575603
kind = track.kind.toProto(),
576604
stream = options.stream,
577-
builder = builder,
605+
builder = addTrackRequestBuilder,
578606
)
579607
} catch (e: Exception) {
580608
val exception = TrackException.PublishException("Failed to publish track", e)
@@ -1362,8 +1390,12 @@ internal constructor(
13621390
)
13631391
}
13641392
negotiateJob.join()
1365-
val trackInfo = publishJob.await()
1366-
LKLog.d { "published $codec for track ${track.sid}, $trackInfo" }
1393+
try {
1394+
val trackInfo = publishJob.await()
1395+
LKLog.d { "published $codec for track ${track.sid}, $trackInfo" }
1396+
} catch (e: Exception) {
1397+
LKLog.w(e) { "exception when publishing $codec for track ${track.sid}" }
1398+
}
13671399
}
13681400
}
13691401

livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,9 @@ data class ParticipantPermission(
591591
val canPublishData: Boolean,
592592
val hidden: Boolean,
593593
val recorder: Boolean,
594+
/**
595+
* The list of allowed sources. If this is empty, then all sources are allowed.
596+
*/
594597
val canPublishSources: List<Track.Source>,
595598
val canUpdateMetadata: Boolean,
596599
val canSubscribeMetrics: Boolean,

livekit-android-test/src/main/java/io/livekit/android/test/mock/TestData.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@ object TestData {
5656
identity = "local_participant_identity"
5757
state = LivekitModels.ParticipantInfo.State.ACTIVE
5858
metadata = "local_metadata"
59-
permission = LivekitModels.ParticipantPermission.newBuilder()
60-
.setCanPublish(true)
61-
.setCanSubscribe(true)
62-
.setCanPublishData(true)
63-
.setHidden(true)
64-
.setRecorder(false)
65-
.build()
59+
permission = with(LivekitModels.ParticipantPermission.newBuilder()) {
60+
canPublish = true
61+
canSubscribe = true
62+
canPublishData = true
63+
64+
hidden = false
65+
recorder = false
66+
build()
67+
}
6668
putAttributes("attribute", "value")
6769
build()
6870
}

livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 LiveKit, Inc.
2+
* Copyright 2023-2025 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import io.livekit.android.room.DefaultsManager
2727
import io.livekit.android.room.track.LocalVideoTrack
2828
import io.livekit.android.room.track.LocalVideoTrackOptions
2929
import io.livekit.android.room.track.Track
30+
import io.livekit.android.room.track.TrackException
3031
import io.livekit.android.room.track.VideoCaptureParameter
3132
import io.livekit.android.room.track.VideoCodec
3233
import io.livekit.android.test.MockE2ETest
@@ -46,6 +47,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
4647
import kotlinx.coroutines.Job
4748
import kotlinx.coroutines.cancel
4849
import kotlinx.coroutines.launch
50+
import kotlinx.coroutines.test.StandardTestDispatcher
4951
import kotlinx.coroutines.test.advanceUntilIdle
5052
import livekit.LivekitModels
5153
import livekit.LivekitModels.AudioTrackFeature
@@ -66,6 +68,7 @@ import org.mockito.kotlin.argThat
6668
import org.robolectric.RobolectricTestRunner
6769
import org.robolectric.Shadows
6870
import java.nio.ByteBuffer
71+
import kotlin.time.Duration.Companion.seconds
6972

7073
@ExperimentalCoroutinesApi
7174
@RunWith(RobolectricTestRunner::class)
@@ -122,11 +125,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
122125
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
123126
wsFactory.ws.clearRequests()
124127

125-
val backgroundScope = CoroutineScope(coroutineContext + Job())
128+
val standardTestDispatcher = StandardTestDispatcher()
129+
val backgroundScope = CoroutineScope(coroutineContext + Job() + standardTestDispatcher)
126130
try {
127-
backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) }
128-
backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) }
131+
backgroundScope.launch {
132+
try {
133+
room.localParticipant.setMicrophoneEnabled(true)
134+
} catch (_: Exception) {
135+
}
136+
}
137+
backgroundScope.launch {
138+
try {
139+
room.localParticipant.setMicrophoneEnabled(true)
140+
} catch (_: Exception) {
141+
}
142+
}
129143

144+
standardTestDispatcher.scheduler.advanceTimeBy(1.seconds.inWholeMilliseconds)
130145
assertEquals(1, wsFactory.ws.sentRequests.size)
131146
} finally {
132147
backgroundScope.cancel()
@@ -144,10 +159,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
144159
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
145160
wsFactory.ws.clearRequests()
146161

147-
val backgroundScope = CoroutineScope(coroutineContext + Job())
162+
val standardTestDispatcher = StandardTestDispatcher()
163+
val backgroundScope = CoroutineScope(coroutineContext + Job() + standardTestDispatcher)
148164
try {
149-
backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) }
150-
backgroundScope.launch { room.localParticipant.setCameraEnabled(true) }
165+
backgroundScope.launch {
166+
try {
167+
room.localParticipant.setMicrophoneEnabled(true)
168+
} catch (_: Exception) {
169+
}
170+
}
171+
backgroundScope.launch {
172+
try {
173+
room.localParticipant.setCameraEnabled(true)
174+
} catch (_: Exception) {
175+
}
176+
}
177+
178+
standardTestDispatcher.scheduler.advanceTimeBy(1.seconds.inWholeMilliseconds)
151179

152180
assertEquals(2, wsFactory.ws.sentRequests.size)
153181
} finally {
@@ -534,4 +562,49 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
534562
assertTrue(features.contains(AudioTrackFeature.TF_AUTO_GAIN_CONTROL))
535563
assertFalse(features.contains(AudioTrackFeature.TF_ENHANCED_NOISE_CANCELLATION))
536564
}
565+
566+
@Test
567+
fun lackOfPublishPermissionCausesException() = runTest {
568+
val noCanPublishJoin = with(TestData.JOIN.toBuilder()) {
569+
join = with(join.toBuilder()) {
570+
participant = with(participant.toBuilder()) {
571+
permission = with(permission.toBuilder()) {
572+
canPublish = false
573+
build()
574+
}
575+
build()
576+
}
577+
build()
578+
}
579+
build()
580+
}
581+
connect(noCanPublishJoin)
582+
583+
var didThrow = false
584+
try {
585+
room.localParticipant.publishVideoTrack(createLocalTrack())
586+
} catch (e: TrackException.PublishException) {
587+
didThrow = true
588+
}
589+
590+
assertTrue(didThrow)
591+
}
592+
593+
@Test
594+
fun publishWithNoResponseCausesException() = runTest {
595+
connect()
596+
597+
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
598+
var didThrow = false
599+
launch {
600+
try {
601+
room.localParticipant.publishVideoTrack(createLocalTrack())
602+
} catch (e: TrackException.PublishException) {
603+
didThrow = true
604+
}
605+
}
606+
607+
coroutineRule.dispatcher.scheduler.advanceUntilIdle()
608+
assertTrue(didThrow)
609+
}
537610
}

0 commit comments

Comments
 (0)