diff --git a/.changeset/purple-moles-impress.md b/.changeset/purple-moles-impress.md new file mode 100644 index 0000000000..7c40a45b36 --- /dev/null +++ b/.changeset/purple-moles-impress.md @@ -0,0 +1,5 @@ +--- +"livekit-client": minor +--- + +Move RPC registration to room level and deprecate localParticipant level registration diff --git a/examples/rpc/rpc-demo.ts b/examples/rpc/rpc-demo.ts index 5551db4853..4815754781 100644 --- a/examples/rpc/rpc-demo.ts +++ b/examples/rpc/rpc-demo.ts @@ -3,7 +3,7 @@ import { type RoomConnectOptions, RoomEvent, RpcError, - RpcInvocationData, + type RpcInvocationData, } from '../../src/index'; let startTime: number; @@ -75,7 +75,7 @@ async function main() { } const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room): Promise => { - await greetersRoom.localParticipant?.registerRpcMethod( + await greetersRoom.registerRpcMethod( 'arrival', // eslint-disable-next-line @typescript-eslint/no-unused-vars async (data: RpcInvocationData) => { @@ -85,52 +85,46 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room) }, ); - await mathGeniusRoom.localParticipant?.registerRpcMethod( - 'square-root', - async (data: RpcInvocationData) => { - const jsonData = JSON.parse(data.payload); - const number = jsonData.number; + await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => { + const jsonData = JSON.parse(data.payload); + const number = jsonData.number; - console.log( - `[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`, - ); + console.log( + `[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`, + ); - console.log(`[Math Genius] *doing math*…`); - await new Promise((resolve) => setTimeout(resolve, 2000)); + console.log(`[Math Genius] *doing math*…`); + await new Promise((resolve) => setTimeout(resolve, 2000)); - const result = Math.sqrt(number); - console.log(`[Math Genius] Aha! It's ${result}`); - return JSON.stringify({ result }); - }, - ); + const result = Math.sqrt(number); + console.log(`[Math Genius] Aha! It's ${result}`); + return JSON.stringify({ result }); + }); - await mathGeniusRoom.localParticipant?.registerRpcMethod( - 'divide', - async (data: RpcInvocationData) => { - const jsonData = JSON.parse(data.payload); - const { numerator, denominator } = jsonData; + await mathGeniusRoom.registerRpcMethod('divide', async (data: RpcInvocationData) => { + const jsonData = JSON.parse(data.payload); + const { numerator, denominator } = jsonData; - console.log( - `[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`, - ); + console.log( + `[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`, + ); - await new Promise((resolve) => setTimeout(resolve, 2000)); + await new Promise((resolve) => setTimeout(resolve, 2000)); - if (denominator === 0) { - throw new Error('Cannot divide by zero'); - } + if (denominator === 0) { + throw new Error('Cannot divide by zero'); + } - const result = numerator / denominator; - console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`); - return JSON.stringify({ result }); - }, - ); + const result = numerator / denominator; + console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`); + return JSON.stringify({ result }); + }); }; const performGreeting = async (room: Room): Promise => { console.log("[Caller] Letting the greeter know that I've arrived"); try { - const response = await room.localParticipant!.performRpc({ + const response = await room.localParticipant.performRpc({ destinationIdentity: 'greeter', method: 'arrival', payload: 'Hello', @@ -145,7 +139,7 @@ const performGreeting = async (room: Room): Promise => { const performDisconnection = async (room: Room): Promise => { console.log('[Caller] Checking back in on the greeter...'); try { - const response = await room.localParticipant!.performRpc({ + const response = await room.localParticipant.performRpc({ destinationIdentity: 'greeter', method: 'arrival', payload: 'You still there?', @@ -164,7 +158,7 @@ const performDisconnection = async (room: Room): Promise => { const performSquareRoot = async (room: Room): Promise => { console.log("[Caller] What's the square root of 16?"); try { - const response = await room.localParticipant!.performRpc({ + const response = await room.localParticipant.performRpc({ destinationIdentity: 'math-genius', method: 'square-root', payload: JSON.stringify({ number: 16 }), @@ -180,7 +174,7 @@ const performSquareRoot = async (room: Room): Promise => { const performQuantumHypergeometricSeries = async (room: Room): Promise => { console.log("[Caller] What's the quantum hypergeometric series of 42?"); try { - const response = await room.localParticipant!.performRpc({ + const response = await room.localParticipant.performRpc({ destinationIdentity: 'math-genius', method: 'quantum-hypergeometric-series', payload: JSON.stringify({ number: 42 }), @@ -203,7 +197,7 @@ const performQuantumHypergeometricSeries = async (room: Room): Promise => const performDivision = async (room: Room): Promise => { console.log("[Caller] Let's try dividing 10 by 0"); try { - const response = await room.localParticipant!.performRpc({ + const response = await room.localParticipant.performRpc({ destinationIdentity: 'math-genius', method: 'divide', payload: JSON.stringify({ numerator: 10, denominator: 0 }), diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 8992f3519e..b5c2168648 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -326,7 +326,7 @@ export class SignalClient { } catch (e) { reject( new ConnectionError( - 'server was not reachable', + e instanceof Error ? e.message : 'server was not reachable', ConnectionErrorReason.ServerUnreachable, ), ); diff --git a/src/index.ts b/src/index.ts index bea431eab6..c66d2f2f9d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -61,6 +61,8 @@ export { facingModeFromDeviceLabel, facingModeFromLocalTrack } from './room/trac export * from './room/track/options'; export * from './room/track/processor/types'; export * from './room/track/types'; +export type * from './room/StreamReader'; +export type * from './room/StreamWriter'; export type { DataPublishOptions, SimulationScenario, diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 65d495285f..12d097df2e 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -16,6 +16,8 @@ import { type ReconnectResponse, RequestResponse, Room as RoomModel, + RpcAck, + RpcResponse, SignalTarget, SpeakerInfo, type StreamStateUpdate, @@ -54,6 +56,7 @@ import { UnexpectedConnectionState, } from './errors'; import { EngineEvent } from './events'; +import { RpcError } from './rpc'; import CriticalTimers from './timers'; import type LocalTrack from './track/LocalTrack'; import type LocalTrackPublication from './track/LocalTrackPublication'; @@ -664,6 +667,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return; } const dp = DataPacket.fromBinary(new Uint8Array(buffer)); + if (dp.value?.case === 'speaker') { // dispatch speaker updates this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers); @@ -1096,6 +1100,46 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }); }; + /** @internal */ + async publishRpcResponse( + destinationIdentity: string, + requestId: string, + payload: string | null, + error: RpcError | null, + ) { + const packet = new DataPacket({ + destinationIdentities: [destinationIdentity], + kind: DataPacket_Kind.RELIABLE, + value: { + case: 'rpcResponse', + value: new RpcResponse({ + requestId, + value: error + ? { case: 'error', value: error.toProto() } + : { case: 'payload', value: payload ?? '' }, + }), + }, + }); + + await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE); + } + + /** @internal */ + async publishRpcAck(destinationIdentity: string, requestId: string) { + const packet = new DataPacket({ + destinationIdentities: [destinationIdentity], + kind: DataPacket_Kind.RELIABLE, + value: { + case: 'rpcAck', + value: new RpcAck({ + requestId, + }), + }, + }); + + await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE); + } + /* @internal */ async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) { const msg = packet.toBinary(); diff --git a/src/room/Room.ts b/src/room/Room.ts index fc286174b0..41c3d1a0aa 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -67,6 +67,7 @@ import LocalParticipant from './participant/LocalParticipant'; import type Participant from './participant/Participant'; import type { ConnectionQuality } from './participant/Participant'; import RemoteParticipant from './participant/RemoteParticipant'; +import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc'; import CriticalTimers from './timers'; import LocalAudioTrack from './track/LocalAudioTrack'; import type LocalTrack from './track/LocalTrack'; @@ -206,6 +207,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) private textStreamHandlers = new Map(); + private rpcHandlers: Map Promise> = new Map(); + /** * Creates a new Room, the primary construct for a LiveKit session. * @param options @@ -237,7 +240,13 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.disconnectLock = new Mutex(); - this.localParticipant = new LocalParticipant('', '', this.engine, this.options); + this.localParticipant = new LocalParticipant( + '', + '', + this.engine, + this.options, + this.rpcHandlers, + ); if (this.options.videoCaptureDefaults.deviceId) { this.localParticipant.activeDeviceMap.set( @@ -300,6 +309,113 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.byteStreamHandlers.delete(topic); } + /** + * Establishes the participant as a receiver for calls of the specified RPC method. + * Will overwrite any existing callback for the same method. + * + * @param method - The name of the indicated RPC method + * @param handler - Will be invoked when an RPC request for this method is received + * @returns A promise that resolves when the method is successfully registered + * @throws {Error} if the handler for a specific method has already been registered already + * + * @example + * ```typescript + * room.localParticipant?.registerRpcMethod( + * 'greet', + * async (data: RpcInvocationData) => { + * console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`); + * return `Hello, ${data.callerIdentity}!`; + * } + * ); + * ``` + * + * The handler should return a Promise that resolves to a string. + * If unable to respond within `responseTimeout`, the request will result in an error on the caller's side. + * + * You may throw errors of type `RpcError` with a string `message` in the handler, + * and they will be received on the caller's side with the message intact. + * Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error"). + */ + registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { + if (this.rpcHandlers.has(method)) { + throw Error( + `RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`, + ); + } + this.rpcHandlers.set(method, handler); + } + + /** + * Unregisters a previously registered RPC method. + * + * @param method - The name of the RPC method to unregister + */ + unregisterRpcMethod(method: string) { + this.rpcHandlers.delete(method); + } + + private async handleIncomingRpcRequest( + callerIdentity: string, + requestId: string, + method: string, + payload: string, + responseTimeout: number, + version: number, + ) { + await this.engine.publishRpcAck(callerIdentity, requestId); + + if (version !== 1) { + await this.engine.publishRpcResponse( + callerIdentity, + requestId, + null, + RpcError.builtIn('UNSUPPORTED_VERSION'), + ); + return; + } + + const handler = this.rpcHandlers.get(method); + + if (!handler) { + await this.engine.publishRpcResponse( + callerIdentity, + requestId, + null, + RpcError.builtIn('UNSUPPORTED_METHOD'), + ); + return; + } + + let responseError: RpcError | null = null; + let responsePayload: string | null = null; + + try { + const response = await handler({ + requestId, + callerIdentity, + payload, + responseTimeout, + }); + if (byteLength(response) > MAX_PAYLOAD_BYTES) { + responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'); + console.warn(`RPC Response payload too large for ${method}`); + } else { + responsePayload = response; + } + } catch (error) { + if (error instanceof RpcError) { + responseError = error; + } else { + console.warn( + `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, + error, + ); + responseError = RpcError.builtIn('APPLICATION_ERROR'); + } + } + await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError); + } + /** * @experimental */ @@ -1647,6 +1763,16 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.handleStreamChunk(packet.value.value); } else if (packet.value.case === 'streamTrailer') { this.handleStreamTrailer(packet.value.value); + } else if (packet.value.case === 'rpcRequest') { + const rpc = packet.value.value; + this.handleIncomingRpcRequest( + packet.participantIdentity, + rpc.id, + rpc.method, + rpc.payload, + rpc.responseTimeoutMs, + rpc.version, + ); } }; diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index bdda34cdc4..49ad6707f7 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -139,6 +139,8 @@ export default class LocalParticipant extends Participant { private reconnectFuture?: Future; + private rpcHandlers: Map Promise>; + private pendingSignalRequests: Map< number, { @@ -150,8 +152,6 @@ export default class LocalParticipant extends Participant { private enabledPublishVideoCodecs: Codec[] = []; - private rpcHandlers: Map Promise> = new Map(); - private pendingAcks = new Map void; participantIdentity: string }>(); private pendingResponses = new Map< @@ -163,7 +163,13 @@ export default class LocalParticipant extends Participant { >(); /** @internal */ - constructor(sid: string, identity: string, engine: RTCEngine, options: InternalRoomOptions) { + constructor( + sid: string, + identity: string, + engine: RTCEngine, + options: InternalRoomOptions, + roomRpcHandlers: Map Promise>, + ) { super(sid, identity, undefined, undefined, undefined, { loggerName: options.loggerName, loggerContextCb: () => this.engine.logContext, @@ -180,6 +186,7 @@ export default class LocalParticipant extends Participant { ['audiooutput', 'default'], ]); this.pendingSignalRequests = new Map(); + this.rpcHandlers = roomRpcHandlers; } get lastCameraError(): Error | undefined { @@ -271,17 +278,6 @@ export default class LocalParticipant extends Participant { private handleDataPacket = (packet: DataPacket) => { switch (packet.value.case) { - case 'rpcRequest': - let rpcRequest = packet.value.value as RpcRequest; - this.handleIncomingRpcRequest( - packet.participantIdentity, - rpcRequest.id, - rpcRequest.method, - rpcRequest.payload, - rpcRequest.responseTimeoutMs, - rpcRequest.version, - ); - break; case 'rpcResponse': let rpcResponse = packet.value.value as RpcResponse; let payload: string | null = null; @@ -1883,39 +1879,20 @@ export default class LocalParticipant extends Participant { } /** - * Establishes the participant as a receiver for calls of the specified RPC method. - * Will overwrite any existing callback for the same method. - * - * @param method - The name of the indicated RPC method - * @param handler - Will be invoked when an RPC request for this method is received - * @returns A promise that resolves when the method is successfully registered - * - * @example - * ```typescript - * room.localParticipant?.registerRpcMethod( - * 'greet', - * async (data: RpcInvocationData) => { - * console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`); - * return `Hello, ${data.callerIdentity}!`; - * } - * ); - * ``` - * - * The handler should return a Promise that resolves to a string. - * If unable to respond within `responseTimeout`, the request will result in an error on the caller's side. - * - * You may throw errors of type `RpcError` with a string `message` in the handler, - * and they will be received on the caller's side with the message intact. - * Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error"). + * @deprecated use `room.registerRpcMethod` instead */ registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { + if (this.rpcHandlers.has(method)) { + this.log.warn( + `you're overriding the RPC handler for method ${method}, in the future this will throw an error`, + ); + } + this.rpcHandlers.set(method, handler); } /** - * Unregisters a previously registered RPC method. - * - * @param method - The name of the RPC method to unregister + * @deprecated use `room.unregisterRpcMethod` instead */ unregisterRpcMethod(method: string) { this.rpcHandlers.delete(method); @@ -1973,68 +1950,6 @@ export default class LocalParticipant extends Participant { } } - private async handleIncomingRpcRequest( - callerIdentity: string, - requestId: string, - method: string, - payload: string, - responseTimeout: number, - version: number, - ) { - await this.publishRpcAck(callerIdentity, requestId); - - if (version !== 1) { - await this.publishRpcResponse( - callerIdentity, - requestId, - null, - RpcError.builtIn('UNSUPPORTED_VERSION'), - ); - return; - } - - const handler = this.rpcHandlers.get(method); - - if (!handler) { - await this.publishRpcResponse( - callerIdentity, - requestId, - null, - RpcError.builtIn('UNSUPPORTED_METHOD'), - ); - return; - } - - let responseError: RpcError | null = null; - let responsePayload: string | null = null; - - try { - const response = await handler({ - requestId, - callerIdentity, - payload, - responseTimeout, - }); - if (byteLength(response) > MAX_PAYLOAD_BYTES) { - responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'); - console.warn(`RPC Response payload too large for ${method}`); - } else { - responsePayload = response; - } - } catch (error) { - if (error instanceof RpcError) { - responseError = error; - } else { - console.warn( - `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, - error, - ); - responseError = RpcError.builtIn('APPLICATION_ERROR'); - } - } - await this.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError); - } - /** @internal */ private async publishRpcRequest( destinationIdentity: string, @@ -2061,46 +1976,6 @@ export default class LocalParticipant extends Participant { await this.engine.sendDataPacket(packet, DataPacket_Kind.RELIABLE); } - /** @internal */ - private async publishRpcResponse( - destinationIdentity: string, - requestId: string, - payload: string | null, - error: RpcError | null, - ) { - const packet = new DataPacket({ - destinationIdentities: [destinationIdentity], - kind: DataPacket_Kind.RELIABLE, - value: { - case: 'rpcResponse', - value: new RpcResponse({ - requestId, - value: error - ? { case: 'error', value: error.toProto() } - : { case: 'payload', value: payload ?? '' }, - }), - }, - }); - - await this.engine.sendDataPacket(packet, DataPacket_Kind.RELIABLE); - } - - /** @internal */ - private async publishRpcAck(destinationIdentity: string, requestId: string) { - const packet = new DataPacket({ - destinationIdentities: [destinationIdentity], - kind: DataPacket_Kind.RELIABLE, - value: { - case: 'rpcAck', - value: new RpcAck({ - requestId, - }), - }, - }); - - await this.engine.sendDataPacket(packet, DataPacket_Kind.RELIABLE); - } - /** @internal */ handleParticipantDisconnected(participantIdentity: string) { for (const [id, { participantIdentity: pendingIdentity }] of this.pendingAcks) { diff --git a/src/room/participant/LocalParticipant.test.ts b/src/room/rpc.test.ts similarity index 87% rename from src/room/participant/LocalParticipant.test.ts rename to src/room/rpc.test.ts index 238d16a306..5ac3d78606 100644 --- a/src/room/participant/LocalParticipant.test.ts +++ b/src/room/rpc.test.ts @@ -1,44 +1,37 @@ import { DataPacket, DataPacket_Kind } from '@livekit/protocol'; import { beforeEach, describe, expect, it, vi } from 'vitest'; -import type { InternalRoomOptions } from '../../options'; -import type RTCEngine from '../RTCEngine'; -import { RpcError } from '../rpc'; -import LocalParticipant from './LocalParticipant'; -import { ParticipantKind } from './Participant'; -import RemoteParticipant from './RemoteParticipant'; +import type { InternalRoomOptions } from '../options'; +import type RTCEngine from './RTCEngine'; +import Room from './Room'; +import LocalParticipant from './participant/LocalParticipant'; +import { ParticipantKind } from './participant/Participant'; +import RemoteParticipant from './participant/RemoteParticipant'; +import { RpcError } from './rpc'; describe('LocalParticipant', () => { describe('registerRpcMethod', () => { - let localParticipant: LocalParticipant; - let mockEngine: RTCEngine; - let mockRoomOptions: InternalRoomOptions; + let room: Room; let mockSendDataPacket: ReturnType; beforeEach(() => { mockSendDataPacket = vi.fn(); - mockEngine = { - client: { - sendUpdateLocalMetadata: vi.fn(), - }, - on: vi.fn().mockReturnThis(), - sendDataPacket: mockSendDataPacket, - } as unknown as RTCEngine; - mockRoomOptions = {} as InternalRoomOptions; + room = new Room(); + room.engine.client = { + sendUpdateLocalMetadata: vi.fn(), + }; + room.engine.on = vi.fn().mockReturnThis(); + room.engine.sendDataPacket = mockSendDataPacket; - localParticipant = new LocalParticipant( - 'test-sid', - 'test-identity', - mockEngine, - mockRoomOptions, - ); + room.localParticipant.sid = 'test-sid'; + room.localParticipant.identity = 'test-identity'; }); it('should register an RPC method handler', async () => { const methodName = 'testMethod'; const handler = vi.fn().mockResolvedValue('test response'); - localParticipant.registerRpcMethod(methodName, handler); + room.registerRpcMethod(methodName, handler); const mockCaller = new RemoteParticipant( {} as any, @@ -51,7 +44,7 @@ describe('LocalParticipant', () => { ParticipantKind.STANDARD, ); - await localParticipant.handleIncomingRpcRequest( + await room.handleIncomingRpcRequest( mockCaller.identity, 'test-request-id', methodName, @@ -84,7 +77,7 @@ describe('LocalParticipant', () => { const errorMessage = 'Test error'; const handler = vi.fn().mockRejectedValue(new Error(errorMessage)); - localParticipant.registerRpcMethod(methodName, handler); + room.registerRpcMethod(methodName, handler); const mockCaller = new RemoteParticipant( {} as any, @@ -97,7 +90,7 @@ describe('LocalParticipant', () => { ParticipantKind.STANDARD, ); - await localParticipant.handleIncomingRpcRequest( + await room.handleIncomingRpcRequest( mockCaller.identity, 'test-error-request-id', methodName, @@ -127,7 +120,7 @@ describe('LocalParticipant', () => { const errorMessage = 'some-error-message'; const handler = vi.fn().mockRejectedValue(new RpcError(errorCode, errorMessage)); - localParticipant.registerRpcMethod(methodName, handler); + room.localParticipant.registerRpcMethod(methodName, handler); const mockCaller = new RemoteParticipant( {} as any, @@ -140,7 +133,7 @@ describe('LocalParticipant', () => { ParticipantKind.STANDARD, ); - await localParticipant.handleIncomingRpcRequest( + await room.handleIncomingRpcRequest( mockCaller.identity, 'test-rpc-error-request-id', methodName,