diff --git a/Sources/StreamVideo/WebRTC/v2/Policies/JoinPolicy/WebRTCJoinPolicy.swift b/Sources/StreamVideo/WebRTC/v2/Policies/JoinPolicy/WebRTCJoinPolicy.swift index d07d0a0fe..5b2efb9b1 100644 --- a/Sources/StreamVideo/WebRTC/v2/Policies/JoinPolicy/WebRTCJoinPolicy.swift +++ b/Sources/StreamVideo/WebRTC/v2/Policies/JoinPolicy/WebRTCJoinPolicy.swift @@ -12,5 +12,5 @@ public enum WebRTCJoinPolicy: Sendable { /// Waits until both peer connections report `.connected`, or until the /// timeout elapses, before completing the join request. - case peerConnectionReadinessAware(timeout: TimeInterval) + case peerConnectionReadinessAware } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Components/JoinedStateTelemetryReporter.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Components/JoinedStateTelemetryReporter.swift new file mode 100644 index 000000000..e50b3f07e --- /dev/null +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Components/JoinedStateTelemetryReporter.swift @@ -0,0 +1,66 @@ +// +// Copyright © 2026 Stream.io Inc. All rights reserved. +// + +import Foundation + +/// Builds and sends telemetry for a join flow once the state machine reaches +/// the point where the flow outcome is known. +struct JoinedStateTelemetryReporter { + + /// Describes the join or reconnection path that completed. + enum FlowType { case regular, fast, rejoin, migrate } + + /// The join flow that should be encoded in the telemetry payload. + var flowType: FlowType = .regular + + private let startTime = Date() + + /// Reports telemetry for the completed join flow. + /// + /// Regular joins send the elapsed connection time. Reconnect flows send a + /// reconnection payload with the selected strategy and elapsed duration. + /// + /// - Parameters: + /// - sessionId: The SFU session identifier for the active participant. + /// - unifiedSessionId: The identifier shared across reconnect attempts. + /// - sfuAdapter: The adapter that submits telemetry to the SFU. + func reportTelemetry( + sessionId: String, + unifiedSessionId: String, + sfuAdapter: SFUAdapter + ) async { + var telemetry = Stream_Video_Sfu_Signal_Telemetry() + let duration = Float(Date().timeIntervalSince(startTime)) + var reconnection = Stream_Video_Sfu_Signal_Reconnection() + reconnection.timeSeconds = duration + + telemetry.data = { + switch self.flowType { + case .regular: + return .connectionTimeSeconds(duration) + case .fast: + var reconnection = Stream_Video_Sfu_Signal_Reconnection() + reconnection.strategy = .fast + return .reconnection(reconnection) + case .rejoin: + reconnection.strategy = .rejoin + return .reconnection(reconnection) + case .migrate: + reconnection.strategy = .migrate + return .reconnection(reconnection) + } + }() + + do { + try await sfuAdapter.sendStats( + for: sessionId, + unifiedSessionId: unifiedSessionId, + telemetry: telemetry + ) + log.debug("Join call completed in \(duration) seconds.") + } catch { + log.error(error) + } + } +} diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift index 7cf9ac27b..4d49b3d0b 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift @@ -31,10 +31,9 @@ extension WebRTCCoordinator.StateMachine.Stage { @Injected(\.audioStore) private var audioStore - private enum FlowType { case regular, fast, rejoin, migrate } private let disposableBag = DisposableBag() private let startTime = Date() - private var flowType = FlowType.regular + private var telemetryReporter: JoinedStateTelemetryReporter = .init() /// Initializes a new instance of `JoiningStage`. /// - Parameter context: The context for the joining stage. @@ -56,19 +55,19 @@ extension WebRTCCoordinator.StateMachine.Stage { ) -> Self? { switch previousStage.id { case .connected where context.isRejoiningFromSessionID != nil: - flowType = .rejoin + telemetryReporter.flowType = .rejoin executeRejoining() return self case .connected: - flowType = .regular + telemetryReporter.flowType = .regular execute(isFastReconnecting: false) return self case .fastReconnected: - flowType = .fast + telemetryReporter.flowType = .fast execute(isFastReconnecting: true) return self case .migrated: - flowType = .migrate + telemetryReporter.flowType = .migrate executeMigration() return self default: @@ -135,7 +134,11 @@ extension WebRTCCoordinator.StateMachine.Stage { await coordinator.stateAdapter.publisher?.restartICE() } - transitionToNextStage(context) + await transitionToNextStage( + context, + coordinator: coordinator, + sfuAdapter: sfuAdapter + ) } catch { context.reconnectionStrategy = context .reconnectionStrategy @@ -196,7 +199,11 @@ extension WebRTCCoordinator.StateMachine.Stage { isFastReconnecting: false ) - transitionToNextStage(context) + await transitionToNextStage( + context, + coordinator: coordinator, + sfuAdapter: sfuAdapter + ) } catch { context.reconnectionStrategy = .rejoin transitionDisconnectOrError(error) @@ -255,7 +262,11 @@ extension WebRTCCoordinator.StateMachine.Stage { isFastReconnecting: false ) - transitionToNextStage(context) + await transitionToNextStage( + context, + coordinator: coordinator, + sfuAdapter: sfuAdapter + ) } catch { transitionDisconnectOrError(error) } @@ -422,12 +433,6 @@ extension WebRTCCoordinator.StateMachine.Stage { ) try Task.checkCancellation() - - reportTelemetry( - sessionId: await coordinator.stateAdapter.sessionID, - unifiedSessionId: coordinator.stateAdapter.unifiedSessionId, - sfuAdapter: sfuAdapter - ) } /// Waits until the audio session is fully ready for call setup. @@ -461,66 +466,28 @@ extension WebRTCCoordinator.StateMachine.Stage { } } - /// Reports telemetry data to the SFU (Selective Forwarding Unit) to monitor and analyze the - /// connection lifecycle. - /// - /// This method collects relevant metrics based on the flow type of the connection, such as - /// connection time or reconnection details, and sends them to the SFU for logging and diagnostics. - /// The telemetry data provides insights into the connection's performance and the strategies used - /// during rejoining, fast reconnecting, or migration. - /// - /// The reported data includes: - /// - Connection time in seconds for a regular flow. - /// - Reconnection strategies (e.g., fast reconnect, rejoin, or migration) and their duration. - private func reportTelemetry( - sessionId: String, - unifiedSessionId: String, + private func transitionToNextStage( + _ context: Context, + coordinator: WebRTCCoordinator, sfuAdapter: SFUAdapter - ) { - Task(disposableBag: disposableBag) { [weak self] in - guard let self else { return } - var telemetry = Stream_Video_Sfu_Signal_Telemetry() - let duration = Float(Date().timeIntervalSince(startTime)) - var reconnection = Stream_Video_Sfu_Signal_Reconnection() - reconnection.timeSeconds = duration - - telemetry.data = { - switch self.flowType { - case .regular: - return .connectionTimeSeconds(duration) - case .fast: - var reconnection = Stream_Video_Sfu_Signal_Reconnection() - reconnection.strategy = .fast - return .reconnection(reconnection) - case .rejoin: - reconnection.strategy = .rejoin - return .reconnection(reconnection) - case .migrate: - reconnection.strategy = .migrate - return .reconnection(reconnection) - } - }() - - do { - try await sfuAdapter.sendStats( - for: sessionId, - unifiedSessionId: unifiedSessionId, - telemetry: telemetry - ) - log.debug("Join call completed in \(duration) seconds.") - } catch { - log.error(error) - } - } - } - - private func transitionToNextStage(_ context: Context) { + ) async { switch context.joinPolicy { case .default: + await telemetryReporter.reportTelemetry( + sessionId: await coordinator.stateAdapter.sessionID, + unifiedSessionId: coordinator.stateAdapter.unifiedSessionId, + sfuAdapter: sfuAdapter + ) reportJoinCompletion() transitionOrDisconnect(.joined(self.context)) - case .peerConnectionReadinessAware(let timeout): - transitionOrDisconnect(.peerConnectionPreparing(self.context, timeout: timeout)) + + case .peerConnectionReadinessAware: + transitionOrDisconnect( + .peerConnectionPreparing( + self.context, + telemetryReporter: telemetryReporter + ) + ) } } } diff --git a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+PeerConnectionPreparing.swift b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+PeerConnectionPreparing.swift index 13170ad11..6501f918c 100644 --- a/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+PeerConnectionPreparing.swift +++ b/Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+PeerConnectionPreparing.swift @@ -10,13 +10,17 @@ extension WebRTCCoordinator.StateMachine.Stage { /// Creates the stage that waits briefly for publisher and subscriber peer /// connections to report `.connected` before the join call completes. + /// + /// - Parameters: + /// - context: The state machine context for the pending join flow. + /// - telemetryReporter: Reports telemetry after the stage finishes. static func peerConnectionPreparing( _ context: Context, - timeout: TimeInterval + telemetryReporter: JoinedStateTelemetryReporter ) -> WebRTCCoordinator.StateMachine.Stage { PeerConnectionPreparingStage( context, - timeout: timeout + telemetryReporter: telemetryReporter ) } } @@ -30,14 +34,19 @@ extension WebRTCCoordinator.StateMachine.Stage { @unchecked Sendable { private let disposableBag = DisposableBag() - private let timeout: TimeInterval + private let timeout: TimeInterval = WebRTCConfiguration.timeout.peerConnectionReadiness + private let telemetryReporter: JoinedStateTelemetryReporter /// Initializes a new instance of `PeerConnectionPreparingStage`. + /// + /// - Parameters: + /// - context: The state machine context for the pending join flow. + /// - telemetryReporter: Reports telemetry after the stage finishes. init( _ context: Context, - timeout: TimeInterval + telemetryReporter: JoinedStateTelemetryReporter ) { - self.timeout = timeout + self.telemetryReporter = telemetryReporter super.init(id: .peerConnectionPreparing, context: context) } @@ -67,8 +76,10 @@ extension WebRTCCoordinator.StateMachine.Stage { private func execute() async { guard - let publisher = await context.coordinator?.stateAdapter.publisher, - let subscriber = await context.coordinator?.stateAdapter.subscriber + let coordinator = context.coordinator, + let sfuAdapter = await coordinator.stateAdapter.sfuAdapter, + let publisher = await coordinator.stateAdapter.publisher, + let subscriber = await coordinator.stateAdapter.subscriber else { return } @@ -91,6 +102,12 @@ extension WebRTCCoordinator.StateMachine.Stage { ) } + await telemetryReporter.reportTelemetry( + sessionId: await coordinator.stateAdapter.sessionID, + unifiedSessionId: coordinator.stateAdapter.unifiedSessionId, + sfuAdapter: sfuAdapter + ) + reportJoinCompletion() transitionOrDisconnect(.joined(context)) diff --git a/Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift b/Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift index 1318ef760..fc3252e2c 100644 --- a/Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift +++ b/Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift @@ -14,6 +14,9 @@ enum WebRTCConfiguration { var migrationCompletion: TimeInterval var publisherSetUpBeforeNegotiation: TimeInterval var audioSessionConfigurationCompletion: TimeInterval + /// Maximum time to wait for both peer connections to reach + /// `.connected` after the SFU join flow succeeds. + var peerConnectionReadiness: TimeInterval /// Timeout for authentication in production environment. static let production = Timeout( @@ -22,7 +25,8 @@ enum WebRTCConfiguration { join: 30, migrationCompletion: 10, publisherSetUpBeforeNegotiation: 2, - audioSessionConfigurationCompletion: 2 + audioSessionConfigurationCompletion: 2, + peerConnectionReadiness: 5 ) #if STREAM_TESTS @@ -33,7 +37,8 @@ enum WebRTCConfiguration { join: 5, migrationCompletion: 5, publisherSetUpBeforeNegotiation: 5, - audioSessionConfigurationCompletion: 5 + audioSessionConfigurationCompletion: 5, + peerConnectionReadiness: 5 ) #endif } diff --git a/Sources/StreamVideoSwiftUI/CallViewModel.swift b/Sources/StreamVideoSwiftUI/CallViewModel.swift index 8d50e340d..1c279e366 100644 --- a/Sources/StreamVideoSwiftUI/CallViewModel.swift +++ b/Sources/StreamVideoSwiftUI/CallViewModel.swift @@ -590,7 +590,7 @@ open class CallViewModel: ObservableObject { callId: callId, members: [], customData: customData, - policy: .peerConnectionReadinessAware(timeout: 2) + policy: .peerConnectionReadinessAware ) } catch { hasAcceptedCall = false diff --git a/StreamVideoSwiftUITests/CallViewModel_Tests.swift b/StreamVideoSwiftUITests/CallViewModel_Tests.swift index 8ab946cef..f988f8520 100644 --- a/StreamVideoSwiftUITests/CallViewModel_Tests.swift +++ b/StreamVideoSwiftUITests/CallViewModel_Tests.swift @@ -447,8 +447,8 @@ final class CallViewModel_Tests: XCTestCase, @unchecked Sendable { switch policy { case .default: XCTFail() - case let .peerConnectionReadinessAware(timeout): - XCTAssertEqual(timeout, 2) + case .peerConnectionReadinessAware: + XCTAssertTrue(true) } default: XCTFail() diff --git a/StreamVideoTests/Call/Call_Tests.swift b/StreamVideoTests/Call/Call_Tests.swift index 76aea129a..ab7fe4334 100644 --- a/StreamVideoTests/Call/Call_Tests.swift +++ b/StreamVideoTests/Call/Call_Tests.swift @@ -641,7 +641,7 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable { call.stub(for: \.state, with: .init(.dummy())) mockCallController.stub(for: .join, with: JoinCallResponse.dummy()) - _ = try await call.join(policy: .peerConnectionReadinessAware(timeout: 2)) + _ = try await call.join(policy: .peerConnectionReadinessAware) let recordedInput = try XCTUnwrap( mockCallController.recordedInputPayload( @@ -661,8 +661,8 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable { switch recordedInput.6 { case .default: XCTFail() - case let .peerConnectionReadinessAware(timeout): - XCTAssertEqual(timeout, 2) + case .peerConnectionReadinessAware: + break } } diff --git a/StreamVideoTests/CallStateMachine/CallStateMachine/Stages/CallStateMachine_JoiningStageTests.swift b/StreamVideoTests/CallStateMachine/CallStateMachine/Stages/CallStateMachine_JoiningStageTests.swift index 8088c1931..b12478de8 100644 --- a/StreamVideoTests/CallStateMachine/CallStateMachine/Stages/CallStateMachine_JoiningStageTests.swift +++ b/StreamVideoTests/CallStateMachine/CallStateMachine/Stages/CallStateMachine_JoiningStageTests.swift @@ -128,7 +128,7 @@ final class StreamCallStateMachineStageJoiningStage_Tests: StreamVideoTestCase, notify: false, source: .inApp, deliverySubject: .init(nil), - policy: .peerConnectionReadinessAware(timeout: 2), + policy: .peerConnectionReadinessAware, retryPolicy: .init(maxRetries: 0, delay: { _ in 0 }) ) ) @@ -450,11 +450,8 @@ final class StreamCallStateMachineStageJoiningStage_Tests: StreamVideoTestCase, switch (context.input.join?.policy, recordedInput.6) { case (.default, .default): break - case let ( - .peerConnectionReadinessAware(expectedTimeout)?, - .peerConnectionReadinessAware(recordedTimeout) - ): - XCTAssertEqual(expectedTimeout, recordedTimeout) + case (.peerConnectionReadinessAware, .peerConnectionReadinessAware): + break default: XCTFail() } diff --git a/StreamVideoTests/Controllers/CallController_Tests.swift b/StreamVideoTests/Controllers/CallController_Tests.swift index c753d334d..c8d7a1ebb 100644 --- a/StreamVideoTests/Controllers/CallController_Tests.swift +++ b/StreamVideoTests/Controllers/CallController_Tests.swift @@ -99,7 +99,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { func test_joinCall_coordinatorTransitionsToConnecting() async throws { let callSettings = CallSettings(cameraPosition: .back) let options = CreateCallOptions(team: .unique) - let joinPolicy = WebRTCJoinPolicy.peerConnectionReadinessAware(timeout: 2) + let joinPolicy = WebRTCJoinPolicy.peerConnectionReadinessAware let expectedJoinSource = JoinSource.callKit(.init {}) try await assertTransitionToStage( @@ -129,8 +129,8 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { switch expectedStage.context.joinPolicy { case .default: XCTFail() - case let .peerConnectionReadinessAware(timeout): - XCTAssertEqual(timeout, 2) + case .peerConnectionReadinessAware: + break } await self.assertEqualAsync( await self diff --git a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoiningStageTests.swift b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoiningStageTests.swift index c4fa508e9..98f0918d7 100644 --- a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoiningStageTests.swift +++ b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_JoiningStageTests.swift @@ -489,7 +489,7 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec func test_transition_fromConnected_withReadinessAwarePolicy_transitionsToPeerConnectionPreparing() async throws { subject.context.coordinator = mockCoordinatorStack.coordinator subject.context.reconnectAttempts = 11 - subject.context.joinPolicy = .peerConnectionReadinessAware(timeout: 5) + subject.context.joinPolicy = .peerConnectionReadinessAware subject.context.initialJoinCallResponse = .dummy() subject.context.joinResponseHandler = .init() @@ -513,6 +513,9 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec XCTAssertNotNil(target.context.joinResponseHandler) } + let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service) + XCTAssertNil(mockSignalService.sendStatsWasCalledWithRequest) + eventCancellable.cancel() } diff --git a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests.swift b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests.swift index 1489b4d70..fe0fe45cb 100644 --- a/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests.swift +++ b/StreamVideoTests/WebRTC/v2/StateMachine/Stages/WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests.swift @@ -26,7 +26,7 @@ final class WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests: videoConfig: Self.videoConfig ) private lazy var subject: WebRTCCoordinator.StateMachine.Stage! = - .peerConnectionPreparing(.init(), timeout: 0.01) + .peerConnectionPreparing(.init(), telemetryReporter: .init()) override class func tearDown() { Self.videoConfig = nil @@ -55,7 +55,7 @@ final class WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests: } } - func test_transition_whenPeerConnectionsDoNotBecomeReadyWithinTimeout_reportsJoinCompletionAndTransitionsToJoined( + func test_transition_whenPeerConnectionsDoNotBecomeReadyWithinTimeout_reportsTelemetryAndTransitionsToJoined( ) async throws { let expectedJoinCallResponse = JoinCallResponse.dummy( call: .dummy(cid: "expected-call-id") @@ -78,7 +78,11 @@ final class WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests: initialJoinCallResponse: expectedJoinCallResponse, joinResponseHandler: completionSubject ) - subject = .peerConnectionPreparing(context, timeout: 0.01) + subject = .peerConnectionPreparing(context, telemetryReporter: .init()) + let unifiedSessionId = await mockCoordinatorStack + .coordinator + .stateAdapter + .unifiedSessionId await mockCoordinatorStack .coordinator @@ -107,6 +111,23 @@ final class WebRTCCoordinatorStateMachine_PeerConnectionPreparingStageTests: ) XCTAssertEqual(receivedCallID, expectedJoinCallResponse.call.cid) + let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service) + await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil } + let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry) + XCTAssertEqual( + mockSignalService.sendStatsWasCalledWithRequest?.unifiedSessionID, + unifiedSessionId + ) + + switch telemetry.data { + case .connectionTimeSeconds: + XCTAssertTrue(true) + case .reconnection: + XCTFail() + case .none: + XCTFail() + } + completionCancellable.cancel() } }