Skip to content

Commit 4be40ba

Browse files
authored
Move RPC registration to room level (#1396)
1 parent 061105f commit 4be40ba

File tree

8 files changed

+252
-213
lines changed

8 files changed

+252
-213
lines changed

.changeset/purple-moles-impress.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": minor
3+
---
4+
5+
Move RPC registration to room level and deprecate localParticipant level registration

examples/rpc/rpc-demo.ts

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
type RoomConnectOptions,
44
RoomEvent,
55
RpcError,
6-
RpcInvocationData,
6+
type RpcInvocationData,
77
} from '../../src/index';
88

99
let startTime: number;
@@ -75,7 +75,7 @@ async function main() {
7575
}
7676

7777
const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room): Promise<void> => {
78-
await greetersRoom.localParticipant?.registerRpcMethod(
78+
await greetersRoom.registerRpcMethod(
7979
'arrival',
8080
// eslint-disable-next-line @typescript-eslint/no-unused-vars
8181
async (data: RpcInvocationData) => {
@@ -85,52 +85,46 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
8585
},
8686
);
8787

88-
await mathGeniusRoom.localParticipant?.registerRpcMethod(
89-
'square-root',
90-
async (data: RpcInvocationData) => {
91-
const jsonData = JSON.parse(data.payload);
92-
const number = jsonData.number;
88+
await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => {
89+
const jsonData = JSON.parse(data.payload);
90+
const number = jsonData.number;
9391

94-
console.log(
95-
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
96-
);
92+
console.log(
93+
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
94+
);
9795

98-
console.log(`[Math Genius] *doing math*…`);
99-
await new Promise((resolve) => setTimeout(resolve, 2000));
96+
console.log(`[Math Genius] *doing math*…`);
97+
await new Promise((resolve) => setTimeout(resolve, 2000));
10098

101-
const result = Math.sqrt(number);
102-
console.log(`[Math Genius] Aha! It's ${result}`);
103-
return JSON.stringify({ result });
104-
},
105-
);
99+
const result = Math.sqrt(number);
100+
console.log(`[Math Genius] Aha! It's ${result}`);
101+
return JSON.stringify({ result });
102+
});
106103

107-
await mathGeniusRoom.localParticipant?.registerRpcMethod(
108-
'divide',
109-
async (data: RpcInvocationData) => {
110-
const jsonData = JSON.parse(data.payload);
111-
const { numerator, denominator } = jsonData;
104+
await mathGeniusRoom.registerRpcMethod('divide', async (data: RpcInvocationData) => {
105+
const jsonData = JSON.parse(data.payload);
106+
const { numerator, denominator } = jsonData;
112107

113-
console.log(
114-
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
115-
);
108+
console.log(
109+
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
110+
);
116111

117-
await new Promise((resolve) => setTimeout(resolve, 2000));
112+
await new Promise((resolve) => setTimeout(resolve, 2000));
118113

119-
if (denominator === 0) {
120-
throw new Error('Cannot divide by zero');
121-
}
114+
if (denominator === 0) {
115+
throw new Error('Cannot divide by zero');
116+
}
122117

123-
const result = numerator / denominator;
124-
console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`);
125-
return JSON.stringify({ result });
126-
},
127-
);
118+
const result = numerator / denominator;
119+
console.log(`[Math Genius] ${numerator} / ${denominator} = ${result}`);
120+
return JSON.stringify({ result });
121+
});
128122
};
129123

130124
const performGreeting = async (room: Room): Promise<void> => {
131125
console.log("[Caller] Letting the greeter know that I've arrived");
132126
try {
133-
const response = await room.localParticipant!.performRpc({
127+
const response = await room.localParticipant.performRpc({
134128
destinationIdentity: 'greeter',
135129
method: 'arrival',
136130
payload: 'Hello',
@@ -145,7 +139,7 @@ const performGreeting = async (room: Room): Promise<void> => {
145139
const performDisconnection = async (room: Room): Promise<void> => {
146140
console.log('[Caller] Checking back in on the greeter...');
147141
try {
148-
const response = await room.localParticipant!.performRpc({
142+
const response = await room.localParticipant.performRpc({
149143
destinationIdentity: 'greeter',
150144
method: 'arrival',
151145
payload: 'You still there?',
@@ -164,7 +158,7 @@ const performDisconnection = async (room: Room): Promise<void> => {
164158
const performSquareRoot = async (room: Room): Promise<void> => {
165159
console.log("[Caller] What's the square root of 16?");
166160
try {
167-
const response = await room.localParticipant!.performRpc({
161+
const response = await room.localParticipant.performRpc({
168162
destinationIdentity: 'math-genius',
169163
method: 'square-root',
170164
payload: JSON.stringify({ number: 16 }),
@@ -180,7 +174,7 @@ const performSquareRoot = async (room: Room): Promise<void> => {
180174
const performQuantumHypergeometricSeries = async (room: Room): Promise<void> => {
181175
console.log("[Caller] What's the quantum hypergeometric series of 42?");
182176
try {
183-
const response = await room.localParticipant!.performRpc({
177+
const response = await room.localParticipant.performRpc({
184178
destinationIdentity: 'math-genius',
185179
method: 'quantum-hypergeometric-series',
186180
payload: JSON.stringify({ number: 42 }),
@@ -203,7 +197,7 @@ const performQuantumHypergeometricSeries = async (room: Room): Promise<void> =>
203197
const performDivision = async (room: Room): Promise<void> => {
204198
console.log("[Caller] Let's try dividing 10 by 0");
205199
try {
206-
const response = await room.localParticipant!.performRpc({
200+
const response = await room.localParticipant.performRpc({
207201
destinationIdentity: 'math-genius',
208202
method: 'divide',
209203
payload: JSON.stringify({ numerator: 10, denominator: 0 }),

src/api/SignalClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ export class SignalClient {
326326
} catch (e) {
327327
reject(
328328
new ConnectionError(
329-
'server was not reachable',
329+
e instanceof Error ? e.message : 'server was not reachable',
330330
ConnectionErrorReason.ServerUnreachable,
331331
),
332332
);

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export { facingModeFromDeviceLabel, facingModeFromLocalTrack } from './room/trac
6161
export * from './room/track/options';
6262
export * from './room/track/processor/types';
6363
export * from './room/track/types';
64+
export type * from './room/StreamReader';
65+
export type * from './room/StreamWriter';
6466
export type {
6567
DataPublishOptions,
6668
SimulationScenario,

src/room/RTCEngine.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import {
1616
type ReconnectResponse,
1717
RequestResponse,
1818
Room as RoomModel,
19+
RpcAck,
20+
RpcResponse,
1921
SignalTarget,
2022
SpeakerInfo,
2123
type StreamStateUpdate,
@@ -54,6 +56,7 @@ import {
5456
UnexpectedConnectionState,
5557
} from './errors';
5658
import { EngineEvent } from './events';
59+
import { RpcError } from './rpc';
5760
import CriticalTimers from './timers';
5861
import type LocalTrack from './track/LocalTrack';
5962
import type LocalTrackPublication from './track/LocalTrackPublication';
@@ -664,6 +667,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
664667
return;
665668
}
666669
const dp = DataPacket.fromBinary(new Uint8Array(buffer));
670+
667671
if (dp.value?.case === 'speaker') {
668672
// dispatch speaker updates
669673
this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers);
@@ -1096,6 +1100,46 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
10961100
});
10971101
};
10981102

1103+
/** @internal */
1104+
async publishRpcResponse(
1105+
destinationIdentity: string,
1106+
requestId: string,
1107+
payload: string | null,
1108+
error: RpcError | null,
1109+
) {
1110+
const packet = new DataPacket({
1111+
destinationIdentities: [destinationIdentity],
1112+
kind: DataPacket_Kind.RELIABLE,
1113+
value: {
1114+
case: 'rpcResponse',
1115+
value: new RpcResponse({
1116+
requestId,
1117+
value: error
1118+
? { case: 'error', value: error.toProto() }
1119+
: { case: 'payload', value: payload ?? '' },
1120+
}),
1121+
},
1122+
});
1123+
1124+
await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
1125+
}
1126+
1127+
/** @internal */
1128+
async publishRpcAck(destinationIdentity: string, requestId: string) {
1129+
const packet = new DataPacket({
1130+
destinationIdentities: [destinationIdentity],
1131+
kind: DataPacket_Kind.RELIABLE,
1132+
value: {
1133+
case: 'rpcAck',
1134+
value: new RpcAck({
1135+
requestId,
1136+
}),
1137+
},
1138+
});
1139+
1140+
await this.sendDataPacket(packet, DataPacket_Kind.RELIABLE);
1141+
}
1142+
10991143
/* @internal */
11001144
async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) {
11011145
const msg = packet.toBinary();

src/room/Room.ts

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ import LocalParticipant from './participant/LocalParticipant';
6767
import type Participant from './participant/Participant';
6868
import type { ConnectionQuality } from './participant/Participant';
6969
import RemoteParticipant from './participant/RemoteParticipant';
70+
import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc';
7071
import CriticalTimers from './timers';
7172
import LocalAudioTrack from './track/LocalAudioTrack';
7273
import type LocalTrack from './track/LocalTrack';
@@ -206,6 +207,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
206207

207208
private textStreamHandlers = new Map<string, TextStreamHandler>();
208209

210+
private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();
211+
209212
/**
210213
* Creates a new Room, the primary construct for a LiveKit session.
211214
* @param options
@@ -237,7 +240,13 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
237240

238241
this.disconnectLock = new Mutex();
239242

240-
this.localParticipant = new LocalParticipant('', '', this.engine, this.options);
243+
this.localParticipant = new LocalParticipant(
244+
'',
245+
'',
246+
this.engine,
247+
this.options,
248+
this.rpcHandlers,
249+
);
241250

242251
if (this.options.videoCaptureDefaults.deviceId) {
243252
this.localParticipant.activeDeviceMap.set(
@@ -300,6 +309,113 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
300309
this.byteStreamHandlers.delete(topic);
301310
}
302311

312+
/**
313+
* Establishes the participant as a receiver for calls of the specified RPC method.
314+
* Will overwrite any existing callback for the same method.
315+
*
316+
* @param method - The name of the indicated RPC method
317+
* @param handler - Will be invoked when an RPC request for this method is received
318+
* @returns A promise that resolves when the method is successfully registered
319+
* @throws {Error} if the handler for a specific method has already been registered already
320+
*
321+
* @example
322+
* ```typescript
323+
* room.localParticipant?.registerRpcMethod(
324+
* 'greet',
325+
* async (data: RpcInvocationData) => {
326+
* console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
327+
* return `Hello, ${data.callerIdentity}!`;
328+
* }
329+
* );
330+
* ```
331+
*
332+
* The handler should return a Promise that resolves to a string.
333+
* If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
334+
*
335+
* You may throw errors of type `RpcError` with a string `message` in the handler,
336+
* and they will be received on the caller's side with the message intact.
337+
* Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
338+
*/
339+
registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise<string>) {
340+
if (this.rpcHandlers.has(method)) {
341+
throw Error(
342+
`RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`,
343+
);
344+
}
345+
this.rpcHandlers.set(method, handler);
346+
}
347+
348+
/**
349+
* Unregisters a previously registered RPC method.
350+
*
351+
* @param method - The name of the RPC method to unregister
352+
*/
353+
unregisterRpcMethod(method: string) {
354+
this.rpcHandlers.delete(method);
355+
}
356+
357+
private async handleIncomingRpcRequest(
358+
callerIdentity: string,
359+
requestId: string,
360+
method: string,
361+
payload: string,
362+
responseTimeout: number,
363+
version: number,
364+
) {
365+
await this.engine.publishRpcAck(callerIdentity, requestId);
366+
367+
if (version !== 1) {
368+
await this.engine.publishRpcResponse(
369+
callerIdentity,
370+
requestId,
371+
null,
372+
RpcError.builtIn('UNSUPPORTED_VERSION'),
373+
);
374+
return;
375+
}
376+
377+
const handler = this.rpcHandlers.get(method);
378+
379+
if (!handler) {
380+
await this.engine.publishRpcResponse(
381+
callerIdentity,
382+
requestId,
383+
null,
384+
RpcError.builtIn('UNSUPPORTED_METHOD'),
385+
);
386+
return;
387+
}
388+
389+
let responseError: RpcError | null = null;
390+
let responsePayload: string | null = null;
391+
392+
try {
393+
const response = await handler({
394+
requestId,
395+
callerIdentity,
396+
payload,
397+
responseTimeout,
398+
});
399+
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
400+
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
401+
console.warn(`RPC Response payload too large for ${method}`);
402+
} else {
403+
responsePayload = response;
404+
}
405+
} catch (error) {
406+
if (error instanceof RpcError) {
407+
responseError = error;
408+
} else {
409+
console.warn(
410+
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
411+
error,
412+
);
413+
responseError = RpcError.builtIn('APPLICATION_ERROR');
414+
}
415+
}
416+
await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError);
417+
}
418+
303419
/**
304420
* @experimental
305421
*/
@@ -1647,6 +1763,16 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
16471763
this.handleStreamChunk(packet.value.value);
16481764
} else if (packet.value.case === 'streamTrailer') {
16491765
this.handleStreamTrailer(packet.value.value);
1766+
} else if (packet.value.case === 'rpcRequest') {
1767+
const rpc = packet.value.value;
1768+
this.handleIncomingRpcRequest(
1769+
packet.participantIdentity,
1770+
rpc.id,
1771+
rpc.method,
1772+
rpc.payload,
1773+
rpc.responseTimeoutMs,
1774+
rpc.version,
1775+
);
16501776
}
16511777
};
16521778

0 commit comments

Comments
 (0)