Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/purple-moles-impress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": minor
---

Move RPC registration to room level and deprecate localParticipant level registration
72 changes: 33 additions & 39 deletions examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
type RoomConnectOptions,
RoomEvent,
RpcError,
RpcInvocationData,
type RpcInvocationData,
} from '../../src/index';

let startTime: number;
Expand Down Expand Up @@ -75,7 +75,7 @@ async function main() {
}

const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room): Promise<void> => {
await greetersRoom.localParticipant?.registerRpcMethod(
await greetersRoom.registerRpcMethod(
'arrival',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (data: RpcInvocationData) => {
Expand All @@ -85,52 +85,46 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
},
);

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'square-root',
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;
await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;

console.log(
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
);
console.log(
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
);

console.log(`[Math Genius] *doing math*…`);
await new Promise((resolve) => setTimeout(resolve, 2000));
console.log(`[Math Genius] *doing math*…`);
await new Promise((resolve) => setTimeout(resolve, 2000));

const result = Math.sqrt(number);
console.log(`[Math Genius] Aha! It's ${result}`);
return JSON.stringify({ result });
},
);
const result = Math.sqrt(number);
console.log(`[Math Genius] Aha! It's ${result}`);
return JSON.stringify({ result });
});

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'divide',
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const { numerator, denominator } = jsonData;
await mathGeniusRoom.registerRpcMethod('divide', async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const { numerator, denominator } = jsonData;

console.log(
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
);
console.log(
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
);

await new Promise((resolve) => setTimeout(resolve, 2000));
await new Promise((resolve) => setTimeout(resolve, 2000));

if (denominator === 0) {
throw new Error('Cannot divide by zero');
}
if (denominator === 0) {
throw new Error('Cannot divide by zero');
}

const result = numerator / denominator;
console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`);
return JSON.stringify({ result });
},
);
const result = numerator / denominator;
console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`);
return JSON.stringify({ result });
});
};

