Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public enum WebRTCJoinPolicy: Sendable {

/// Waits until both peer connections report `.connected`, or until the
/// timeout elapses, before completing the join request.
case peerConnectionReadinessAware(timeout: TimeInterval)
case peerConnectionReadinessAware
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Copyright © 2026 Stream.io Inc. All rights reserved.
//

import Foundation

/// Builds and sends telemetry for a join flow once the state machine reaches
/// the point where the flow outcome is known.
struct JoinedStateTelemetryReporter {

/// Describes the join or reconnection path that completed.
enum FlowType { case regular, fast, rejoin, migrate }

/// The join flow that should be encoded in the telemetry payload.
var flowType: FlowType = .regular

private let startTime = Date()

/// Reports telemetry for the completed join flow.
///
/// Regular joins send the elapsed connection time. Reconnect flows send a
/// reconnection payload with the selected strategy and elapsed duration.
///
/// - Parameters:
/// - sessionId: The SFU session identifier for the active participant.
/// - unifiedSessionId: The identifier shared across reconnect attempts.
/// - sfuAdapter: The adapter that submits telemetry to the SFU.
func reportTelemetry(
sessionId: String,
unifiedSessionId: String,
sfuAdapter: SFUAdapter
) async {
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
let duration = Float(Date().timeIntervalSince(startTime))
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.timeSeconds = duration

telemetry.data = {
switch self.flowType {
case .regular:
return .connectionTimeSeconds(duration)
case .fast:
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.strategy = .fast
return .reconnection(reconnection)
Comment on lines +42 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: .fast case creates a new reconnection without timeSeconds.

The .fast case shadows the outer reconnection variable and creates a new one without setting timeSeconds. This means fast reconnection telemetry will report timeSeconds = 0 instead of the actual duration, unlike .rejoin and .migrate which correctly reuse the outer variable.

🐛 Proposed fix
             case .fast:
-                var reconnection = Stream_Video_Sfu_Signal_Reconnection()
+                reconnection.strategy = .fast
-                reconnection.strategy = .fast
                 return .reconnection(reconnection)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@Sources/StreamVideo/WebRTC/v2/StateMachine/Components/JoinedStateTelemetryReporter.swift`
around lines 42 - 45, In JoinedStateTelemetryReporter.swift the switch's .fast
branch shadows the outer reconnection variable and builds a new
Stream_Video_Sfu_Signal_Reconnection without setting timeSeconds, causing zero
durations to be reported; change the .fast case to reuse the existing
reconnection variable (do not redeclare it), set reconnection.strategy = .fast
(keeping reconnection.timeSeconds already assigned) and return
.reconnection(reconnection) so the real duration is preserved.

case .rejoin:
reconnection.strategy = .rejoin
return .reconnection(reconnection)
case .migrate:
reconnection.strategy = .migrate
return .reconnection(reconnection)
}
}()

do {
try await sfuAdapter.sendStats(
for: sessionId,
unifiedSessionId: unifiedSessionId,
telemetry: telemetry
)
log.debug("Join call completed in \(duration) seconds.")
} catch {
log.error(error)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ extension WebRTCCoordinator.StateMachine.Stage {

@Injected(\.audioStore) private var audioStore

private enum FlowType { case regular, fast, rejoin, migrate }
private let disposableBag = DisposableBag()
private let startTime = Date()
private var flowType = FlowType.regular
private var telemetryReporter: JoinedStateTelemetryReporter = .init()

/// Initializes a new instance of `JoiningStage`.
/// - Parameter context: The context for the joining stage.
Expand All @@ -56,19 +55,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
) -> Self? {
switch previousStage.id {
case .connected where context.isRejoiningFromSessionID != nil:
flowType = .rejoin
telemetryReporter.flowType = .rejoin
executeRejoining()
return self
case .connected:
flowType = .regular
telemetryReporter.flowType = .regular
execute(isFastReconnecting: false)
return self
case .fastReconnected:
flowType = .fast
telemetryReporter.flowType = .fast
execute(isFastReconnecting: true)
return self
case .migrated:
flowType = .migrate
telemetryReporter.flowType = .migrate
executeMigration()
return self
default:
Expand Down Expand Up @@ -135,7 +134,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
await coordinator.stateAdapter.publisher?.restartICE()
}

transitionToNextStage(context)
await transitionToNextStage(
context,
coordinator: coordinator,
sfuAdapter: sfuAdapter
)
} catch {
context.reconnectionStrategy = context
.reconnectionStrategy
Expand Down Expand Up @@ -196,7 +199,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
isFastReconnecting: false
)

transitionToNextStage(context)
await transitionToNextStage(
context,
coordinator: coordinator,
sfuAdapter: sfuAdapter
)
} catch {
context.reconnectionStrategy = .rejoin
transitionDisconnectOrError(error)
Expand Down Expand Up @@ -255,7 +262,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
isFastReconnecting: false
)

transitionToNextStage(context)
await transitionToNextStage(
context,
coordinator: coordinator,
sfuAdapter: sfuAdapter
)
} catch {
transitionDisconnectOrError(error)
}
Expand Down Expand Up @@ -422,12 +433,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
)

try Task.checkCancellation()

reportTelemetry(
sessionId: await coordinator.stateAdapter.sessionID,
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
sfuAdapter: sfuAdapter
)
}

/// Waits until the audio session is fully ready for call setup.
Expand Down Expand Up @@ -461,66 +466,28 @@ extension WebRTCCoordinator.StateMachine.Stage {
}
}

/// Reports telemetry data to the SFU (Selective Forwarding Unit) to monitor and analyze the
/// connection lifecycle.
///
/// This method collects relevant metrics based on the flow type of the connection, such as
/// connection time or reconnection details, and sends them to the SFU for logging and diagnostics.
/// The telemetry data provides insights into the connection's performance and the strategies used
/// during rejoining, fast reconnecting, or migration.
///
/// The reported data includes:
/// - Connection time in seconds for a regular flow.
/// - Reconnection strategies (e.g., fast reconnect, rejoin, or migration) and their duration.
private func reportTelemetry(
sessionId: String,
unifiedSessionId: String,
private func transitionToNextStage(
_ context: Context,
coordinator: WebRTCCoordinator,
sfuAdapter: SFUAdapter
) {
Task(disposableBag: disposableBag) { [weak self] in
guard let self else { return }
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
let duration = Float(Date().timeIntervalSince(startTime))
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.timeSeconds = duration

telemetry.data = {
switch self.flowType {
case .regular:
return .connectionTimeSeconds(duration)
case .fast:
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.strategy = .fast
return .reconnection(reconnection)
case .rejoin:
reconnection.strategy = .rejoin
return .reconnection(reconnection)
case .migrate:
reconnection.strategy = .migrate
return .reconnection(reconnection)
}
}()

do {
try await sfuAdapter.sendStats(
for: sessionId,
unifiedSessionId: unifiedSessionId,
telemetry: telemetry
)
log.debug("Join call completed in \(duration) seconds.")
} catch {
log.error(error)
}
}
}

private func transitionToNextStage(_ context: Context) {
) async {
switch context.joinPolicy {
case .default:
await telemetryReporter.reportTelemetry(
sessionId: await coordinator.stateAdapter.sessionID,
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
sfuAdapter: sfuAdapter
)
reportJoinCompletion()
transitionOrDisconnect(.joined(self.context))
case .peerConnectionReadinessAware(let timeout):
transitionOrDisconnect(.peerConnectionPreparing(self.context, timeout: timeout))

case .peerConnectionReadinessAware:
transitionOrDisconnect(
.peerConnectionPreparing(
self.context,
telemetryReporter: telemetryReporter
)
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ extension WebRTCCoordinator.StateMachine.Stage {

/// Creates the stage that waits briefly for publisher and subscriber peer
/// connections to report `.connected` before the join call completes.
///
/// - Parameters:
/// - context: The state machine context for the pending join flow.
/// - telemetryReporter: Reports telemetry after the stage finishes.
static func peerConnectionPreparing(
_ context: Context,
timeout: TimeInterval
telemetryReporter: JoinedStateTelemetryReporter
) -> WebRTCCoordinator.StateMachine.Stage {
PeerConnectionPreparingStage(
context,
timeout: timeout
telemetryReporter: telemetryReporter
)
}
}
Expand All @@ -30,14 +34,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
@unchecked Sendable {

private let disposableBag = DisposableBag()
private let timeout: TimeInterval
private let timeout: TimeInterval = WebRTCConfiguration.timeout.peerConnectionReadiness
private let telemetryReporter: JoinedStateTelemetryReporter

/// Initializes a new instance of `PeerConnectionPreparingStage`.
///
/// - Parameters:
/// - context: The state machine context for the pending join flow.
/// - telemetryReporter: Reports telemetry after the stage finishes.
init(
_ context: Context,
timeout: TimeInterval
telemetryReporter: JoinedStateTelemetryReporter
) {
self.timeout = timeout
self.telemetryReporter = telemetryReporter
super.init(id: .peerConnectionPreparing, context: context)
}

Expand Down Expand Up @@ -67,8 +76,10 @@ extension WebRTCCoordinator.StateMachine.Stage {

private func execute() async {
guard
let publisher = await context.coordinator?.stateAdapter.publisher,
let subscriber = await context.coordinator?.stateAdapter.subscriber
let coordinator = context.coordinator,
let sfuAdapter = await coordinator.stateAdapter.sfuAdapter,
let publisher = await coordinator.stateAdapter.publisher,
let subscriber = await coordinator.stateAdapter.subscriber
else {
return
}
Expand All @@ -91,6 +102,12 @@ extension WebRTCCoordinator.StateMachine.Stage {
)
}

await telemetryReporter.reportTelemetry(
sessionId: await coordinator.stateAdapter.sessionID,
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
sfuAdapter: sfuAdapter
)

reportJoinCompletion()

transitionOrDisconnect(.joined(context))
Expand Down
9 changes: 7 additions & 2 deletions Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ enum WebRTCConfiguration {
var migrationCompletion: TimeInterval
var publisherSetUpBeforeNegotiation: TimeInterval
var audioSessionConfigurationCompletion: TimeInterval
/// Maximum time to wait for both peer connections to reach
/// `.connected` after the SFU join flow succeeds.
var peerConnectionReadiness: TimeInterval

/// Timeout for authentication in production environment.
static let production = Timeout(
Expand All @@ -22,7 +25,8 @@ enum WebRTCConfiguration {
join: 30,
migrationCompletion: 10,
publisherSetUpBeforeNegotiation: 2,
audioSessionConfigurationCompletion: 2
audioSessionConfigurationCompletion: 2,
peerConnectionReadiness: 5
)

#if STREAM_TESTS
Expand All @@ -33,7 +37,8 @@ enum WebRTCConfiguration {
join: 5,
migrationCompletion: 5,
publisherSetUpBeforeNegotiation: 5,
audioSessionConfigurationCompletion: 5
audioSessionConfigurationCompletion: 5,
peerConnectionReadiness: 5
)
#endif
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/StreamVideoSwiftUI/CallViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ open class CallViewModel: ObservableObject {
callId: callId,
members: [],
customData: customData,
policy: .peerConnectionReadinessAware(timeout: 2)
policy: .peerConnectionReadinessAware
)
} catch {
hasAcceptedCall = false
Expand Down
4 changes: 2 additions & 2 deletions StreamVideoSwiftUITests/CallViewModel_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ final class CallViewModel_Tests: XCTestCase, @unchecked Sendable {
switch policy {
case .default:
XCTFail()
case let .peerConnectionReadinessAware(timeout):
XCTAssertEqual(timeout, 2)
case .peerConnectionReadinessAware:
XCTAssertTrue(true)
}
default:
XCTFail()
Expand Down
6 changes: 3 additions & 3 deletions StreamVideoTests/Call/Call_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable {
call.stub(for: \.state, with: .init(.dummy()))
mockCallController.stub(for: .join, with: JoinCallResponse.dummy())

_ = try await call.join(policy: .peerConnectionReadinessAware(timeout: 2))
_ = try await call.join(policy: .peerConnectionReadinessAware)

let recordedInput = try XCTUnwrap(
mockCallController.recordedInputPayload(
Expand All @@ -661,8 +661,8 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable {
switch recordedInput.6 {
case .default:
XCTFail()
case let .peerConnectionReadinessAware(timeout):
XCTAssertEqual(timeout, 2)
case .peerConnectionReadinessAware:
break
}
}

Expand Down
Loading
Loading