Skip to content

Commit 961d166

Browse files
Refactor incremental render handling and improve error management
- Introduced `IncrementalRenderSink` type to manage streaming updates more effectively. - Updated `handleIncrementalRenderRequest` to return an optional sink and handle execution context errors gracefully. - Refactored the `run` function to utilize the new sink for processing updates, enhancing error logging for unexpected chunks. - Simplified test setup by removing unused sink methods, ensuring tests focus on relevant functionality.
1 parent 2a036e3 commit 961d166

File tree

4 files changed

+74
-97
lines changed

4 files changed

+74
-97
lines changed

react_on_rails_pro/packages/node-renderer/src/worker.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import handleGracefulShutdown from './worker/handleGracefulShutdown';
2525
import {
2626
handleIncrementalRenderRequest,
2727
type IncrementalRenderInitialRequest,
28+
type IncrementalRenderSink,
2829
} from './worker/handleIncrementalRenderRequest';
2930
import { handleIncrementalRenderStream } from './worker/handleIncrementalRenderStream';
3031
import {
@@ -260,7 +261,7 @@ export default function run(config: Partial<Config>) {
260261
const { bundleTimestamp } = req.params;
261262

262263
// Stream parser state
263-
let renderResult: Awaited<ReturnType<typeof handleIncrementalRenderRequest>> | null = null;
264+
let incrementalSink: IncrementalRenderSink | undefined;
264265

265266
try {
266267
// Handle the incremental render stream
@@ -292,10 +293,12 @@ export default function run(config: Partial<Config>) {
292293
};
293294

294295
try {
295-
renderResult = await handleIncrementalRenderRequest(initial);
296+
const { response, sink } = await handleIncrementalRenderRequest(initial);
297+
incrementalSink = sink;
298+
296299
return {
297-
response: renderResult.response,
298-
shouldContinue: true,
300+
response,
301+
shouldContinue: !!incrementalSink,
299302
};
300303
} catch (err) {
301304
const errorResponse = errorResponseResult(
@@ -313,13 +316,13 @@ export default function run(config: Partial<Config>) {
313316
},
314317

315318
onUpdateReceived: (obj: unknown) => {
316-
// Only process updates if we have a render result
317-
if (!renderResult) {
319+
if (!incrementalSink) {
320+
log.error({ msg: 'Unexpected update chunk received after rendering was aborted', obj });
318321
return;
319322
}
320323

321324
try {
322-
renderResult.sink.add(obj);
325+
incrementalSink.add(obj);
323326
} catch (err) {
324327
// Log error but don't stop processing
325328
log.error({ err, msg: 'Error processing update chunk' });
@@ -331,13 +334,7 @@ export default function run(config: Partial<Config>) {
331334
},
332335

333336
onRequestEnded: () => {
334-
try {
335-
if (renderResult) {
336-
renderResult.sink.end();
337-
}
338-
} catch (err) {
339-
log.error({ err, msg: 'Error ending render sink' });
340-
}
337+
// Do nothing
341338
},
342339
});
343340
} catch (err) {

react_on_rails_pro/packages/node-renderer/src/worker/handleIncrementalRenderRequest.ts

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
import type { ResponseResult } from '../shared/utils';
22
import { handleRenderRequest } from './handleRenderRequest';
3+
import log from '../shared/log';
4+
import { getRequestBundleFilePath } from '../shared/utils';
35

46
export type IncrementalRenderSink = {
57
/** Called for every subsequent NDJSON object after the first one */
68
add: (chunk: unknown) => void;
7-
/** Called when the client finishes sending the NDJSON stream */
8-
end: () => void;
9-
/** Called if the request stream errors or validation fails */
10-
abort: (error: unknown) => void;
119
};
1210

11+
export type UpdateChunk = {
12+
bundleTimestamp: string | number;
13+
updateChunk: string;
14+
};
15+
16+
function assertIsUpdateChunk(value: unknown): asserts value is UpdateChunk {
17+
if (
18+
typeof value !== 'object' ||
19+
value === null ||
20+
!('bundleTimestamp' in value) ||
21+
!('updateChunk' in value) ||
22+
(typeof value.bundleTimestamp !== 'string' && typeof value.bundleTimestamp !== 'number') ||
23+
typeof value.updateChunk !== 'string'
24+
) {
25+
throw new Error('Invalid incremental render chunk received, missing properties');
26+
}
27+
}
28+
1329
export type IncrementalRenderInitialRequest = {
1430
renderingRequest: string;
1531
bundleTimestamp: string | number;
@@ -18,7 +34,7 @@ export type IncrementalRenderInitialRequest = {
1834

1935
export type IncrementalRenderResult = {
2036
response: ResponseResult;
21-
sink: IncrementalRenderSink;
37+
sink?: IncrementalRenderSink;
2238
};
2339

2440
/**
@@ -34,26 +50,34 @@ export async function handleIncrementalRenderRequest(
3450

3551
try {
3652
// Call handleRenderRequest internally to handle all validation and VM execution
37-
const renderResult = await handleRenderRequest({
53+
const { response, executionContext } = await handleRenderRequest({
3854
renderingRequest,
3955
bundleTimestamp,
4056
dependencyBundleTimestamps,
4157
providedNewBundles: undefined,
4258
assetsToCopy: undefined,
4359
});
4460

45-
// Return the result directly with a placeholder sink
61+
// If we don't get an execution context, it means there was an early error
62+
// (e.g. bundle not found). In this case, the sink will be a no-op.
63+
if (!executionContext) {
64+
return { response };
65+
}
66+
67+
// Return the result with a sink that uses the execution context
4668
return {
47-
response: renderResult,
69+
response,
4870
sink: {
49-
add: () => {
50-
/* no-op - will be implemented in next commit */
51-
},
52-
end: () => {
53-
/* no-op - will be implemented in next commit */
54-
},
55-
abort: () => {
56-
/* no-op - will be implemented in next commit */
71+
add: (chunk: unknown) => {
72+
try {
73+
assertIsUpdateChunk(chunk);
74+
const bundlePath = getRequestBundleFilePath(chunk.bundleTimestamp);
75+
executionContext.runInVM(chunk.updateChunk, bundlePath).catch((err: unknown) => {
76+
log.error({ msg: 'Error running incremental render chunk', err, chunk });
77+
});
78+
} catch (err) {
79+
log.error({ msg: 'Invalid incremental render chunk', err, chunk });
80+
}
5781
},
5882
},
5983
};
@@ -67,17 +91,6 @@ export async function handleIncrementalRenderRequest(
6791
headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' },
6892
data: errorMessage,
6993
},
70-
sink: {
71-
add: () => {
72-
/* no-op */
73-
},
74-
end: () => {
75-
/* no-op */
76-
},
77-
abort: () => {
78-
/* no-op */
79-
},
80-
},
8194
};
8295
}
8396
}

react_on_rails_pro/packages/node-renderer/src/worker/handleRenderRequest.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ export async function handleRenderRequest({
192192
dependencyBundleTimestamps?: string[] | number[];
193193
providedNewBundles?: ProvidedNewBundle[] | null;
194194
assetsToCopy?: Asset[] | null;
195-
}): Promise<ResponseResult> {
195+
}): Promise<{ response: ResponseResult; executionContext?: ExecutionContext }> {
196196
try {
197197
// const bundleFilePathPerTimestamp = getRequestBundleFilePath(bundleTimestamp);
198198
const allBundleFilePaths = Array.from(
@@ -204,15 +204,20 @@ export async function handleRenderRequest({
204204

205205
if (allBundleFilePaths.length > maxVMPoolSize) {
206206
return {
207-
headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' },
208-
status: 410,
209-
data: `Too many bundles uploaded. The maximum allowed is ${maxVMPoolSize}. Please reduce the number of bundles or increase maxVMPoolSize in your configuration.`,
207+
response: {
208+
headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' },
209+
status: 410,
210+
data: `Too many bundles uploaded. The maximum allowed is ${maxVMPoolSize}. Please reduce the number of bundles or increase maxVMPoolSize in your configuration.`,
211+
},
210212
};
211213
}
212214

213215
try {
214216
const executionContext = await buildExecutionContext(allBundleFilePaths, /* buildVmsIfNeeded */ false);
215-
return await prepareResult(renderingRequest, entryBundleFilePath, executionContext);
217+
return {
218+
response: await prepareResult(renderingRequest, entryBundleFilePath, executionContext),
219+
executionContext,
220+
};
216221
} catch (e) {
217222
// Ignore VMContextNotFoundError, it means the bundle does not exist.
218223
// The following code will handle this case.
@@ -225,14 +230,14 @@ export async function handleRenderRequest({
225230
if (providedNewBundles && providedNewBundles.length > 0) {
226231
const result = await handleNewBundlesProvided(renderingRequest, providedNewBundles, assetsToCopy);
227232
if (result) {
228-
return result;
233+
return { response: result };
229234
}
230235
}
231236

232237
// Check if the bundle exists:
233238
const missingBundleError = await validateBundlesExist(bundleTimestamp, dependencyBundleTimestamps);
234239
if (missingBundleError) {
235-
return missingBundleError;
240+
return { response: missingBundleError };
236241
}
237242

238243
// The bundle exists, but the VM has not yet been created.
@@ -243,7 +248,10 @@ export async function handleRenderRequest({
243248
workerIdLabel(),
244249
);
245250
const executionContext = await buildExecutionContext(allBundleFilePaths, /* buildVmsIfNeeded */ true);
246-
return await prepareResult(renderingRequest, entryBundleFilePath, executionContext);
251+
return {
252+
response: await prepareResult(renderingRequest, entryBundleFilePath, executionContext),
253+
executionContext,
254+
};
247255
} catch (error) {
248256
const msg = formatExceptionMessage(
249257
renderingRequest,

0 commit comments

Comments
 (0)