Skip to content

Commit 2ee73d4

Browse files
authored
Ensure leave requests can be sent before join response is received (#1687)
1 parent d27d47a commit 2ee73d4

File tree

3 files changed

+110
-14
lines changed

3 files changed

+110
-14
lines changed

.changeset/good-comics-confess.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
Ensure leave requests can be sent before join response is received

src/api/SignalClient.test.ts

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { JoinResponse, LeaveRequest, ReconnectResponse, SignalResponse } from '@livekit/protocol';
1+
import {
2+
JoinResponse,
3+
LeaveRequest,
4+
ReconnectResponse,
5+
SignalRequest,
6+
SignalResponse,
7+
} from '@livekit/protocol';
28
import { beforeEach, describe, expect, it, vi } from 'vitest';
39
import { ConnectionError, ConnectionErrorReason } from '../room/errors';
410
import { SignalClient, SignalConnectionState } from './SignalClient';
@@ -84,6 +90,7 @@ describe('SignalClient.connect', () => {
8490
maxRetries: 0,
8591
e2eeEnabled: false,
8692
websocketTimeout: 1000,
93+
singlePeerConnection: false,
8794
};
8895

8996
beforeEach(() => {
@@ -202,6 +209,89 @@ describe('SignalClient.connect', () => {
202209
),
203210
).rejects.toThrow('User aborted connection');
204211
});
212+
213+
it('should send leave request before closing when AbortSignal is triggered during connection', async () => {
214+
const abortController = new AbortController();
215+
const writtenMessages: Array<ArrayBuffer | string> = [];
216+
let streamWriterReady: (() => void) | undefined;
217+
const streamWriterReadyPromise = new Promise<void>((resolve) => {
218+
streamWriterReady = resolve;
219+
});
220+
221+
// Create a mock writable stream that captures writes
222+
const mockWritable = new WritableStream({
223+
write(chunk) {
224+
writtenMessages.push(chunk);
225+
return Promise.resolve();
226+
},
227+
});
228+
229+
// Override getWriter to signal when streamWriter is assigned
230+
const originalGetWriter = mockWritable.getWriter.bind(mockWritable);
231+
mockWritable.getWriter = () => {
232+
const writer = originalGetWriter();
233+
streamWriterReady?.();
234+
return writer;
235+
};
236+
237+
const mockReadable = new ReadableStream<ArrayBuffer>({
238+
async start() {
239+
// Keep connection open but don't send join response yet
240+
// This simulates aborting during connection (after WS opens, before join response)
241+
},
242+
});
243+
244+
const mockConnection = {
245+
readable: mockReadable,
246+
writable: mockWritable,
247+
protocol: '',
248+
extensions: '',
249+
};
250+
251+
vi.mocked(WebSocketStream).mockImplementation(() => {
252+
return {
253+
url: 'wss://test.livekit.io',
254+
opened: Promise.resolve(mockConnection),
255+
closed: new Promise(() => {}),
256+
close: vi.fn(),
257+
readyState: 1,
258+
} as any;
259+
});
260+
261+
// Start the connection
262+
const joinPromise = signalClient.join(
263+
'wss://test.livekit.io',
264+
'test-token',
265+
defaultOptions,
266+
abortController.signal,
267+
);
268+
269+
// Wait for streamWriter to be assigned
270+
await streamWriterReadyPromise;
271+
272+
// Now abort the connection (after WS opens, before join response)
273+
abortController.abort(new Error('User aborted connection'));
274+
275+
// joinPromise should reject
276+
await expect(joinPromise).rejects.toThrow('User aborted connection');
277+
278+
// Verify that a leave request was sent before closing
279+
const leaveRequestSent = writtenMessages.some((data) => {
280+
if (typeof data === 'string') {
281+
return false;
282+
}
283+
try {
284+
const request = SignalRequest.fromBinary(
285+
data instanceof ArrayBuffer ? new Uint8Array(data) : data,
286+
);
287+
return request.message?.case === 'leave';
288+
} catch {
289+
return false;
290+
}
291+
});
292+
293+
expect(leaveRequestSent).toBe(true);
294+
});
205295
});
206296

207297
describe('Failure Case - WebSocket Connection Errors', () => {
@@ -429,6 +519,7 @@ describe('SignalClient.handleSignalConnected', () => {
429519
maxRetries: 0,
430520
e2eeEnabled: false,
431521
websocketTimeout: 1000,
522+
singlePeerConnection: false,
432523
};
433524

434525
beforeEach(() => {
@@ -448,17 +539,6 @@ describe('SignalClient.handleSignalConnected', () => {
448539
}
449540
});
450541

451-
it('should set up stream writer from connection writable', () => {
452-
const mockReadable = new ReadableStream<ArrayBuffer>();
453-
const mockConnection = createMockConnection(mockReadable);
454-
455-
const handleMethod = (signalClient as any).handleSignalConnected;
456-
if (handleMethod) {
457-
handleMethod.call(signalClient, mockConnection);
458-
expect((signalClient as any).streamWriter).toBeDefined();
459-
}
460-
});
461-
462542
it('should start reading loop without first message', async () => {
463543
const joinResponse = createJoinResponse();
464544
const signalResponse = createSignalResponse('join', joinResponse);
@@ -495,6 +575,7 @@ describe('SignalClient.validateFirstMessage', () => {
495575
maxRetries: 0,
496576
e2eeEnabled: false,
497577
websocketTimeout: 1000,
578+
singlePeerConnection: false,
498579
};
499580

500581
beforeEach(() => {

src/api/SignalClient.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,17 @@ export class SignalClient {
295295
const combinedAbort = AbortSignal.any(signals);
296296

297297
const abortHandler = async (event: Event) => {
298-
this.close();
298+
// send leave if we have an active stream writer (connection is open)
299+
if (this.streamWriter) {
300+
this.sendLeave()
301+
.then(() => this.close())
302+
.catch((e) => {
303+
this.log.error(e);
304+
this.close();
305+
});
306+
} else {
307+
this.close();
308+
}
299309
clearTimeout(wsTimeout);
300310
const target = event.currentTarget;
301311
reject(target instanceof AbortSignal ? target.reason : target);
@@ -383,6 +393,7 @@ export class SignalClient {
383393
return;
384394
}
385395
const signalReader = connection.readable.getReader();
396+
this.streamWriter = connection.writable.getWriter();
386397
const firstMessage = await signalReader.read();
387398
signalReader.releaseLock();
388399
if (!firstMessage.value) {
@@ -885,7 +896,6 @@ export class SignalClient {
885896
clearTimeout(timeoutHandle);
886897
this.startPingInterval();
887898
this.startReadingLoop(connection.readable.getReader(), firstMessage);
888-
this.streamWriter = connection.writable.getWriter();
889899
}
890900

891901
/**

0 commit comments

Comments
 (0)