const performGreeting = async (room: Room): Promise<void> => {
console.log("[Caller] Letting the greeter know that I've arrived");
try {
const response = await room.localParticipant!.performRpc({
const response = await room.localParticipant.performRpc({
destinationIdentity: 'greeter',
method: 'arrival',
payload: 'Hello',
Expand All @@ -145,7 +139,7 @@ const performGreeting = async (room: Room): Promise<void> => {
const performDisconnection = async (room: Room): Promise<void> => {
console.log('[Caller] Checking back in on the greeter...');
try {
const response = await room.localParticipant!.performRpc({
const response = await room.localParticipant.performRpc({
destinationIdentity: 'greeter',
method: 'arrival',
payload: 'You still there?',
Expand All @@ -164,7 +158,7 @@ const performDisconnection = async (room: Room): Promise<void> => {
const performSquareRoot = async (room: Room): Promise<void> => {
console.log("[Caller] What's the square root of 16?");
try {
const response = await room.localParticipant!.performRpc({
const response = await room.localParticipant.performRpc({
destinationIdentity: 'math-genius',
method: 'square-root',
payload: JSON.stringify({ number: 16 }),
Expand All @@ -180,7 +174,7 @@ const performSquareRoot = async (room: Room): Promise<void> => {
const performQuantumHypergeometricSeries = async (room: Room): Promise<void> => {
console.log("[Caller] What's the quantum hypergeometric series of 42?");
try {
const response = await room.localParticipant!.performRpc({
const response = await room.localParticipant.performRpc({
destinationIdentity: 'math-genius',
method: 'quantum-hypergeometric-series',
payload: JSON.stringify({ number: 42 }),
Expand All @@ -203,7 +197,7 @@ const performQuantumHypergeometricSeries = async (room: Room): Promise<void> =>
const performDivision = async (room: Room): Promise<void> => {
console.log("[Caller] Let's try dividing 10 by 0");
try {
const response = await room.localParticipant!.performRpc({
const response = await room.localParticipant.performRpc({
destinationIdentity: 'math-genius',
method: 'divide',
payload: JSON.stringify({ numerator: 10, denominator: 0 }),
Expand Down
44 changes: 44 additions & 0 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
type ReconnectResponse,
RequestResponse,
Room as RoomModel,
RpcAck,
RpcResponse,
SignalTarget,
SpeakerInfo,
type StreamStateUpdate,
Expand Down Expand Up @@ -54,6 +56,7 @@ import {
UnexpectedConnectionState,
} from './errors';
import { EngineEvent } from './events';
import { RpcError } from './rpc';
import CriticalTimers from './timers';
import type LocalTrack from './track/LocalTrack';
import type LocalTrackPublication from './track/LocalTrackPublication';
Expand Down Expand Up @@ -664,6 +667,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
return;
}
const dp = DataPacket.fromBinary(new Uint8Array(buffer));

if (dp.value?.case === 'speaker') {
// dispatch speaker updates
this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers);
Expand Down Expand Up @@ -1096,6 +1100,46 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
});
};

/** @internal */
async publishRpcResponse(
destinationIdentity: string,
requestId: string,
payload: string | null,
error: RpcError | null,
) {
const packet = new DataPacket({
destinationIdentities: [destinationIdentity],
kind: DataPacket_Kind.RELIABLE,
value: {
case: 'rpcResponse',
value: new RpcResponse({
requestId,
value: error
? { case: 'error', value: error.toProto() }
: { case: 'payload', value: payload ?? '' },
}),
},
});

await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
}

/** @internal */
async publishRpcAck(destinationIdentity: string, requestId: string) {
const packet = new DataPacket({
destinationIdentities: [destinationIdentity],
kind: DataPacket_Kind.RELIABLE,
value: {
case: 'rpcAck',
value: new RpcAck({
requestId,
}),
},
});

await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
}

/* @internal */
async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) {
const msg = packet.toBinary();
Expand Down
127 changes: 126 additions & 1 deletion src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import LocalParticipant from './participant/LocalParticipant';
import type Participant from './participant/Participant';
import type { ConnectionQuality } from './participant/Participant';
import RemoteParticipant from './participant/RemoteParticipant';
import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc';
import CriticalTimers from './timers';
import LocalAudioTrack from './track/LocalAudioTrack';
import type LocalTrack from './track/LocalTrack';
Expand Down Expand Up @@ -206,6 +207,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)

private textStreamHandlers = new Map<string, TextStreamHandler>();

private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();

/**
* Creates a new Room, the primary construct for a LiveKit session.
* @param options
Expand Down Expand Up @@ -237,7 +240,13 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)

this.disconnectLock = new Mutex();

this.localParticipant = new LocalParticipant('', '', this.engine, this.options);
this.localParticipant = new LocalParticipant(
'',
'',
this.engine,
this.options,
this.rpcHandlers,
);

if (this.options.videoCaptureDefaults.deviceId) {
this.localParticipant.activeDeviceMap.set(
Expand Down Expand Up @@ -300,6 +309,112 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.byteStreamHandlers.delete(topic);
}

/**
* Establishes the participant as a receiver for calls of the specified RPC method.
* Will overwrite any existing callback for the same method.
*
* @param method - The name of the indicated RPC method
* @param handler - Will be invoked when an RPC request for this method is received
* @returns A promise that resolves when the method is successfully registered
*
* @example
* ```typescript
* room.localParticipant?.registerRpcMethod(
* 'greet',
* async (data: RpcInvocationData) => {
* console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
* return `Hello, ${data.callerIdentity}!`;
* }
* );
* ```
*
* The handler should return a Promise that resolves to a string.
* If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
*
* You may throw errors of type `RpcError` with a string `message` in the handler,
* and they will be received on the caller's side with the message intact.
* Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
*/
registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise<string>) {
if (this.rpcHandlers.has(method)) {
throw Error(
`RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`,
);
}
this.rpcHandlers.set(method, handler);
}

/**
* Unregisters a previously registered RPC method.
*
* @param method - The name of the RPC method to unregister
*/
unregisterRpcMethod(method: string) {
this.rpcHandlers.delete(method);
}

private async handleIncomingRpcRequest(
callerIdentity: string,
requestId: string,
method: string,
payload: string,
responseTimeout: number,
version: number,
) {
await this.engine.publishRpcAck(callerIdentity, requestId);

if (version !== 1) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_VERSION'),
);
return;
}

const handler = this.rpcHandlers.get(method);

if (!handler) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_METHOD'),
);
return;
}

let responseError: RpcError | null = null;
let responsePayload: string | null = null;

try {
const response = await handler({
requestId,
callerIdentity,
payload,
responseTimeout,
});
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
console.warn(`RPC Response payload too large for ${method}`);
} else {
responsePayload = response;
}
} catch (error) {
if (error instanceof RpcError) {
responseError = error;
} else {
console.warn(
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
error,
);
responseError = RpcError.builtIn('APPLICATION_ERROR');
}
}
await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError);
}

/**
* @experimental
*/
Expand Down Expand Up @@ -1647,6 +1762,16 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.handleStreamChunk(packet.value.value);
} else if (packet.value.case === 'streamTrailer') {
this.handleStreamTrailer(packet.value.value);
} else if (packet.value.case === 'rpcRequest') {
const rpc = packet.value.value;
this.handleIncomingRpcRequest(
packet.participantIdentity,
rpc.id,
rpc.method,
rpc.payload,
rpc.responseTimeoutMs,
rpc.version,
);
}
};

Expand Down
Loading
Loading