Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
async (data: RpcInvocationData) => {
console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`);
await new Promise((resolve) => setTimeout(resolve, 2000));
return 'Welcome and have a wonderful day!';
return 'Welcome and have a wonderful day!' + new Array<string>(10_000).fill('a').join('');
},
);

Expand Down
33 changes: 30 additions & 3 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
}
}

this.setupRPCStream();
}

registerTextStreamHandler(topic: string, callback: TextStreamHandler) {
Expand Down Expand Up @@ -1141,6 +1143,16 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private setupRPCStream() {
this.incomingDataStreamManager.registerTextStreamHandler('lk.rpc_response', async (reader) => {
const requestId = reader.info.attributes?.['lk.rpc_request_id'];
const payload = await reader.readAll();
if (requestId) {
this.localParticipant._handleIncomingRpcResponse(requestId, payload, null);
}
});
}

private onPageLeave = async () => {
this.log.info('Page leave detected, disconnecting', this.logContext);
await this.disconnect();
Expand Down Expand Up @@ -1897,7 +1909,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
) {
await this.engine.publishRpcAck(callerIdentity, requestId);

if (version !== 1) {
const supportedVersions = [1, 2];

if (!supportedVersions.includes(version)) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
Expand Down Expand Up @@ -1929,7 +1943,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
payload,
responseTimeout,
});
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
if (byteLength(response) > MAX_PAYLOAD_BYTES && version === 1) {
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
this.log.warn(`RPC Response payload too large for ${method}`);
} else {
Expand All @@ -1946,7 +1960,20 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
responseError = RpcError.builtIn('APPLICATION_ERROR');
}
}
await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError);
if (version === 1 || responseError) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
responsePayload,
responseError,
);
} else {
await this.localParticipant.sendText(responsePayload ?? '', {
topic: 'lk.rpc_response',
attributes: { 'lk.rpc_request_id': requestId },
destinationIdentities: [callerIdentity],
});
}
}

bufferedSegments: Map<string, TranscriptionSegmentModel> = new Map();
Expand Down
11 changes: 4 additions & 7 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ export default class LocalParticipant extends Participant {
} else if (rpcResponse.value.case === 'error') {
error = RpcError.fromProto(rpcResponse.value.value);
}
this.handleIncomingRpcResponse(rpcResponse.requestId, payload, error);
this._handleIncomingRpcResponse(rpcResponse.requestId, payload, error);
break;
case 'rpcAck':
let rpcAck = packet.value.value as RpcAck;
Expand Down Expand Up @@ -1907,11 +1907,8 @@ export default class LocalParticipant extends Participant {
}
}

private handleIncomingRpcResponse(
requestId: string,
payload: string | null,
error: RpcError | null,
) {
/** @internal */
_handleIncomingRpcResponse(requestId: string, payload: string | null, error: RpcError | null) {
const handler = this.pendingResponses.get(requestId);
if (handler) {
handler.resolve(payload, error);
Expand Down Expand Up @@ -1939,7 +1936,7 @@ export default class LocalParticipant extends Participant {
method,
payload,
responseTimeoutMs: responseTimeout,
version: 1,
version: 2,
}),
},
});
Expand Down
4 changes: 2 additions & 2 deletions src/room/rpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ describe('LocalParticipant', () => {
setTimeout(() => {
localParticipant.handleIncomingRpcAck(requestId);
setTimeout(() => {
localParticipant.handleIncomingRpcResponse(requestId, responsePayload, null);
localParticipant._handleIncomingRpcResponse(requestId, responsePayload, null);
}, 10);
}, 10);
});
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('LocalParticipant', () => {
const requestId = packet.value.value.id;
setTimeout(() => {
localParticipant.handleIncomingRpcAck(requestId);
localParticipant.handleIncomingRpcResponse(
localParticipant._handleIncomingRpcResponse(
requestId,
null,
new RpcError(errorCode, errorMessage),
Expand Down
Loading