Skip to content

Commit 3a40d6a

Browse files
refactor(kernel-browser-runtime): Address PR review concerns
- Add test coverage for handlerPromise with delayed resolution - Fix documentation to clarify caching behavior (not buffering) - Add explicit error handling for handlerPromise rejection - Rename handlerReady to handlerResolution for clarity Co-authored-by: Erik Marks <rekmarks@users.noreply.github.com>
1 parent 4e9df00 commit 3a40d6a

File tree

2 files changed

+221
-17
lines changed

2 files changed

+221
-17
lines changed

packages/kernel-browser-runtime/src/internal-comms/internal-connections.test.ts

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,5 +455,183 @@ describe('internal-connections', () => {
455455
expect.any(Error),
456456
);
457457
});
458+
459+
it('should handle messages with handlerPromise after resolution', async () => {
460+
const handlerPromise = Promise.resolve(mockHandleMessage);
461+
462+
receiveInternalConnections({
463+
handlerPromise,
464+
logger,
465+
});
466+
467+
const controlChannel = MockBroadcastChannel.channels.get(
468+
COMMS_CONTROL_CHANNEL_NAME,
469+
);
470+
controlChannel?.onmessage?.(
471+
new MessageEvent('message', {
472+
data: {
473+
method: 'init',
474+
params: { channelName: 'internal-process-channel' },
475+
},
476+
}),
477+
);
478+
479+
await delay();
480+
const commsStream = streamInstances[0]!;
481+
expect(commsStream).toBeDefined();
482+
const commsStreamWriteSpy = vi.spyOn(commsStream, 'write');
483+
484+
const commsChannel = MockBroadcastChannel.channels.get(
485+
'internal-process-channel',
486+
)!;
487+
488+
// Send first message
489+
commsChannel.onmessage?.(
490+
new MessageEvent('message', {
491+
data: {
492+
method: 'getStatus',
493+
params: null,
494+
id: 1,
495+
},
496+
}),
497+
);
498+
await delay();
499+
500+
expect(mockHandleMessage).toHaveBeenCalledWith({
501+
method: 'getStatus',
502+
params: null,
503+
id: 1,
504+
});
505+
expect(commsStreamWriteSpy).toHaveBeenCalledWith({
506+
jsonrpc: '2.0',
507+
id: 1,
508+
result: { vats: [], clusterConfig: makeClusterConfig() },
509+
});
510+
511+
// Send second message to verify caching (handler should be used directly)
512+
commsChannel.onmessage?.(
513+
new MessageEvent('message', {
514+
data: {
515+
method: 'getStatus',
516+
params: null,
517+
id: 2,
518+
},
519+
}),
520+
);
521+
await delay();
522+
523+
expect(mockHandleMessage).toHaveBeenCalledTimes(2);
524+
expect(commsStreamWriteSpy).toHaveBeenCalledTimes(2);
525+
});
526+
527+
it('should queue messages until handlerPromise resolves', async () => {
528+
let resolveHandler: (handler: typeof mockHandleMessage) => void;
529+
const handlerPromise = new Promise<typeof mockHandleMessage>((resolve) => {
530+
resolveHandler = resolve;
531+
});
532+
533+
receiveInternalConnections({
534+
handlerPromise,
535+
logger,
536+
});
537+
538+
const controlChannel = MockBroadcastChannel.channels.get(
539+
COMMS_CONTROL_CHANNEL_NAME,
540+
);
541+
controlChannel?.onmessage?.(
542+
new MessageEvent('message', {
543+
data: {
544+
method: 'init',
545+
params: { channelName: 'internal-process-channel' },
546+
},
547+
}),
548+
);
549+
550+
await delay();
551+
const commsStream = streamInstances[0]!;
552+
expect(commsStream).toBeDefined();
553+
const commsStreamWriteSpy = vi.spyOn(commsStream, 'write');
554+
555+
const commsChannel = MockBroadcastChannel.channels.get(
556+
'internal-process-channel',
557+
)!;
558+
559+
// Send message before handler is ready
560+
commsChannel.onmessage?.(
561+
new MessageEvent('message', {
562+
data: {
563+
method: 'getStatus',
564+
params: null,
565+
id: 1,
566+
},
567+
}),
568+
);
569+
570+
// Handler should not be called yet
571+
await delay();
572+
expect(mockHandleMessage).not.toHaveBeenCalled();
573+
expect(commsStreamWriteSpy).not.toHaveBeenCalled();
574+
575+
// Now resolve the handler
576+
resolveHandler!(mockHandleMessage);
577+
await delay();
578+
579+
// Now the message should be handled
580+
expect(mockHandleMessage).toHaveBeenCalledWith({
581+
method: 'getStatus',
582+
params: null,
583+
id: 1,
584+
});
585+
expect(commsStreamWriteSpy).toHaveBeenCalledWith({
586+
jsonrpc: '2.0',
587+
id: 1,
588+
result: { vats: [], clusterConfig: makeClusterConfig() },
589+
});
590+
});
591+
592+
it('should handle handlerPromise rejection', async () => {
593+
const handlerError = new Error('Handler initialization failed');
594+
const handlerPromise = Promise.reject(handlerError);
595+
596+
receiveInternalConnections({
597+
handlerPromise,
598+
logger,
599+
});
600+
601+
const controlChannel = MockBroadcastChannel.channels.get(
602+
COMMS_CONTROL_CHANNEL_NAME,
603+
);
604+
controlChannel?.onmessage?.(
605+
new MessageEvent('message', {
606+
data: {
607+
method: 'init',
608+
params: { channelName: 'internal-process-channel' },
609+
},
610+
}),
611+
);
612+
613+
await delay();
614+
const commsChannel = MockBroadcastChannel.channels.get(
615+
'internal-process-channel',
616+
)!;
617+
618+
// Send message - should trigger error handling
619+
commsChannel.onmessage?.(
620+
new MessageEvent('message', {
621+
data: {
622+
method: 'getStatus',
623+
params: null,
624+
id: 1,
625+
},
626+
}),
627+
);
628+
629+
await delay();
630+
631+
expect(logger.error).toHaveBeenCalledWith(
632+
'Error initializing message handler for internal process "internal-process-channel":',
633+
handlerError,
634+
);
635+
});
458636
});
459637
});

