@@ -114,6 +114,136 @@ describe('incremental render NDJSON endpoint', () => {
114114 } ) ;
115115 } ;
116116
117+ /**
118+ * Helper function to create a basic test setup with mocked handleIncrementalRenderRequest
119+ */
120+ const createBasicTestSetup = async ( ) => {
121+ await createVmBundle ( TEST_NAME ) ;
122+
123+ const { sink, sinkAddCalls, sinkEnd, sinkAbort } = createMockSink ( ) ;
124+ const mockResponse = createMockResponse ( ) ;
125+ const mockResult = createMockResult ( sink , mockResponse ) ;
126+
127+ const handleSpy = jest
128+ . spyOn ( incremental , 'handleIncrementalRenderRequest' )
129+ . mockImplementation ( ( ) => Promise . resolve ( mockResult ) ) ;
130+
131+ const SERVER_BUNDLE_TIMESTAMP = String ( BUNDLE_TIMESTAMP ) ;
132+
133+ return {
134+ sink,
135+ sinkAddCalls,
136+ sinkEnd,
137+ sinkAbort,
138+ mockResponse,
139+ mockResult,
140+ handleSpy,
141+ SERVER_BUNDLE_TIMESTAMP ,
142+ } ;
143+ } ;
144+
145+ /**
146+ * Helper function to create a streaming test setup
147+ */
148+ const createStreamingTestSetup = async ( ) => {
149+ await createVmBundle ( TEST_NAME ) ;
150+
151+ const { Readable } = await import ( 'stream' ) ;
152+ const responseStream = new Readable ( {
153+ read ( ) {
154+ // This is a readable stream that we can push to
155+ } ,
156+ } ) ;
157+
158+ const processedChunks : unknown [ ] = [ ] ;
159+ const sinkAdd = jest . fn ( ) ;
160+
161+ const sink : incremental . IncrementalRenderSink = {
162+ add : ( chunk ) => {
163+ console . log ( 'Sink.add called with chunk:' , chunk ) ;
164+ processedChunks . push ( chunk ) ;
165+ sinkAdd ( chunk ) ;
166+ } ,
167+ end : jest . fn ( ) ,
168+ abort : jest . fn ( ) ,
169+ } ;
170+
171+ const mockResponse : ResponseResult = {
172+ status : 200 ,
173+ headers : { 'Cache-Control' : 'no-cache, no-store, max-age=0, must-revalidate' } ,
174+ stream : responseStream ,
175+ } ;
176+
177+ const mockResult : incremental . IncrementalRenderResult = {
178+ response : mockResponse ,
179+ sink,
180+ } ;
181+
182+ const handleSpy = jest
183+ . spyOn ( incremental , 'handleIncrementalRenderRequest' )
184+ . mockImplementation ( ( ) => Promise . resolve ( mockResult ) ) ;
185+
186+ const SERVER_BUNDLE_TIMESTAMP = String ( BUNDLE_TIMESTAMP ) ;
187+
188+ return {
189+ responseStream,
190+ processedChunks,
191+ sinkAdd,
192+ sink,
193+ mockResponse,
194+ mockResult,
195+ handleSpy,
196+ SERVER_BUNDLE_TIMESTAMP ,
197+ } ;
198+ } ;
199+
200+ /**
201+ * Helper function to send chunks and wait for processing
202+ */
203+ const sendChunksAndWaitForProcessing = async (
204+ req : http . ClientRequest ,
205+ chunks : unknown [ ] ,
206+ waitForCondition : ( chunk : unknown , index : number ) => Promise < void > ,
207+ ) => {
208+ for ( let i = 0 ; i < chunks . length ; i += 1 ) {
209+ const chunk = chunks [ i ] ;
210+ req . write ( `${ JSON . stringify ( chunk ) } \n` ) ;
211+
212+ // eslint-disable-next-line no-await-in-loop
213+ await waitForCondition ( chunk , i ) ;
214+ }
215+ } ;
216+
217+ /**
218+ * Helper function to create streaming response promise
219+ */
220+ const createStreamingResponsePromise = ( req : http . ClientRequest ) => {
221+ return new Promise < { statusCode : number ; streamedData : string [ ] } > ( ( resolve , reject ) => {
222+ const streamedChunks : string [ ] = [ ] ;
223+
224+ req . on ( 'response' , ( res ) => {
225+ res . on ( 'data' , ( chunk : Buffer ) => {
226+ const chunkStr = chunk . toString ( ) ;
227+ console . log ( 'Client received chunk:' , chunkStr ) ;
228+ streamedChunks . push ( chunkStr ) ;
229+ } ) ;
230+ res . on ( 'end' , ( ) => {
231+ console . log ( 'Client response ended, total chunks received:' , streamedChunks . length ) ;
232+ resolve ( {
233+ statusCode : res . statusCode || 0 ,
234+ streamedData : streamedChunks ,
235+ } ) ;
236+ } ) ;
237+ res . on ( 'error' , ( e ) => {
238+ reject ( e ) ;
239+ } ) ;
240+ } ) ;
241+ req . on ( 'error' , ( e ) => {
242+ reject ( e ) ;
243+ } ) ;
244+ } ) ;
245+ } ;
246+
117247 beforeAll ( async ( ) => {
118248 await app . ready ( ) ;
119249 await app . listen ( { port : 0 } ) ;
@@ -126,21 +256,8 @@ describe('incremental render NDJSON endpoint', () => {
126256 } ) ;
127257
128258 test ( 'calls handleIncrementalRenderRequest immediately after first chunk and processes each subsequent chunk immediately' , async ( ) => {
129- // Create a bundle for this test
130- await createVmBundle ( TEST_NAME ) ;
131-
132- const { sink, sinkAddCalls, sinkEnd, sinkAbort } = createMockSink ( ) ;
133-
134- const mockResponse : ResponseResult = createMockResponse ( ) ;
135-
136- const mockResult : incremental . IncrementalRenderResult = createMockResult ( sink , mockResponse ) ;
137-
138- const resultPromise = Promise . resolve ( mockResult ) ;
139- const handleSpy = jest
140- . spyOn ( incremental , 'handleIncrementalRenderRequest' )
141- . mockImplementation ( ( ) => resultPromise ) ;
142-
143- const SERVER_BUNDLE_TIMESTAMP = String ( BUNDLE_TIMESTAMP ) ;
259+ const { sink, sinkAddCalls, sinkEnd, sinkAbort, handleSpy, SERVER_BUNDLE_TIMESTAMP } =
260+ await createBasicTestSetup ( ) ;
144261
145262 // Create the HTTP request
146263 const req = createHttpRequest ( SERVER_BUNDLE_TIMESTAMP ) ;
@@ -164,26 +281,21 @@ describe('incremental render NDJSON endpoint', () => {
164281 // Send subsequent props chunks one by one and verify immediate processing
165282 const chunksToSend = [ { a : 1 } , { b : 2 } , { c : 3 } ] ;
166283
167- for ( let i = 0 ; i < chunksToSend . length ; i += 1 ) {
168- const chunk = chunksToSend [ i ] ;
169- const expectedCallsBeforeWrite = i ;
284+ await sendChunksAndWaitForProcessing ( req , chunksToSend , async ( chunk , index ) => {
285+ const expectedCallsBeforeWrite = index ;
170286
171287 // Verify state before writing this chunk
172288 expect ( sinkAddCalls ) . toHaveLength ( expectedCallsBeforeWrite ) ;
173289
174- // Write the chunk
175- req . write ( `${ JSON . stringify ( chunk ) } \n` ) ;
176-
177290 // Wait for the chunk to be processed
178- // eslint-disable-next-line no-await-in-loop
179291 await waitFor ( ( ) => {
180292 expect ( sinkAddCalls ) . toHaveLength ( expectedCallsBeforeWrite + 1 ) ;
181293 } ) ;
182294
183295 // Verify the chunk was processed immediately
184296 expect ( sinkAddCalls ) . toHaveLength ( expectedCallsBeforeWrite + 1 ) ;
185297 expect ( sinkAddCalls [ expectedCallsBeforeWrite ] ) . toEqual ( chunk ) ;
186- }
298+ } ) ;
187299
188300 req . end ( ) ;
189301
@@ -414,9 +526,6 @@ describe('incremental render NDJSON endpoint', () => {
414526 } ) ;
415527
416528 test ( 'streaming response - client receives all streamed chunks in real-time' , async ( ) => {
417- // Create a bundle for this test
418- await createVmBundle ( TEST_NAME ) ;
419-
420529 const responseChunks = [
421530 'Hello from stream' ,
422531 'Chunk 1' ,
@@ -427,94 +536,26 @@ describe('incremental render NDJSON endpoint', () => {
427536 'Goodbye from stream' ,
428537 ] ;
429538
430- // Create a readable stream that yields chunks every 10ms
431- const { Readable } = await import ( 'stream' ) ;
432- let responseStreamInitialized = false ;
433- const responseStream = new Readable ( {
434- read ( ) {
435- if ( responseStreamInitialized ) {
436- return ;
437- }
438-
439- responseStreamInitialized = true ;
440- let chunkIndex = 0 ;
441- const intervalId = setInterval ( ( ) => {
442- if ( chunkIndex < responseChunks . length ) {
443- console . log ( 'Pushing response chunk:' , responseChunks [ chunkIndex ] ) ;
444- this . push ( responseChunks [ chunkIndex ] ) ;
445- chunkIndex += 1 ;
446- } else {
447- clearInterval ( intervalId ) ;
448- console . log ( 'Ending response stream' ) ;
449- this . push ( null ) ;
450- }
451- } , 10 ) ;
452- } ,
453- } ) ;
454-
455- // Track processed chunks to verify immediate processing
456- const processedChunks : unknown [ ] = [ ] ;
457-
458- const sinkAdd = jest . fn ( ) ;
459- // Create a sink that records processed chunks
460- const sink : incremental . IncrementalRenderSink = {
461- add : ( chunk ) => {
462- console . log ( 'Sink.add called with chunk:' , chunk ) ;
463- processedChunks . push ( chunk ) ;
464- sinkAdd ( chunk ) ;
465- } ,
466- end : jest . fn ( ) ,
467- abort : jest . fn ( ) ,
468- } ;
469-
470- // Create a response with the streaming response
471- const mockResponse : ResponseResult = {
472- status : 200 ,
473- headers : { 'Cache-Control' : 'no-cache, no-store, max-age=0, must-revalidate' } ,
474- stream : responseStream ,
475- } ;
476-
477- const mockResult : incremental . IncrementalRenderResult = {
478- response : mockResponse ,
479- sink,
480- } ;
481-
482- const resultPromise = Promise . resolve ( mockResult ) ;
483- const handleSpy = jest
484- . spyOn ( incremental , 'handleIncrementalRenderRequest' )
485- . mockImplementation ( ( ) => resultPromise ) ;
486-
487- const SERVER_BUNDLE_TIMESTAMP = String ( BUNDLE_TIMESTAMP ) ;
539+ const { responseStream, processedChunks, sinkAdd, sink, handleSpy, SERVER_BUNDLE_TIMESTAMP } =
540+ await createStreamingTestSetup ( ) ;
541+
542+ // write the response chunks to the stream
543+ let sentChunkIndex = 0 ;
544+ const intervalId = setInterval ( ( ) => {
545+ if ( sentChunkIndex < responseChunks . length ) {
546+ responseStream . push ( responseChunks [ sentChunkIndex ] || null ) ;
547+ sentChunkIndex += 1 ;
548+ } else {
549+ responseStream . push ( null ) ;
550+ clearInterval ( intervalId ) ;
551+ }
552+ } , 10 ) ;
488553
489554 // Create the HTTP request
490555 const req = createHttpRequest ( SERVER_BUNDLE_TIMESTAMP ) ;
491556
492557 // Set up promise to capture the streaming response
493- const responsePromise = new Promise < { statusCode : number ; streamedData : string [ ] } > ( ( resolve , reject ) => {
494- const streamedChunks : string [ ] = [ ] ;
495-
496- req . on ( 'response' , ( res ) => {
497- res . on ( 'data' , ( chunk : Buffer ) => {
498- // Capture each chunk of the streaming response
499- const chunkStr = chunk . toString ( ) ;
500- console . log ( 'Client received chunk:' , chunkStr ) ;
501- streamedChunks . push ( chunkStr ) ;
502- } ) ;
503- res . on ( 'end' , ( ) => {
504- console . log ( 'Client response ended, total chunks received:' , streamedChunks . length ) ;
505- resolve ( {
506- statusCode : res . statusCode || 0 ,
507- streamedData : streamedChunks ,
508- } ) ;
509- } ) ;
510- res . on ( 'error' , ( e ) => {
511- reject ( e ) ;
512- } ) ;
513- } ) ;
514- req . on ( 'error' , ( e ) => {
515- reject ( e ) ;
516- } ) ;
517- } ) ;
558+ const responsePromise = createStreamingResponsePromise ( req ) ;
518559
519560 // Write first object (valid JSON)
520561 const initialObj = createInitialObject ( SERVER_BUNDLE_TIMESTAMP ) ;
@@ -536,13 +577,11 @@ describe('incremental render NDJSON endpoint', () => {
536577 { type : 'update' , data : 'chunk3' } ,
537578 ] ;
538579
539- for ( const chunk of chunksToSend ) {
540- req . write ( `${ JSON . stringify ( chunk ) } \n` ) ;
541- // eslint-disable-next-line no-await-in-loop
580+ await sendChunksAndWaitForProcessing ( req , chunksToSend , async ( chunk ) => {
542581 await waitFor ( ( ) => {
543582 expect ( sinkAdd ) . toHaveBeenCalledWith ( chunk ) ;
544583 } ) ;
545- }
584+ } ) ;
546585
547586 // End the request
548587 console . log ( 'Ending request' ) ;
@@ -562,7 +601,7 @@ describe('incremental render NDJSON endpoint', () => {
562601 // Verify that each chunk was received in order
563602 responseChunks . forEach ( ( expectedChunk , index ) => {
564603 const receivedChunk = response . streamedData [ index ] ;
565- expect ( receivedChunk ) . toContain ( expectedChunk ) ;
604+ expect ( receivedChunk ) . toEqual ( expectedChunk ) ;
566605 } ) ;
567606
568607 // Verify that all request chunks were processed
0 commit comments