@@ -210,18 +210,18 @@ describe('incremental render NDJSON endpoint', () => {
210210 * Helper function to create streaming response promise
211211 */
212212 const createStreamingResponsePromise = ( req : http . ClientRequest ) => {
213- return new Promise < { statusCode : number ; streamedData : string [ ] } > ( ( resolve , reject ) => {
214- const streamedChunks : string [ ] = [ ] ;
215-
213+ const receivedChunks : string [ ] = [ ] ;
214+
215+ const promise = new Promise < { statusCode : number ; streamedData : string [ ] } > ( ( resolve , reject ) => {
216216 req . on ( 'response' , ( res ) => {
217217 res . on ( 'data' , ( chunk : Buffer ) => {
218218 const chunkStr = chunk . toString ( ) ;
219- streamedChunks . push ( chunkStr ) ;
219+ receivedChunks . push ( chunkStr ) ;
220220 } ) ;
221221 res . on ( 'end' , ( ) => {
222222 resolve ( {
223223 statusCode : res . statusCode || 0 ,
224- streamedData : streamedChunks ,
224+ streamedData : [ ... receivedChunks ] , // Return a copy
225225 } ) ;
226226 } ) ;
227227 res . on ( 'error' , ( e ) => {
@@ -232,6 +232,8 @@ describe('incremental render NDJSON endpoint', () => {
232232 reject ( e ) ;
233233 } ) ;
234234 } ) ;
235+
236+ return { promise, receivedChunks } ;
235237 } ;
236238
237239 beforeAll ( async ( ) => {
@@ -544,7 +546,7 @@ describe('incremental render NDJSON endpoint', () => {
544546 const req = createHttpRequest ( SERVER_BUNDLE_TIMESTAMP ) ;
545547
546548 // Set up promise to capture the streaming response
547- const responsePromise = createStreamingResponsePromise ( req ) ;
549+ const { promise } = createStreamingResponsePromise ( req ) ;
548550
549551 // Write first object (valid JSON)
550552 const initialObj = createInitialObject ( SERVER_BUNDLE_TIMESTAMP ) ;
@@ -575,7 +577,7 @@ describe('incremental render NDJSON endpoint', () => {
575577 req . end ( ) ;
576578
577579 // Wait for the request to complete and capture the streaming response
578- const response = await responsePromise ;
580+ const response = await promise ;
579581
580582 // Verify the response status
581583 expect ( response . statusCode ) . toBe ( 200 ) ;
@@ -602,4 +604,101 @@ describe('incremental render NDJSON endpoint', () => {
602604 expect ( sink . end ) . toHaveBeenCalled ( ) ;
603605 } ) ;
604606 } ) ;
607+
608+ test ( 'echo server - processes each chunk and immediately streams it back' , async ( ) => {
609+ const { responseStream, sinkAdd, sink, handleSpy, SERVER_BUNDLE_TIMESTAMP } =
610+ await createStreamingTestSetup ( ) ;
611+
612+ // Create the HTTP request
613+ const req = createHttpRequest ( SERVER_BUNDLE_TIMESTAMP ) ;
614+
615+ // Set up promise to capture the streaming response
616+ const { promise, receivedChunks } = createStreamingResponsePromise ( req ) ;
617+
618+ // Write first object (valid JSON)
619+ const initialObj = createInitialObject ( SERVER_BUNDLE_TIMESTAMP ) ;
620+ req . write ( `${ JSON . stringify ( initialObj ) } \n` ) ;
621+
622+ // Wait for the server to process the first object and set up the response
623+ await waitFor ( ( ) => {
624+ expect ( handleSpy ) . toHaveBeenCalledTimes ( 1 ) ;
625+ } ) ;
626+
627+ // Verify handleIncrementalRenderRequest was called
628+ expect ( handleSpy ) . toHaveBeenCalledTimes ( 1 ) ;
629+
630+ // Send chunks one by one and verify immediate processing and echoing
631+ const chunksToSend = [
632+ { type : 'update' , data : 'chunk1' } ,
633+ { type : 'update' , data : 'chunk2' } ,
634+ { type : 'update' , data : 'chunk3' } ,
635+ { type : 'update' , data : 'chunk4' } ,
636+ ] ;
637+
638+ // Process each chunk and immediately echo it back
639+ for ( let i = 0 ; i < chunksToSend . length ; i += 1 ) {
640+ const chunk = chunksToSend [ i ] ;
641+
642+ // Send the chunk
643+ req . write ( `${ JSON . stringify ( chunk ) } \n` ) ;
644+
645+ // Wait for the chunk to be processed
646+ // eslint-disable-next-line no-await-in-loop
647+ await waitFor ( ( ) => {
648+ expect ( sinkAdd ) . toHaveBeenCalledWith ( chunk ) ;
649+ } ) ;
650+
651+ // Immediately echo the chunk back through the stream
652+ const echoResponse = `processed ${ JSON . stringify ( chunk ) } ` ;
653+ responseStream . push ( echoResponse ) ;
654+
655+ // Wait for the echo response to be received by the client
656+ // eslint-disable-next-line no-await-in-loop
657+ await waitFor ( ( ) => {
658+ expect ( receivedChunks [ i ] ) . toEqual ( echoResponse ) ;
659+ } ) ;
660+
661+ // Wait a moment to ensure the echo is sent
662+ // eslint-disable-next-line no-await-in-loop
663+ await new Promise < void > ( ( resolve ) => {
664+ setTimeout ( resolve , 10 ) ;
665+ } ) ;
666+ }
667+
668+ // End the stream to signal no more data
669+ responseStream . push ( null ) ;
670+
671+ // End the request
672+ req . end ( ) ;
673+
674+ // Wait for the request to complete and capture the streaming response
675+ const response = await promise ;
676+
677+ // Verify the response status
678+ expect ( response . statusCode ) . toBe ( 200 ) ;
679+
680+ // Verify that we received echo responses for each chunk
681+ expect ( response . streamedData ) . toHaveLength ( chunksToSend . length ) ;
682+
683+ // Verify that each chunk was echoed back correctly
684+ chunksToSend . forEach ( ( chunk , index ) => {
685+ const expectedEcho = `processed ${ JSON . stringify ( chunk ) } ` ;
686+ const receivedEcho = response . streamedData [ index ] ;
687+ expect ( receivedEcho ) . toEqual ( expectedEcho ) ;
688+ } ) ;
689+
690+ // Verify that all request chunks were processed
691+ expect ( sinkAdd ) . toHaveBeenCalledTimes ( chunksToSend . length ) ;
692+ chunksToSend . forEach ( ( chunk , index ) => {
693+ expect ( sinkAdd ) . toHaveBeenNthCalledWith ( index + 1 , chunk ) ;
694+ } ) ;
695+
696+ // Verify that the mock was called correctly
697+ expect ( handleSpy ) . toHaveBeenCalledTimes ( 1 ) ;
698+
699+ // Verify that the sink.end was called
700+ await waitFor ( ( ) => {
701+ expect ( sink . end ) . toHaveBeenCalled ( ) ;
702+ } ) ;
703+ } ) ;
605704} ) ;
0 commit comments