Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,9 @@ public class RtcSession internal constructor(
}
}
// step 6 - onNegotiationNeeded will trigger and complete the setup using SetPublisherRequest
listenToMediaChanges()

publisher?.let {
listenToMediaChanges()
}
// subscribe to the tracks of other participants
setVideoSubscriptions(true)
return
Expand Down Expand Up @@ -820,6 +821,15 @@ public class RtcSession internal constructor(
// Note: Executor cleanup is handled by Call cleanup
}

private fun hasPublishCapability(): Boolean {
val capabilities = call.state.ownCapabilities.value
return capabilities.any {
it == OwnCapability.SendAudio ||
it == OwnCapability.SendVideo ||
it == OwnCapability.Screenshare
}
}

internal val muteState = MutableStateFlow(
mapOf(
TrackType.TRACK_TYPE_AUDIO to false,
Expand Down Expand Up @@ -1071,12 +1081,14 @@ public class RtcSession internal constructor(
call.state.replaceParticipants(participantStates)
sfuConnectionModule.socketConnection.whenConnected {
logger.d { "JoinCallResponseEvent sfuConnectionModule.socketConnection.whenConnected" }
if (publisher == null) {
if (publisher == null && hasPublishCapability()) {
publisher = createPublisher(event.publishOptions)
}
connectRtc()
processPendingSubscriberEvents()
processPendingPublisherEvents()
publisher?.let {
processPendingPublisherEvents()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ internal class Publisher(
trackInfos.joinToString(separator = ";") { it.toString() },
)
if (trackInfos.isEmpty()) {
logger.e { ("rejoin cause, Can't negotiate without announcing any tracks") }
rejoin.invoke()
logger.d { "No local tracks to publish, skipping publisher negotiate" }
return@submit
}
logger.i { "Negotiating with tracks: $trackInfos" }
logger.i { "Offer: ${offer.description}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,15 @@ class PublisherTest {
coVerify { publisher.negotiate(any()) }
}

@Test
fun `negotiate skips setPublisher when no tracks are announced`() = runTest(coroutineContext) {
every { publisher.getAnnouncedTracks(any(), any()) } returns emptyList()

publisher.negotiate()

coVerify(exactly = 0) { mockSignalServerService.setPublisher(any()) }
}

@Test
fun `close with stopTracks = true stops publishing and closes connection`() = runTest {
val mockVideoTrack = mockk<VideoTrack>(relaxed = true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ package io.getstream.video.android.core.rtc
import android.graphics.ColorSpace.match
import android.os.PowerManager
import androidx.lifecycle.Lifecycle
import io.getstream.android.video.generated.models.OwnCapability
import io.getstream.video.android.core.Call
import io.getstream.video.android.core.CallState
import io.getstream.video.android.core.MediaManagerImpl
import io.getstream.video.android.core.ParticipantState
import io.getstream.video.android.core.StreamVideo
import io.getstream.video.android.core.StreamVideoClient
import io.getstream.video.android.core.call.RtcSession
import io.getstream.video.android.core.call.connection.Publisher
import io.getstream.video.android.core.events.ICETrickleEvent
import io.getstream.video.android.core.events.JoinCallResponseEvent
import io.getstream.video.android.core.events.SubscriberOfferEvent
import io.getstream.video.android.core.internal.module.SfuConnectionModule
import io.getstream.video.android.core.model.IceServer
import io.getstream.video.android.core.socket.sfu.SfuSocketConnection
import io.getstream.video.android.core.socket.sfu.state.SfuSocketState
import io.mockk.MockKAnnotations
import io.mockk.coEvery
import io.mockk.coJustRun
Expand All @@ -40,26 +45,36 @@ import io.mockk.impl.annotations.RelaxedMockK
import io.mockk.mockk
import io.mockk.spyk
import io.mockk.unmockkAll
import io.mockk.verify
import junit.framework.TestCase.assertEquals
import junit.framework.TestCase.assertNotNull
import junit.framework.TestCase.assertNull
import junit.framework.TestCase.assertTrue
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import org.webrtc.SessionDescription
import stream.video.sfu.models.ParticipantCount
import stream.video.sfu.models.PeerType
import stream.video.sfu.models.PublishOption
import stream.video.sfu.models.TrackType
import stream.video.sfu.models.VideoDimension
import stream.video.sfu.signal.SendAnswerResponse

class RtcSessionTest2 {

private val testDispatcher = StandardTestDispatcher()
private val testScope = TestScope(UnconfinedTestDispatcher())
private val testScope = TestScope(testDispatcher)
private val ownCapabilitiesFlow = MutableStateFlow<List<OwnCapability>>(emptyList())
private val participantsFlow = MutableStateFlow<List<ParticipantState>>(emptyList())
private val remoteParticipantsFlow = MutableStateFlow<List<ParticipantState>>(emptyList())

@MockK
private lateinit var mockPowerManager: PowerManager
Expand Down Expand Up @@ -99,6 +114,10 @@ class RtcSessionTest2 {
)
} returns mockk(relaxed = true) {}
}
every { mockCallState.ownCapabilities } returns ownCapabilitiesFlow
every { mockCallState.participants } returns participantsFlow
every { mockCallState.remoteParticipants } returns remoteParticipantsFlow
every { mockCallState.replaceParticipants(any()) } answers { }

// We can stub out other pieces
every { mockCallState.me.value } returns null
Expand Down Expand Up @@ -255,10 +274,17 @@ class RtcSessionTest2 {
}

// TODO: Test is broken because socket connection is not established in this test.
@Ignore
@Test
fun `handleIceTrickle adds event to publisherPendingEvents if publisher is null`() = runTest {
// Given an RtcSession with no publisher set (publisher = null by default until fully joined)
val mockSocket = mockk<SfuSocketConnection>()
val mockConnectedEvent = mockk<JoinCallResponseEvent>(relaxed = true)
val socketStateFlow =
MutableStateFlow<SfuSocketState>(SfuSocketState.Connected(mockConnectedEvent))
every { mockSocket.state() } returns socketStateFlow
val mockModule = mockk<SfuConnectionModule>(relaxed = true) {
every { socketConnection } returns mockSocket
}
val rtcSession = RtcSession(
client = mockStreamVideo,
powerManager = mockPowerManager,
Expand All @@ -271,8 +297,9 @@ class RtcSessionTest2 {
sfuToken = "fake-sfu-token",
clientImpl = mockVideoClient,
coroutineScope = testScope,
rtcSessionScope = testScope,
remoteIceServers = emptyList(),
sfuConnectionModuleProvider = { mockk(relaxed = true) },
sfuConnectionModuleProvider = { mockModule },
)
// Confirm publisher is null
assertNull(rtcSession.publisher)
Expand All @@ -290,6 +317,7 @@ class RtcSessionTest2 {

// When
rtcSession.handleIceTrickle(event)
testScope.testScheduler.advanceUntilIdle()

// Then
// The event should be added to publisherPendingEvents
Expand Down Expand Up @@ -380,9 +408,94 @@ class RtcSessionTest2 {
coVerify { publisher.close(any()) }
}

@Test
fun `join response without publish capability skips publisher creation`() = runTest {
ownCapabilitiesFlow.value = emptyList()
val (rtcSession, _) = createRtcSessionSpyWithMockSocket()
val event = fakeJoinResponseEvent(samplePublishOptions())

rtcSession.handleEvent(event)
testScope.testScheduler.advanceUntilIdle()

assertNull(rtcSession.publisher)
verify(exactly = 0) { rtcSession["createPublisher"](any<List<PublishOption>>()) }
}

private fun <T> RtcSession.fieldValue(name: String): T? {
val field = RtcSession::class.java.getDeclaredField(name)
field.isAccessible = true
return field.get(this) as? T
}

private fun samplePublishOptions(): List<PublishOption> = listOf(
PublishOption(
track_type = TrackType.TRACK_TYPE_VIDEO,
bitrate = 1_000_000,
fps = 30,
max_spatial_layers = 1,
max_temporal_layers = 1,
video_dimension = VideoDimension(width = 1280, height = 720),
id = 1,
),
)

private fun fakeJoinResponseEvent(
publishOptions: List<PublishOption>,
): JoinCallResponseEvent {
val protoCallState = mockk<stream.video.sfu.models.CallState>(relaxed = true) {
every { participants } returns emptyList()
}
val count = mockk<io.getstream.video.android.core.events.ParticipantCount>(relaxed = true)
return mockk(relaxed = true) {
every { callState } returns protoCallState
every { participantCount } returns count
every { fastReconnectDeadlineSeconds } returns 0
every { isReconnected } returns false
every { [email protected] } returns publishOptions
}
}

private fun createRtcSessionSpyWithMockSocket(): Pair<RtcSession, Publisher> {
val mockSocket = mockk<SfuSocketConnection>()
val mockConnectedEvent = mockk<JoinCallResponseEvent>(relaxed = true)
val socketStateFlow =
MutableStateFlow<SfuSocketState>(SfuSocketState.Connected(mockConnectedEvent))
every { mockSocket.state() } returns socketStateFlow
every { mockSocket.whenConnected(any<Long>(), any()) } answers {
val callback = secondArg<suspend (String) -> Unit>()
// Launch the callback in backgroundScope to simulate the real whenConnected behavior
// The real impl launches in socket's scope; we launch in background scope for control
testScope.backgroundScope.launch {
delay(500) // Simulate the real whenConnected delay
callback("connection-id")
}
Unit // Return Unit since whenConnected returns Unit
}
val mockModule = mockk<SfuConnectionModule>(relaxed = true) {
every { socketConnection } returns mockSocket
every { api } returns mockk(relaxed = true)
}
val rtcSession = spyk(
RtcSession(
client = mockStreamVideo,
powerManager = mockPowerManager,
call = mockCall,
sessionId = "session-id",
apiKey = "api-key",
lifecycle = mockLifecycle,
sfuUrl = "https://test-sfu.stream.com",
sfuWsUrl = "wss://test-sfu.stream.com",
sfuToken = "fake-sfu-token",
clientImpl = mockVideoClient,
coroutineScope = testScope,
rtcSessionScope = testScope,
remoteIceServers = emptyList(),
sfuConnectionModuleProvider = { mockModule },
),
recordPrivateCalls = true,
)
val publisherMock = mockk<Publisher>(relaxed = true)
every { rtcSession["createPublisher"](any<List<PublishOption>>()) } returns publisherMock
return rtcSession to publisherMock
}
}