Skip to content

Commit 5b53abf

Browse files
authored
Reconnect errors (#845)
Resolves #844
- Handles `offerId` (symptoms)
- Fixes double offer on ICE restart (root cause)
- Fixes deadlock, probably introduced by strict ordering in #804 (the fix is still valid though)
- Handles `leaveAction` param vs legacy `canReconnect` (requires protocol v16 introduced elsewhere)
1 parent d8b73fd commit 5b53abf

File tree

7 files changed

+89
-43
lines changed

7 files changed

+89
-43
lines changed

.changes/reconnect-errors

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch type="fixed" "Reconnect sequence stuck in failed state"

Sources/LiveKit/Core/Room+Engine.swift

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ extension Room {
164164
primary: !isSubscriberPrimary,
165165
delegate: self)
166166

167-
await publisher.set { [weak self] offer in
167+
await publisher.set { [weak self] offer, offerId in
168168
guard let self else { return }
169-
log("Publisher onOffer \(offer.sdp)")
170-
try await signalClient.send(offer: offer)
169+
log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)")
170+
try await signalClient.send(offer: offer, offerId: offerId)
171171
}
172172

173173
// data over pub channel for backwards compatibility
@@ -322,7 +322,13 @@ extension Room {
322322

323323
log("[Connect] Waiting for subscriber to connect...")
324324
// Wait for primary transport to connect (if not already)
325-
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
325+
do {
326+
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
327+
log("[Connect] Subscriber transport connected")
328+
} catch {
329+
log("[Connect] Subscriber transport failed to connect, error: \(error)", .error)
330+
throw error
331+
}
326332
try Task.checkCancellation()
327333

328334
// send SyncState before offer
@@ -334,7 +340,13 @@ extension Room {
334340
// Only if published, wait for publisher to connect...
335341
log("[Connect] Waiting for publisher to connect...")
336342
try await publisher.createAndSendOffer(iceRestart: true)
337-
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
343+
do {
344+
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
345+
log("[Connect] Publisher transport connected")
346+
} catch {
347+
log("[Connect] Publisher transport failed to connect, error: \(error)", .error)
348+
throw error
349+
}
338350
}
339351
}
340352

@@ -469,8 +481,8 @@ extension Room {
469481
$0.subscribe = !autoSubscribe
470482
}
471483

472-
try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(),
473-
offer: previousOffer?.toPBType(),
484+
try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(offerId: 0),
485+
offer: previousOffer?.toPBType(offerId: 0),
474486
subscription: subscription,
475487
publishTracks: localParticipant.publishedTracksInfo(),
476488
dataChannels: publisherDataChannel.infos(),

Sources/LiveKit/Core/Room+SignalClientDelegate.swift

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,32 @@ extension Room: SignalClientDelegate {
3434
// engine is currently connected state
3535
case .connected = _state.connectionState
3636
{
37-
do {
38-
try await startReconnect(reason: .websocket)
39-
} catch {
40-
log("Failed calling startReconnect, error: \(error)", .error)
37+
Task {
38+
do {
39+
try await startReconnect(reason: .websocket)
40+
} catch {
41+
log("Failed calling startReconnect, error: \(error)", .error)
42+
}
4143
}
4244
}
4345
}
4446

45-
func signalClient(_: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async {
46-
log("canReconnect: \(canReconnect), reason: \(reason)")
47+
func signalClient(_: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async {
48+
log("action: \(action), reason: \(reason)")
4749

48-
if canReconnect {
49-
// force .full for next reconnect
50+
let error = LiveKitError.from(reason: reason)
51+
switch action {
52+
case .reconnect:
53+
// Force .full for next reconnect
5054
_state.mutate { $0.nextReconnectMode = .full }
51-
} else {
52-
// Server indicates it's not recoverable
53-
await cleanUp(withError: LiveKitError.from(reason: reason))
55+
fallthrough
56+
case .resume:
57+
// Abort current attempt
58+
await signalClient.cleanUp(withError: error)
59+
case .disconnect:
60+
await cleanUp(withError: error)
61+
default:
62+
log("Unknown leave action: \(action), ignoring", .warning)
5463
}
5564
}
5665

@@ -341,17 +350,19 @@ extension Room: SignalClientDelegate {
341350
}
342351
}
343352

344-
func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async {
353+
func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async {
354+
log("Received answer for offerId: \(offerId)")
355+
345356
do {
346357
let publisher = try requirePublisher()
347-
try await publisher.set(remoteDescription: answer)
358+
try await publisher.set(remoteDescription: answer, offerId: offerId)
348359
} catch {
349-
log("Failed to set remote description, error: \(error)", .error)
360+
log("Failed to set remote description with offerId: \(offerId), error: \(error)", .error)
350361
}
351362
}
352363

353-
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async {
354-
log("Received offer, creating & sending answer...")
364+
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async {
365+
log("Received offer with offerId: \(offerId), creating & sending answer...")
355366

356367
guard let subscriber = _state.subscriber else {
357368
log("Failed to send answer, subscriber is nil", .error)
@@ -362,9 +373,9 @@ extension Room: SignalClientDelegate {
362373
try await subscriber.set(remoteDescription: offer)
363374
let answer = try await subscriber.createAnswer()
364375
try await subscriber.set(localDescription: answer)
365-
try await signalClient.send(answer: answer)
376+
try await signalClient.send(answer: answer, offerId: offerId)
366377
} catch {
367-
log("Failed to send answer with error: \(error)", .error)
378+
log("Failed to send answer for offerId: \(offerId), error: \(error)", .error)
368379
}
369380
}
370381

Sources/LiveKit/Core/SignalClient.swift

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,15 @@ actor SignalClient: Loggable {
129129
participantSid: participantSid,
130130
adaptiveStream: adaptiveStream)
131131

132-
if reconnectMode != nil {
133-
log("[Connect] with url: \(url)")
132+
let isReconnect = reconnectMode != nil
133+
134+
if isReconnect {
135+
log("Reconnecting with url: \(url)")
134136
} else {
135137
log("Connecting with url: \(url)")
136138
}
137139

138-
_state.mutate { $0.connectionState = (reconnectMode != nil ? .reconnecting : .connecting) }
140+
_state.mutate { $0.connectionState = (isReconnect ? .reconnecting : .connecting) }
139141

140142
do {
141143
let socket = try await WebSocket(url: url,
@@ -293,10 +295,12 @@ private extension SignalClient {
293295
await _restartPingTimer()
294296

295297
case let .answer(sd):
296-
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: sd.toRTCType()) }
298+
let (rtcDescription, offerId) = sd.toRTCType()
299+
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: rtcDescription, offerId: offerId) }
297300

298301
case let .offer(sd):
299-
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: sd.toRTCType()) }
302+
let (rtcDescription, offerId) = sd.toRTCType()
303+
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: rtcDescription, offerId: offerId) }
300304

301305
case let .trickle(trickle):
302306
guard let rtcCandidate = try? RTC.createIceCandidate(fromJsonString: trickle.candidateInit) else {
@@ -332,7 +336,7 @@ private extension SignalClient {
332336
_delegate.notifyDetached { await $0.signalClient(self, didUpdateRemoteMute: Track.Sid(from: mute.sid), muted: mute.muted) }
333337

334338
case let .leave(leave):
335-
_delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.canReconnect, reason: leave.reason) }
339+
_delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.action, reason: leave.reason) }
336340

337341
case let .streamStateUpdate(states):
338342
_delegate.notifyDetached { await $0.signalClient(self, didUpdateTrackStreamStates: states.streamStates) }
@@ -375,17 +379,17 @@ extension SignalClient {
375379
// MARK: - Send methods
376380

377381
extension SignalClient {
378-
func send(offer: LKRTCSessionDescription) async throws {
382+
func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws {
379383
let r = Livekit_SignalRequest.with {
380-
$0.offer = offer.toPBType()
384+
$0.offer = offer.toPBType(offerId: offerId)
381385
}
382386

383387
try await _sendRequest(r)
384388
}
385389

386-
func send(answer: LKRTCSessionDescription) async throws {
390+
func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws {
387391
let r = Livekit_SignalRequest.with {
388-
$0.answer = answer.toPBType()
392+
$0.answer = answer.toPBType(offerId: offerId)
389393
}
390394

391395
try await _sendRequest(r)

Sources/LiveKit/Core/Transport.swift

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ internal import LiveKitWebRTC
2121
actor Transport: NSObject, Loggable {
2222
// MARK: - Types
2323

24-
typealias OnOfferBlock = @Sendable (LKRTCSessionDescription) async throws -> Void
24+
typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws -> Void
2525

2626
// MARK: - Public
2727

@@ -56,6 +56,7 @@ actor Transport: NSObject, Loggable {
5656
private var _reNegotiate: Bool = false
5757
private var _onOffer: OnOfferBlock?
5858
private var _isRestartingIce: Bool = false
59+
private var _latestOfferId: UInt32 = 0
5960

6061
// forbid direct access to PeerConnection
6162
private let _pc: LKRTCPeerConnection
@@ -110,6 +111,20 @@ actor Transport: NSObject, Loggable {
110111
await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !_isRestartingIce)
111112
}
112113

114+
func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws {
115+
if signalingState != .haveLocalOffer {
116+
log("Received answer with unexpected signaling state: \(signalingState), expected .haveLocalOffer", .warning)
117+
}
118+
119+
if offerId == 0 {
120+
log("Skipping validation for legacy server (missing offerId), latestOfferId: \(_latestOfferId)", .warning)
121+
} else if offerId != _latestOfferId {
122+
throw LiveKitError(.invalidState, message: "OfferId mismatch, expected \(_latestOfferId) but got \(offerId)")
123+
}
124+
125+
try await set(remoteDescription: sd)
126+
}
127+
113128
func set(remoteDescription sd: LKRTCSessionDescription) async throws {
114129
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
115130
_pc.setRemoteDescription(sd) { error in
@@ -157,12 +172,14 @@ actor Transport: NSObject, Loggable {
157172

158173
// Actually negotiate
159174
func _negotiateSequence() async throws {
175+
_latestOfferId += 1
160176
let offer = try await createOffer(for: constraints)
161177
try await set(localDescription: offer)
162-
try await _onOffer(offer)
178+
try await _onOffer(offer, _latestOfferId)
163179
}
164180

165181
if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription {
182+
_reNegotiate = false // Clear flag to prevent double offer
166183
try await set(remoteDescription: sd)
167184
return try await _negotiateSequence()
168185
}

Sources/LiveKit/Protocols/SignalClientDelegate.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ internal import LiveKitWebRTC
2121
protocol SignalClientDelegate: AnyObject, Sendable {
2222
func signalClient(_ signalClient: SignalClient, didUpdateConnectionState newState: ConnectionState, oldState: ConnectionState, disconnectError: LiveKitError?) async
2323
func signalClient(_ signalClient: SignalClient, didReceiveConnectResponse connectResponse: SignalClient.ConnectResponse) async
24-
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async
25-
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async
24+
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async
25+
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async
2626
func signalClient(_ signalClient: SignalClient, didReceiveIceCandidate iceCandidate: IceCandidate, target: Livekit_SignalTarget) async
2727
func signalClient(_ signalClient: SignalClient, didUnpublishLocalTrack localTrack: Livekit_TrackUnpublishedResponse) async
2828
func signalClient(_ signalClient: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async
@@ -35,6 +35,6 @@ protocol SignalClientDelegate: AnyObject, Sendable {
3535
func signalClient(_ signalClient: SignalClient, didReceiveRoomMoved response: Livekit_RoomMovedResponse) async
3636
func signalClient(_ signalClient: SignalClient, didUpdateSubscriptionPermission permission: Livekit_SubscriptionPermissionUpdate) async
3737
func signalClient(_ signalClient: SignalClient, didUpdateToken token: String) async
38-
func signalClient(_ signalClient: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async
38+
func signalClient(_ signalClient: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async
3939
func signalClient(_ signalClient: SignalClient, didSubscribeTrack trackSid: Track.Sid) async
4040
}

Sources/LiveKit/Types/SessionDescription.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
internal import LiveKitWebRTC
1818

1919
extension LKRTCSessionDescription {
20-
func toPBType() -> Livekit_SessionDescription {
20+
func toPBType(offerId: UInt32) -> Livekit_SessionDescription {
2121
var sd = Livekit_SessionDescription()
2222
sd.sdp = sdp
23+
sd.id = offerId
2324

2425
switch type {
2526
case .answer: sd.type = "answer"
@@ -33,7 +34,7 @@ extension LKRTCSessionDescription {
3334
}
3435

3536
extension Livekit_SessionDescription {
36-
func toRTCType() -> LKRTCSessionDescription {
37+
func toRTCType() -> (LKRTCSessionDescription, UInt32) {
3738
var sdpType: LKRTCSdpType
3839
switch type {
3940
case "answer": sdpType = .answer
@@ -42,6 +43,6 @@ extension Livekit_SessionDescription {
4243
default: fatalError("Unknown state \(type)") // This should never happen
4344
}
4445

45-
return RTC.createSessionDescription(type: sdpType, sdp: sdp)
46+
return (RTC.createSessionDescription(type: sdpType, sdp: sdp), id)
4647
}
4748
}

0 commit comments

Comments
 (0)