From 0ed297d46a2ff89ae78952919cdc2b384304f7b9 Mon Sep 17 00:00:00 2001 From: Sujal Salekar Date: Sun, 13 Jul 2025 11:03:33 +0000 Subject: [PATCH] fix(transport): improve WebSocket/WebRTC connection robustness --- .../private-to-private/initiate-connection.ts | 82 ++++++++++++++-- packages/transport-websockets/src/index.ts | 98 ++++++++++++++----- 2 files changed, 146 insertions(+), 34 deletions(-) diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index c78b1a9646..b9adc7130e 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -34,6 +34,8 @@ export interface ConnectOptions extends LoggerOptions, ProgressOptions { const { circuitAddress, targetPeer } = splitAddr(ma) + const ICE_GATHERING_TIMEOUT = 30000 // 30 seconds + const CONNECTION_TIMEOUT = 60000 // 60 seconds metrics?.dialerEvents.increment({ open: true }) @@ -54,7 +56,6 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa }) } else { onProgress?.(new CustomProgressEvent('webrtc:reuse-relay-connection')) - connection = connections[0] } @@ -74,7 +75,59 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa dataChannelOptions: dataChannel }) + // Track connection state + let isConnecting = true + let iceGatheringComplete = false + let connectionStateTimeout: NodeJS.Timeout + let iceGatheringTimeout: NodeJS.Timeout + try { + // Monitor ICE gathering state + peerConnection.onicegatheringstatechange = () => { + if (peerConnection.iceGatheringState === 'complete') { + iceGatheringComplete = true + clearTimeout(iceGatheringTimeout) + } + } + + // Monitor connection state changes + peerConnection.onconnectionstatechange = () => { + log.trace('connection state changed to: %s', peerConnection.connectionState) + + switch (peerConnection.connectionState) { + case 'connected': + isConnecting = false + clearTimeout(connectionStateTimeout) + break + case 'failed': + case 'disconnected': + case 'closed': + isConnecting = false + clearTimeout(connectionStateTimeout) + if (!iceGatheringComplete) { + log.error('connection failed before ICE gathering completed') + } + break + } + } + + // Set timeouts + iceGatheringTimeout = setTimeout(() => { + if (!iceGatheringComplete) { + log.error('ICE gathering timed out after %d ms', ICE_GATHERING_TIMEOUT) + peerConnection.close() + throw new Error('ICE gathering timeout') + } + }, ICE_GATHERING_TIMEOUT) + + connectionStateTimeout = setTimeout(() => { + if (isConnecting) { + log.error('connection establishment timed out after %d ms', CONNECTION_TIMEOUT) + peerConnection.close() + throw new Error('Connection timeout') + } + }, CONNECTION_TIMEOUT) + // we create the channel so that the RTCPeerConnection has a component for // which to collect candidates. The label is not relevant to connection // initiation but can be useful for debugging @@ -100,14 +153,17 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa log.error('error sending ICE candidate', err) }) } + peerConnection.onicecandidateerror = (event) => { log.error('initiator ICE candidate error', event) + metrics?.dialerEvents.increment({ ice_error: true }) } // create an offer const offerSdp = await peerConnection.createOffer().catch(err => { log.error('could not execute createOffer', err) - throw new SDPHandshakeFailedError('Failed to set createOffer') + metrics?.dialerEvents.increment({ offer_error: true }) + throw new SDPHandshakeFailedError('Failed to create offer') }) log.trace('initiator send SDP offer %s', offerSdp.sdp) @@ -122,19 +178,22 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa // set offer as local description await peerConnection.setLocalDescription(offerSdp).catch(err => { log.error('could not execute setLocalDescription', err) - throw new SDPHandshakeFailedError('Failed to set localDescription') + metrics?.dialerEvents.increment({ local_description_error: true }) + throw new SDPHandshakeFailedError('Failed to set local description') }) onProgress?.(new CustomProgressEvent('webrtc:read-sdp-answer')) log.trace('initiator read SDP answer') - // read answer - const answerMessage = await messageStream.read({ - signal - }) + // read answer with timeout + const answerMessage = await Promise.race([ + messageStream.read({ signal }), + new Promise((_, reject) => setTimeout(() => reject(new Error('SDP answer timeout')), 30000)) + ]) if (answerMessage.type !== Message.Type.SDP_ANSWER) { + metrics?.dialerEvents.increment({ answer_error: true }) throw new SDPHandshakeFailedError('Remote should send an SDP answer') } @@ -143,7 +202,8 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) await peerConnection.setRemoteDescription(answerSdp).catch(err => { log.error('could not execute setRemoteDescription', err) - throw new SDPHandshakeFailedError('Failed to set remoteDescription') + metrics?.dialerEvents.increment({ remote_description_error: true }) + throw new SDPHandshakeFailedError('Failed to set remote description') }) log.trace('initiator read candidates until connected') @@ -168,6 +228,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa }) log.trace('initiator connected to remote address %s', ma) + metrics?.dialerEvents.increment({ success: true }) return { remoteAddress: ma, @@ -176,12 +237,17 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa } } catch (err: any) { log.error('outgoing signaling error', err) + metrics?.dialerEvents.increment({ error: true }) + clearTimeout(iceGatheringTimeout) + clearTimeout(connectionStateTimeout) peerConnection.close() stream.abort(err) throw err } finally { peerConnection.onicecandidate = null peerConnection.onicecandidateerror = null + peerConnection.onicegatheringstatechange = null + peerConnection.onconnectionstatechange = null } } diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 0785a134b6..49d60b5c55 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -139,37 +139,83 @@ class WebSockets implements Transport { const cOpts = ma.toOptions() this.log('dialing %s:%s', cOpts.host, cOpts.port) - const errorPromise = pDefer() - const rawSocket = connect(toUri(ma), this.init) - rawSocket.socket.addEventListener('error', () => { - // the WebSocket.ErrorEvent type doesn't actually give us any useful - // information about what happened - // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event - const err = new ConnectionFailedError(`Could not connect to ${ma.toString()}`) - this.log.error('connection error:', err) - this.metrics?.dialerEvents.increment({ error: true }) - errorPromise.reject(err) - }) + const MAX_RETRIES = 3 + const RETRY_DELAY_MS = 1000 + let lastError: Error | undefined - try { - options.onProgress?.(new CustomProgressEvent('websockets:open-connection')) - await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal) - } catch (err: any) { - if (options.signal?.aborted) { - this.metrics?.dialerEvents.increment({ abort: true }) - } + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + const errorPromise = pDefer() + const connectPromise = pDefer() + const rawSocket = connect(toUri(ma), this.init) - rawSocket.close() - .catch(err => { - this.log.error('error closing raw socket', err) - }) + // Track connection state + let isConnecting = true + + // Handle WebSocket errors + rawSocket.socket.addEventListener('error', (event) => { + const err = new ConnectionFailedError(`WebSocket connection failed to ${ma.toString()}: ${(event as any).message ?? 'Unknown error'}`) + this.log.error('connection error (attempt %d/%d):', attempt, MAX_RETRIES, err) + this.metrics?.dialerEvents.increment({ error: true }) + errorPromise.reject(err) + }) + + // Handle successful connection + rawSocket.socket.addEventListener('open', () => { + this.log('connection successful on attempt %d', attempt) + connectPromise.resolve() + }) + + try { + options.onProgress?.(new CustomProgressEvent('websockets:open-connection', { attempt })) + + // Race between connection, error, and timeout + await raceSignal( + Promise.race([ + connectPromise.promise, + errorPromise.promise, + // Add explicit connection timeout + new Promise((_, reject) => setTimeout(() => { + if (isConnecting) { + reject(new ConnectionFailedError(`Connection timeout after ${this.init.timeout ?? 30000}ms`)) + } + }, this.init.timeout ?? 30000)) + ]), + options.signal + ) - throw err + isConnecting = false + this.log('connected %s on attempt %d', ma, attempt) + this.metrics?.dialerEvents.increment({ connect: true }) + return rawSocket + } catch (err: any) { + isConnecting = false + lastError = err + + if (options.signal?.aborted) { + this.metrics?.dialerEvents.increment({ abort: true }) + throw err + } + + // Close the failed socket + rawSocket.close() + .catch(err => { + this.log.error('error closing raw socket after failure', err) + }) + + // If we have retries left, wait and try again + if (attempt < MAX_RETRIES) { + this.log('retrying connection after %dms...', RETRY_DELAY_MS) + await new Promise(resolve => setTimeout(resolve, RETRY_DELAY_MS)) + continue + } + + // No more retries, throw the last error + throw new ConnectionFailedError(`Failed to connect after ${MAX_RETRIES} attempts: ${lastError.message}`) + } } - this.log('connected %s', ma) - this.metrics?.dialerEvents.increment({ connect: true }) - return rawSocket + // This should never be reached due to the throw above + throw new ConnectionFailedError('Unexpected connection failure') } /**