Skip to content

Commit 1a9686b

Browse files
authored
[Enhancement]Align peerconnection preparation timeout with the clients (#1085)
1 parent 3fef2ff commit 1a9686b

File tree

12 files changed

+175
-99
lines changed

12 files changed

+175
-99
lines changed

Sources/StreamVideo/WebRTC/v2/Policies/JoinPolicy/WebRTCJoinPolicy.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ public enum WebRTCJoinPolicy: Sendable {
1212

1313
/// Waits until both peer connections report `.connected`, or until the
1414
/// timeout elapses, before completing the join request.
15-
case peerConnectionReadinessAware(timeout: TimeInterval)
15+
case peerConnectionReadinessAware
1616
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//
2+
// Copyright © 2026 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
/// Builds and sends telemetry for a join flow once the state machine reaches
8+
/// the point where the flow outcome is known.
9+
struct JoinedStateTelemetryReporter {
10+
11+
/// Describes the join or reconnection path that completed.
12+
enum FlowType { case regular, fast, rejoin, migrate }
13+
14+
/// The join flow that should be encoded in the telemetry payload.
15+
var flowType: FlowType = .regular
16+
17+
private let startTime = Date()
18+
19+
/// Reports telemetry for the completed join flow.
20+
///
21+
/// Regular joins send the elapsed connection time. Reconnect flows send a
22+
/// reconnection payload with the selected strategy and elapsed duration.
23+
///
24+
/// - Parameters:
25+
/// - sessionId: The SFU session identifier for the active participant.
26+
/// - unifiedSessionId: The identifier shared across reconnect attempts.
27+
/// - sfuAdapter: The adapter that submits telemetry to the SFU.
28+
func reportTelemetry(
29+
sessionId: String,
30+
unifiedSessionId: String,
31+
sfuAdapter: SFUAdapter
32+
) async {
33+
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
34+
let duration = Float(Date().timeIntervalSince(startTime))
35+
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
36+
reconnection.timeSeconds = duration
37+
38+
telemetry.data = {
39+
switch self.flowType {
40+
case .regular:
41+
return .connectionTimeSeconds(duration)
42+
case .fast:
43+
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
44+
reconnection.strategy = .fast
45+
return .reconnection(reconnection)
46+
case .rejoin:
47+
reconnection.strategy = .rejoin
48+
return .reconnection(reconnection)
49+
case .migrate:
50+
reconnection.strategy = .migrate
51+
return .reconnection(reconnection)
52+
}
53+
}()
54+
55+
do {
56+
try await sfuAdapter.sendStats(
57+
for: sessionId,
58+
unifiedSessionId: unifiedSessionId,
59+
telemetry: telemetry
60+
)
61+
log.debug("Join call completed in \(duration) seconds.")
62+
} catch {
63+
log.error(error)
64+
}
65+
}
66+
}

Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joining.swift

Lines changed: 37 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@ extension WebRTCCoordinator.StateMachine.Stage {
3131

3232
@Injected(\.audioStore) private var audioStore
3333

34-
private enum FlowType { case regular, fast, rejoin, migrate }
3534
private let disposableBag = DisposableBag()
3635
private let startTime = Date()
37-
private var flowType = FlowType.regular
36+
private var telemetryReporter: JoinedStateTelemetryReporter = .init()
3837

3938
/// Initializes a new instance of `JoiningStage`.
4039
/// - Parameter context: The context for the joining stage.
@@ -56,19 +55,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
5655
) -> Self? {
5756
switch previousStage.id {
5857
case .connected where context.isRejoiningFromSessionID != nil:
59-
flowType = .rejoin
58+
telemetryReporter.flowType = .rejoin
6059
executeRejoining()
6160
return self
6261
case .connected:
63-
flowType = .regular
62+
telemetryReporter.flowType = .regular
6463
execute(isFastReconnecting: false)
6564
return self
6665
case .fastReconnected:
67-
flowType = .fast
66+
telemetryReporter.flowType = .fast
6867
execute(isFastReconnecting: true)
6968
return self
7069
case .migrated:
71-
flowType = .migrate
70+
telemetryReporter.flowType = .migrate
7271
executeMigration()
7372
return self
7473
default:
@@ -135,7 +134,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
135134
await coordinator.stateAdapter.publisher?.restartICE()
136135
}
137136

138-
transitionToNextStage(context)
137+
await transitionToNextStage(
138+
context,
139+
coordinator: coordinator,
140+
sfuAdapter: sfuAdapter
141+
)
139142
} catch {
140143
context.reconnectionStrategy = context
141144
.reconnectionStrategy
@@ -196,7 +199,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
196199
isFastReconnecting: false
197200
)
198201

199-
transitionToNextStage(context)
202+
await transitionToNextStage(
203+
context,
204+
coordinator: coordinator,
205+
sfuAdapter: sfuAdapter
206+
)
200207
} catch {
201208
context.reconnectionStrategy = .rejoin
202209
transitionDisconnectOrError(error)
@@ -255,7 +262,11 @@ extension WebRTCCoordinator.StateMachine.Stage {
255262
isFastReconnecting: false
256263
)
257264

258-
transitionToNextStage(context)
265+
await transitionToNextStage(
266+
context,
267+
coordinator: coordinator,
268+
sfuAdapter: sfuAdapter
269+
)
259270
} catch {
260271
transitionDisconnectOrError(error)
261272
}
@@ -422,12 +433,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
422433
)
423434

