Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/blue-cheetahs-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': minor
---

Add new rtc path that defaults to single peer connection mode and falls back to legacy dual pc
1 change: 0 additions & 1 deletion examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ const appActions = {
encryption: e2eeEnabled
? { keyProvider: state.e2eeKeyProvider, worker: new E2EEWorker() }
: undefined,
singlePeerConnection: true,
};
if (
roomOpts.publishDefaults?.videoCodec === 'av1' ||
Expand Down
36 changes: 25 additions & 11 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export interface SignalOptions {
maxRetries: number;
e2eeEnabled: boolean;
websocketTimeout: number;
singlePeerConnection: boolean;
}

type SignalMessage = SignalRequest['message'];
Expand Down Expand Up @@ -246,12 +245,13 @@ export class SignalClient {
token: string,
opts: SignalOptions,
abortSignal?: AbortSignal,
forceV0Path?: boolean,
): Promise<JoinResponse> {
// during a full reconnect, we'd want to start the sequence even if currently
// connected
this.state = SignalConnectionState.CONNECTING;
this.options = opts;
const res = await this.connect(url, token, opts, abortSignal);
const res = await this.connect(url, token, opts, abortSignal, forceV0Path);
return res as JoinResponse;
}

Expand Down Expand Up @@ -286,16 +286,18 @@ export class SignalClient {
token: string,
opts: ConnectOpts,
abortSignal?: AbortSignal,
/** setting this to true results in dual peer connection mode being used */
forceV0Path?: boolean,
): Promise<JoinResponse | ReconnectResponse | undefined> {
const unlock = await this.connectionLock.lock();

this.connectOptions = opts;
const clientInfo = getClientInfo();
const params = opts.singlePeerConnection
? createJoinRequestConnectionParams(token, clientInfo, opts)
: createConnectionParams(token, clientInfo, opts);
const rtcUrl = createRtcUrl(url, params);
const validateUrl = createValidateUrl(rtcUrl);
const params = forceV0Path
? createConnectionParams(token, clientInfo, opts)
: createJoinRequestConnectionParams(token, clientInfo, opts);
const rtcUrl = createRtcUrl(url, params, forceV0Path).toString();
const validateUrl = createValidateUrl(rtcUrl).toString();

return new Promise<JoinResponse | ReconnectResponse | undefined>(async (resolve, reject) => {
try {
Expand Down Expand Up @@ -995,10 +997,22 @@ export class SignalClient {
): Promise<ConnectionError> {
try {
const resp = await fetch(validateUrl);
if (resp.status.toFixed(0).startsWith('4')) {
const msg = await resp.text();
return ConnectionError.notAllowed(msg, resp.status);
} else if (reason instanceof ConnectionError) {

switch (resp.status) {
case 404:
return ConnectionError.serviceNotFound(
'v1 RTC path not found. Consider upgrading your LiveKit server version',
'v0-rtc',
);
case 401:
case 403:
const msg = await resp.text();
return ConnectionError.notAllowed(msg, resp.status);
default:
break;
}

if (reason instanceof ConnectionError) {
return reason;
} else {
return ConnectionError.internal(
Expand Down
40 changes: 24 additions & 16 deletions src/api/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ describe('createRtcUrl', () => {
const url = 'wss://example.com';
const searchParams = new URLSearchParams();
const result = createRtcUrl(url, searchParams);
expect(result.toString()).toBe('wss://example.com/rtc');
expect(result.toString()).toBe('wss://example.com/rtc/v1');
});

it('should create a basic RTC URL with http protocol', () => {
const url = 'http://example.com';
const searchParams = new URLSearchParams();
const result = createRtcUrl(url, searchParams);
expect(result.toString()).toBe('ws://example.com/rtc');
expect(result.toString()).toBe('ws://example.com/rtc/v1');
});

it('should handle search parameters', () => {
Expand All @@ -25,7 +25,7 @@ describe('createRtcUrl', () => {
const result = createRtcUrl(url, searchParams);

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/rtc');
expect(parsedResult.pathname).toBe('/rtc/v1');
expect(parsedResult.searchParams.get('token')).toBe('test-token');
expect(parsedResult.searchParams.get('room')).toBe('test-room');
});
Expand All @@ -36,7 +36,7 @@ describe('createRtcUrl', () => {
const result = createRtcUrl(url, searchParams);

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/rtc');
expect(parsedResult.pathname).toBe('/rtc/v1');
});

it('should handle sub paths', () => {
Expand All @@ -45,7 +45,7 @@ describe('createRtcUrl', () => {
const result = createRtcUrl(url, searchParams);

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/sub/path/rtc');
expect(parsedResult.pathname).toBe('/sub/path/rtc/v1');
});

it('should handle sub paths with trailing slashes', () => {
Expand All @@ -54,7 +54,7 @@ describe('createRtcUrl', () => {
const result = createRtcUrl(url, searchParams);

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/sub/path/rtc');
expect(parsedResult.pathname).toBe('/sub/path/rtc/v1');
});

it('should handle sub paths with url params', () => {
Expand All @@ -64,7 +64,7 @@ describe('createRtcUrl', () => {
const result = createRtcUrl(url, searchParams);

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/sub/path/rtc');
expect(parsedResult.pathname).toBe('/sub/path/rtc/v1');
expect(parsedResult.searchParams.get('param')).toBe('value');
expect(parsedResult.searchParams.get('token')).toBe('test-token');
});
Expand All @@ -73,8 +73,8 @@ describe('createRtcUrl', () => {
describe('createValidateUrl', () => {
it('should create a basic validate URL', () => {
const rtcUrl = createRtcUrl('wss://example.com', new URLSearchParams());
const result = createValidateUrl(rtcUrl);
expect(result.toString()).toBe('https://example.com/rtc/validate');
const result = createValidateUrl(rtcUrl.toString());
expect(result.toString()).toBe('https://example.com/rtc/v1/validate');
});

it('should handle search parameters', () => {
Expand All @@ -85,33 +85,41 @@ describe('createValidateUrl', () => {
room: 'test-room',
}),
);
const result = createValidateUrl(rtcUrl);
const result = createValidateUrl(rtcUrl.toString());

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/rtc/validate');
expect(parsedResult.pathname).toBe('/rtc/v1/validate');
expect(parsedResult.searchParams.get('token')).toBe('test-token');
expect(parsedResult.searchParams.get('room')).toBe('test-room');
});

it('should handle ws protocol', () => {
const rtcUrl = createRtcUrl('ws://example.com', new URLSearchParams());
const result = createValidateUrl(rtcUrl);
const result = createValidateUrl(rtcUrl.toString());

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/rtc/validate');
expect(parsedResult.pathname).toBe('/rtc/v1/validate');
});

it('should preserve the original path', () => {
const rtcUrl = createRtcUrl('wss://example.com/some/path', new URLSearchParams());
const result = createValidateUrl(rtcUrl);
const result = createValidateUrl(rtcUrl.toString());

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/some/path/rtc/validate');
expect(parsedResult.pathname).toBe('/some/path/rtc/v1/validate');
});

it('should handle sub paths with trailing slashes', () => {
const rtcUrl = createRtcUrl('wss://example.com/sub/path/', new URLSearchParams());
const result = createValidateUrl(rtcUrl);
const result = createValidateUrl(rtcUrl.toString());

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/sub/path/rtc/v1/validate');
});

it('should handle v0 paths', () => {
const rtcUrl = createRtcUrl('wss://example.com/sub/path/', new URLSearchParams(), true);
const result = createValidateUrl(rtcUrl.toString());

const parsedResult = new URL(result);
expect(parsedResult.pathname).toBe('/sub/path/rtc/validate');
Expand Down
13 changes: 11 additions & 2 deletions src/api/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import { SignalResponse } from '@livekit/protocol';
import { toHttpUrl, toWebsocketUrl } from '../room/utils';

export function createRtcUrl(url: string, searchParams: URLSearchParams) {
export function createRtcUrl(url: string, searchParams: URLSearchParams, useV0Path = false) {
const v0Url = createV0RtcUrl(url, searchParams);
if (useV0Path) {
return v0Url;
} else {
return appendUrlPath(v0Url, 'v1');
}
}

export function createV0RtcUrl(url: string, searchParams: URLSearchParams) {
const urlObj = new URL(toWebsocketUrl(url));
searchParams.forEach((value, key) => {
urlObj.searchParams.set(key, value);
Expand All @@ -20,7 +29,7 @@ export function ensureTrailingSlash(path: string) {

function appendUrlPath(urlObj: URL, path: string) {
urlObj.pathname = `${ensureTrailingSlash(urlObj.pathname)}${path}`;
return urlObj.toString();
return urlObj;
}

export function parseSignalResponse(value: ArrayBuffer | string) {
Expand Down
19 changes: 12 additions & 7 deletions src/connectionHelper/checks/turn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@ export class TURNCheck extends Checker {
(await new RegionUrlProvider(this.url, this.token).getNextBestRegionUrl()) ?? this.url;
}
const signalClient = new SignalClient();
const joinRes = await signalClient.join(this.url, this.token, {
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
singlePeerConnection: false,
});
const joinRes = await signalClient.join(
this.url,
this.token,
{
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
},
undefined,
true,
);

let hasTLS = false;
let hasTURN = false;
Expand Down
38 changes: 24 additions & 14 deletions src/connectionHelper/checks/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ export class WebSocketCheck extends Checker {
let signalClient = new SignalClient();
let joinRes: JoinResponse | undefined;
try {
joinRes = await signalClient.join(this.url, this.token, {
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
singlePeerConnection: false,
});
joinRes = await signalClient.join(
this.url,
this.token,
{
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
},
undefined,
true,
);
} catch (e: any) {
if (isCloud(new URL(this.url))) {
this.appendMessage(
Expand All @@ -32,13 +37,18 @@ export class WebSocketCheck extends Checker {
const regionProvider = new RegionUrlProvider(this.url, this.token);
const regionUrl = await regionProvider.getNextBestRegionUrl();
if (regionUrl) {
joinRes = await signalClient.join(regionUrl, this.token, {
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
singlePeerConnection: false,
});
joinRes = await signalClient.join(
regionUrl,
this.token,
{
autoSubscribe: true,
maxRetries: 0,
e2eeEnabled: false,
websocketTimeout: 15_000,
},
undefined,
true,
);
this.appendMessage(
`Fallback to region worked. To avoid initial connections failing, ensure you're calling room.prepareConnection() ahead of time`,
);
Expand Down
7 changes: 4 additions & 3 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ export interface InternalRoomOptions {
loggerName?: string;

/**
* @experimental
* only supported on LiveKit Cloud
* and LiveKit OSS >= 1.9.2
* will attempt to connect via single peer connection mode.
* falls back to dual peer connection mode if not available.
*
* @default true
*/
singlePeerConnection: boolean;
}
Expand Down
7 changes: 7 additions & 0 deletions src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,20 @@ export class PCTransportManager {

private loggerOptions: LoggerOptions;

private _mode: PCMode;

get mode(): PCMode {
return this._mode;
}

constructor(rtcConfig: RTCConfiguration, mode: PCMode, loggerOptions: LoggerOptions) {
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCManager);
this.loggerOptions = loggerOptions;

this.isPublisherConnectionRequired = mode !== 'subscriber-primary';
this.isSubscriberConnectionRequired = mode === 'subscriber-primary';
this.publisher = new PCTransport(rtcConfig, loggerOptions);
this._mode = mode;
if (mode !== 'publisher-only') {
this.subscriber = new PCTransport(rtcConfig, loggerOptions);
this.subscriber.onConnectionStateChange = this.updateState;
Expand Down
Loading
Loading