From 16b21c3c8b1747d41520980c3d4a6741444db366 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 3 Feb 2026 21:51:53 +0100 Subject: [PATCH 1/3] poc --- src/room/Room.ts | 33 +++++++++++++++++++++--- src/room/participant/LocalParticipant.ts | 9 +++---- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/room/Room.ts b/src/room/Room.ts index d66f6cf3f0..0a46fd70cf 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -293,6 +293,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); } } + + this.setupRPCStream(); } registerTextStreamHandler(topic: string, callback: TextStreamHandler) { @@ -1141,6 +1143,16 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } + private setupRPCStream() { + this.incomingDataStreamManager.registerTextStreamHandler('lk.rpc_response', async (reader) => { + const requestId = reader.info.attributes?.['lk.rpc_request_id']; + const payload = await reader.readAll(); + if (requestId) { + this.localParticipant._handleIncomingRpcResponse(requestId, payload, null); + } + }); + } + private onPageLeave = async () => { this.log.info('Page leave detected, disconnecting', this.logContext); await this.disconnect(); @@ -1897,7 +1909,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) ) { await this.engine.publishRpcAck(callerIdentity, requestId); - if (version !== 1) { + const supportedVersions = [1, 2]; + + if (!supportedVersions.includes(version)) { await this.engine.publishRpcResponse( callerIdentity, requestId, @@ -1929,7 +1943,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) payload, responseTimeout, }); - if (byteLength(response) > MAX_PAYLOAD_BYTES) { + if (byteLength(response) > MAX_PAYLOAD_BYTES && version === 1) { responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'); this.log.warn(`RPC Response payload too large for ${method}`); } else { @@ -1946,7 +1960,20 @@ class Room extends (EventEmitter as new () => TypedEmitter) responseError = RpcError.builtIn('APPLICATION_ERROR'); } } - await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError); + if (version === 1 || responseError) { + await this.engine.publishRpcResponse( + callerIdentity, + requestId, + responsePayload, + responseError, + ); + } else { + await this.localParticipant.sendText(responsePayload ?? '', { + topic: 'lk.rpc_response', + attributes: { 'lk.rpc_request_id': requestId }, + destinationIdentities: [callerIdentity], + }); + } } bufferedSegments: Map = new Map(); diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 495ccc80dd..670d607a23 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -1907,11 +1907,8 @@ export default class LocalParticipant extends Participant { } } - private handleIncomingRpcResponse( - requestId: string, - payload: string | null, - error: RpcError | null, - ) { + /** @internal */ + _handleIncomingRpcResponse(requestId: string, payload: string | null, error: RpcError | null) { const handler = this.pendingResponses.get(requestId); if (handler) { handler.resolve(payload, error); @@ -1939,7 +1936,7 @@ export default class LocalParticipant extends Participant { method, payload, responseTimeoutMs: responseTimeout, - version: 1, + version: 2, }), }, }); From 54d4b55240221bd75b84601eb2b3d5d6e2af41ce Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 3 Feb 2026 22:00:59 +0100 Subject: [PATCH 2/3] update example and fix error --- examples/rpc/rpc-demo.ts | 2 +- src/room/participant/LocalParticipant.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/rpc/rpc-demo.ts b/examples/rpc/rpc-demo.ts index 4815754781..c2be9b9cde 100644 --- a/examples/rpc/rpc-demo.ts +++ b/examples/rpc/rpc-demo.ts @@ -81,7 +81,7 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room) async (data: RpcInvocationData) => { console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`); await new Promise((resolve) => setTimeout(resolve, 2000)); - return 'Welcome and have a wonderful day!'; + return 'Welcome and have a wonderful day!' + new Array(10_000).fill('a').join(''); }, ); diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 670d607a23..8688c9cba0 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -323,7 +323,7 @@ export default class LocalParticipant extends Participant { } else if (rpcResponse.value.case === 'error') { error = RpcError.fromProto(rpcResponse.value.value); } - this.handleIncomingRpcResponse(rpcResponse.requestId, payload, error); + this._handleIncomingRpcResponse(rpcResponse.requestId, payload, error); break; case 'rpcAck': let rpcAck = packet.value.value as RpcAck; From 029e7cbe18f93de19320d512c3c47c82a42615d8 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 3 Feb 2026 22:19:33 +0100 Subject: [PATCH 3/3] fix rpc test --- src/room/rpc.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/room/rpc.test.ts b/src/room/rpc.test.ts index 5ac3d78606..f7231ef105 100644 --- a/src/room/rpc.test.ts +++ b/src/room/rpc.test.ts @@ -207,7 +207,7 @@ describe('LocalParticipant', () => { setTimeout(() => { localParticipant.handleIncomingRpcAck(requestId); setTimeout(() => { - localParticipant.handleIncomingRpcResponse(requestId, responsePayload, null); + localParticipant._handleIncomingRpcResponse(requestId, responsePayload, null); }, 10); }, 10); }); @@ -262,7 +262,7 @@ describe('LocalParticipant', () => { const requestId = packet.value.value.id; setTimeout(() => { localParticipant.handleIncomingRpcAck(requestId); - localParticipant.handleIncomingRpcResponse( + localParticipant._handleIncomingRpcResponse( requestId, null, new RpcError(errorCode, errorMessage),