424435
try Task.checkCancellation()
425-
426-
reportTelemetry(
427-
sessionId: await coordinator.stateAdapter.sessionID,
428-
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
429-
sfuAdapter: sfuAdapter
430-
)
431436
}
432437

433438
/// Waits until the audio session is fully ready for call setup.
@@ -461,66 +466,28 @@ extension WebRTCCoordinator.StateMachine.Stage {
461466
}
462467
}
463468

464-
/// Reports telemetry data to the SFU (Selective Forwarding Unit) to monitor and analyze the
465-
/// connection lifecycle.
466-
///
467-
/// This method collects relevant metrics based on the flow type of the connection, such as
468-
/// connection time or reconnection details, and sends them to the SFU for logging and diagnostics.
469-
/// The telemetry data provides insights into the connection's performance and the strategies used
470-
/// during rejoining, fast reconnecting, or migration.
471-
///
472-
/// The reported data includes:
473-
/// - Connection time in seconds for a regular flow.
474-
/// - Reconnection strategies (e.g., fast reconnect, rejoin, or migration) and their duration.
475-
private func reportTelemetry(
476-
sessionId: String,
477-
unifiedSessionId: String,
469+
private func transitionToNextStage(
470+
_ context: Context,
471+
coordinator: WebRTCCoordinator,
478472
sfuAdapter: SFUAdapter
479-
) {
480-
Task(disposableBag: disposableBag) { [weak self] in
481-
guard let self else { return }
482-
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
483-
let duration = Float(Date().timeIntervalSince(startTime))
484-
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
485-
reconnection.timeSeconds = duration
486-
487-
telemetry.data = {
488-
switch self.flowType {
489-
case .regular:
490-
return .connectionTimeSeconds(duration)
491-
case .fast:
492-
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
493-
reconnection.strategy = .fast
494-
return .reconnection(reconnection)
495-
case .rejoin:
496-
reconnection.strategy = .rejoin
497-
return .reconnection(reconnection)
498-
case .migrate:
499-
reconnection.strategy = .migrate
500-
return .reconnection(reconnection)
501-
}
502-
}()
503-
504-
do {
505-
try await sfuAdapter.sendStats(
506-
for: sessionId,
507-
unifiedSessionId: unifiedSessionId,
508-
telemetry: telemetry
509-
)
510-
log.debug("Join call completed in \(duration) seconds.")
511-
} catch {
512-
log.error(error)
513-
}
514-
}
515-
}
516-
517-
private func transitionToNextStage(_ context: Context) {
473+
) async {
518474
switch context.joinPolicy {
519475
case .default:
476+
await telemetryReporter.reportTelemetry(
477+
sessionId: await coordinator.stateAdapter.sessionID,
478+
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
479+
sfuAdapter: sfuAdapter
480+
)
520481
reportJoinCompletion()
521482
transitionOrDisconnect(.joined(self.context))
522-
case .peerConnectionReadinessAware(let timeout):
523-
transitionOrDisconnect(.peerConnectionPreparing(self.context, timeout: timeout))
483+
484+
case .peerConnectionReadinessAware:
485+
transitionOrDisconnect(
486+
.peerConnectionPreparing(
487+
self.context,
488+
telemetryReporter: telemetryReporter
489+
)
490+
)
524491
}
525492
}
526493
}

Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+PeerConnectionPreparing.swift

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@ extension WebRTCCoordinator.StateMachine.Stage {
1010

1111
/// Creates the stage that waits briefly for publisher and subscriber peer
1212
/// connections to report `.connected` before the join call completes.
13+
///
14+
/// - Parameters:
15+
/// - context: The state machine context for the pending join flow.
16+
/// - telemetryReporter: Reports telemetry after the stage finishes.
1317
static func peerConnectionPreparing(
1418
_ context: Context,
15-
timeout: TimeInterval
19+
telemetryReporter: JoinedStateTelemetryReporter
1620
) -> WebRTCCoordinator.StateMachine.Stage {
1721
PeerConnectionPreparingStage(
1822
context,
19-
timeout: timeout
23+
telemetryReporter: telemetryReporter
2024
)
2125
}
2226
}
@@ -30,14 +34,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
3034
@unchecked Sendable {
3135

3236
private let disposableBag = DisposableBag()
33-
private let timeout: TimeInterval
37+
private let timeout: TimeInterval = WebRTCConfiguration.timeout.peerConnectionReadiness
38+
private let telemetryReporter: JoinedStateTelemetryReporter
3439

3540
/// Initializes a new instance of `PeerConnectionPreparingStage`.
41+
///
42+
/// - Parameters:
43+
/// - context: The state machine context for the pending join flow.
44+
/// - telemetryReporter: Reports telemetry after the stage finishes.
3645
init(
3746
_ context: Context,
38-
timeout: TimeInterval
47+
telemetryReporter: JoinedStateTelemetryReporter
3948
) {
40-
self.timeout = timeout
49+
self.telemetryReporter = telemetryReporter
4150
super.init(id: .peerConnectionPreparing, context: context)
4251
}
4352

@@ -67,8 +76,10 @@ extension WebRTCCoordinator.StateMachine.Stage {
6776

6877
private func execute() async {
6978
guard
70-
let publisher = await context.coordinator?.stateAdapter.publisher,
71-
let subscriber = await context.coordinator?.stateAdapter.subscriber
79+
let coordinator = context.coordinator,
80+
let sfuAdapter = await coordinator.stateAdapter.sfuAdapter,
81+
let publisher = await coordinator.stateAdapter.publisher,
82+
let subscriber = await coordinator.stateAdapter.subscriber
7283
else {
7384
return
7485
}
@@ -91,6 +102,12 @@ extension WebRTCCoordinator.StateMachine.Stage {
91102
)
92103
}
93104

105+
await telemetryReporter.reportTelemetry(
106+
sessionId: await coordinator.stateAdapter.sessionID,
107+
unifiedSessionId: coordinator.stateAdapter.unifiedSessionId,
108+
sfuAdapter: sfuAdapter
109+
)
110+
94111
reportJoinCompletion()
95112

96113
transitionOrDisconnect(.joined(context))

Sources/StreamVideo/WebRTC/v2/WebRTCConfiguration.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ enum WebRTCConfiguration {
1414
var migrationCompletion: TimeInterval
1515
var publisherSetUpBeforeNegotiation: TimeInterval
1616
var audioSessionConfigurationCompletion: TimeInterval
17+
/// Maximum time to wait for both peer connections to reach
18+
/// `.connected` after the SFU join flow succeeds.
19+
var peerConnectionReadiness: TimeInterval
1720

1821
/// Timeout for authentication in production environment.
1922
static let production = Timeout(
@@ -22,7 +25,8 @@ enum WebRTCConfiguration {
2225
join: 30,
2326
migrationCompletion: 10,
2427
publisherSetUpBeforeNegotiation: 2,
25-
audioSessionConfigurationCompletion: 2
28+
audioSessionConfigurationCompletion: 2,
29+
peerConnectionReadiness: 5
2630
)
2731

2832
#if STREAM_TESTS
@@ -33,7 +37,8 @@ enum WebRTCConfiguration {
3337
join: 5,
3438
migrationCompletion: 5,
3539
publisherSetUpBeforeNegotiation: 5,
36-
audioSessionConfigurationCompletion: 5
40+
audioSessionConfigurationCompletion: 5,
41+
peerConnectionReadiness: 5
3742
)
3843
#endif
3944
}

Sources/StreamVideoSwiftUI/CallViewModel.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ open class CallViewModel: ObservableObject {
590590
callId: callId,
591591
members: [],
592592
customData: customData,
593-
policy: .peerConnectionReadinessAware(timeout: 2)
593+
policy: .peerConnectionReadinessAware
594594
)
595595
} catch {
596596
hasAcceptedCall = false

StreamVideoSwiftUITests/CallViewModel_Tests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,8 @@ final class CallViewModel_Tests: XCTestCase, @unchecked Sendable {
447447
switch policy {
448448
case .default:
449449
XCTFail()
450-
case let .peerConnectionReadinessAware(timeout):
451-
XCTAssertEqual(timeout, 2)
450+
case .peerConnectionReadinessAware:
451+
XCTAssertTrue(true)
452452
}
453453
default:
454454
XCTFail()

StreamVideoTests/Call/Call_Tests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable {
641641
call.stub(for: \.state, with: .init(.dummy()))
642642
mockCallController.stub(for: .join, with: JoinCallResponse.dummy())
643643

644-
_ = try await call.join(policy: .peerConnectionReadinessAware(timeout: 2))
644+
_ = try await call.join(policy: .peerConnectionReadinessAware)
645645

646646
let recordedInput = try XCTUnwrap(
647647
mockCallController.recordedInputPayload(
@@ -661,8 +661,8 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable {
661661
switch recordedInput.6 {
662662
case .default:
663663
XCTFail()
664-
case let .peerConnectionReadinessAware(timeout):
665-
XCTAssertEqual(timeout, 2)
664+
case .peerConnectionReadinessAware:
665+
break
666666
}
667667
}
668668

0 commit comments

Comments
 (0)