packages/kernel-browser-runtime/src/internal-comms/internal-connections.ts

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ type ReceiveConnectionsOptions = Omit<Options, 'label'> &
123123
* @param options.handler - The function to handle internal messages. Mutually exclusive
124124
* with `handlerPromise`.
125125
* @param options.handlerPromise - A promise that resolves to the handler function.
126-
* Messages will be buffered until the handler is ready, then subsequent messages are
127-
* handled directly. Mutually exclusive with `handler`.
126+
* Incoming messages await handler initialization once, then the resolved handler is
127+
* cached and used directly for all subsequent messages. Mutually exclusive with `handler`.
128128
* @param options.logger - The logger instance.
129129
* @param options.controlChannelName - The name of the control channel. Must match
130130
* the name used by {@link connectToKernel} on the other end.
@@ -137,18 +137,24 @@ export const receiveInternalConnections = ({
137137
}: ReceiveConnectionsOptions): void => {
138138
// Support both direct handler and promise-based handler
139139
let handler: HandleInternalMessage | null = null;
140-
let handlerReady: Promise<HandleInternalMessage>;
140+
let handlerResolution: Promise<HandleInternalMessage>;
141141

142142
if (directHandler !== undefined) {
143143
// Direct handler - use immediately
144144
handler = directHandler;
145-
handlerReady = Promise.resolve(directHandler);
145+
handlerResolution = Promise.resolve(directHandler);
146146
} else {
147147
// Promise-based handler - cache once resolved
148-
handlerReady = handlerPromise.then((resolvedHandler) => {
149-
handler = resolvedHandler;
150-
return resolvedHandler;
151-
});
148+
handlerResolution = handlerPromise.then(
149+
(resolvedHandler) => {
150+
handler = resolvedHandler;
151+
return resolvedHandler;
152+
},
153+
(error) => {
154+
// Re-throw to propagate initialization errors to message handlers
155+
throw error;
156+
},
157+
);
152158
}
153159

154160
const seenChannels = new Set<string>();
@@ -177,19 +183,39 @@ export const receiveInternalConnections = ({
177183
`Received message from internal process "${channelName}": ${JSON.stringify(message)}`,
178184
);
179185

180-
// Use cached handler if available, otherwise await once
181-
const messageHandler = handler ?? (await handlerReady);
182-
const reply = await messageHandler(message);
183-
if (reply !== undefined) {
184-
await kernelRpcStream.write(reply);
186+
try {
187+
// Use cached handler if available, otherwise await once
188+
const messageHandler = handler ?? (await handlerResolution);
189+
const reply = await messageHandler(message);
190+
if (reply !== undefined) {
191+
await kernelRpcStream.write(reply);
192+
}
193+
} catch (error) {
194+
// Check if this is a handler initialization error
195+
if (handler === null) {
196+
logger.error(
197+
`Error initializing message handler for internal process "${channelName}":`,
198+
error,
199+
);
200+
} else {
201+
logger.error(
202+
`Error handling message from internal process "${channelName}":`,
203+
error,
204+
);
205+
}
206+
throw error;
185207
}
186208
});
187209
})
188210
.catch((error) => {
189-
logger.error(
190-
`Error handling message from internal process "${channelName}":`,
191-
error,
192-
);
211+
// This catch handles connection errors and re-thrown handler errors
212+
if (handler !== null) {
213+
logger.error(
214+
`Error handling message from internal process "${channelName}":`,
215+
error,
216+
);
217+
}
218+
// Initialization errors are already logged in the try-catch above
193219
})
194220
.finally(() => {
195221
logger.debug(`Closed connection to internal process "${channelName}"`);

0 commit comments

Comments
 (0)