11import { Readable } from "stream" ;
22import JSONbig from "json-bigint" ;
3- import readline from "readline" ;
43import {
54 getNormalizedMeta ,
65 normalizeResponseRowStreaming
@@ -11,75 +10,121 @@ import { Meta } from "../../meta";
1110
1211export class ServerSideStream extends Readable {
1312 private meta : Meta [ ] = [ ] ;
14- private readlineInterface : readline . Interface | null = null ;
15- private pendingRows : Row [ ] = [ ] ;
13+ private readonly pendingRows : Row [ ] = [ ] ;
1614 private finished = false ;
1715 private processingData = false ;
18- private readlineInterfacePaused = false ;
16+ private inputPaused = false ;
1917 private readonly maxPendingRows = 5 ; // Limit pending rows to prevent memory buildup
18+ private lineBuffer = "" ;
19+ private sourceStream : NodeJS . ReadableStream | null = null ;
2020
2121 constructor (
2222 private readonly response : Response ,
2323 private readonly executeQueryOptions : ExecuteQueryOptions
2424 ) {
2525 super ( { objectMode : true } ) ;
26- this . setupReadline ( ) ;
26+ this . setupInputStream ( ) ;
2727 }
2828
29- private setupReadline ( ) {
30- this . readlineInterface = readline . createInterface ( {
31- input : this . response . body ,
32- crlfDelay : Infinity
33- } ) ;
29+ private setupInputStream ( ) {
30+ this . sourceStream = this . response . body ;
3431
35- const lineParser = ( line : string ) => {
36- try {
37- if ( line . trim ( ) ) {
38- const parsed = JSONbig . parse ( line ) ;
39- if ( parsed ) {
40- if ( parsed . message_type === "DATA" ) {
41- this . handleDataMessage ( parsed ) ;
42- } else if ( parsed . message_type === "START" ) {
43- this . meta = getNormalizedMeta ( parsed . result_columns ) ;
44- this . emit ( "meta" , this . meta ) ;
45- } else if ( parsed . message_type === "FINISH_SUCCESSFULLY" ) {
46- this . finished = true ;
47- this . tryPushPendingData ( ) ;
48- } else if ( parsed . message_type === "FINISH_WITH_ERRORS" ) {
49- // Ensure readline interface is resumed before destroying to prevent hanging
50- if ( this . readlineInterface && this . readlineInterfacePaused ) {
51- this . readlineInterface . resume ( ) ;
52- this . readlineInterfacePaused = false ;
53- }
54- this . destroy (
55- new Error (
56- `Result encountered an error: ${ parsed . errors
57- . map ( ( error : { description : string } ) => error . description )
58- . join ( "\n" ) } `
59- )
60- ) ;
61- }
62- } else {
63- this . destroy ( new Error ( `Result row could not be parsed: ${ line } ` ) ) ;
64- }
65- }
66- } catch ( err ) {
67- this . destroy ( err ) ;
68- }
69- } ;
32+ if ( ! this . sourceStream ) {
33+ this . destroy ( new Error ( "Response body is null or undefined" ) ) ;
34+ return ;
35+ }
7036
71- this . readlineInterface . on ( "line" , lineParser ) ;
37+ this . sourceStream . on ( "data" , ( chunk : Buffer ) => {
38+ this . handleData ( chunk ) ;
39+ } ) ;
7240
73- this . readlineInterface . on ( "close" , ( ) => {
74- this . finished = true ;
75- this . tryPushPendingData ( ) ;
41+ this . sourceStream . on ( "end" , ( ) => {
42+ this . handleInputEnd ( ) ;
7643 } ) ;
7744
78- this . readlineInterface . on ( "error" , err => {
45+ this . sourceStream . on ( "error" , ( err : Error ) => {
7946 this . destroy ( err ) ;
8047 } ) ;
8148 }
8249
50+ private handleData ( chunk : Buffer ) {
51+ // Convert chunk to string and add to line buffer
52+ this . lineBuffer += chunk . toString ( ) ;
53+
54+ // Process complete lines
55+ let lineStart = 0 ;
56+ let lineEnd = this . lineBuffer . indexOf ( "\n" , lineStart ) ;
57+
58+ while ( lineEnd !== - 1 ) {
59+ const line = this . lineBuffer . slice ( lineStart , lineEnd ) ;
60+ this . processLine ( line . trim ( ) ) ;
61+
62+ lineStart = lineEnd + 1 ;
63+ lineEnd = this . lineBuffer . indexOf ( "\n" , lineStart ) ;
64+ }
65+
66+ // Keep remaining partial line in buffer
67+ this . lineBuffer = this . lineBuffer . slice ( lineStart ) ;
68+
69+ // Apply backpressure if we have too many pending rows
70+ if (
71+ this . pendingRows . length > this . maxPendingRows &&
72+ this . sourceStream &&
73+ ! this . inputPaused &&
74+ ! this . processingData
75+ ) {
76+ this . sourceStream . pause ( ) ;
77+ this . inputPaused = true ;
78+ }
79+ }
80+
81+ private handleInputEnd ( ) {
82+ // Process any remaining line in buffer
83+ if ( this . lineBuffer . trim ( ) ) {
84+ this . processLine ( this . lineBuffer . trim ( ) ) ;
85+ this . lineBuffer = "" ;
86+ }
87+
88+ this . finished = true ;
89+ this . tryPushPendingData ( ) ;
90+ }
91+
92+ private processLine ( line : string ) {
93+ if ( ! line ) return ;
94+
95+ try {
96+ const parsed = JSONbig . parse ( line ) ;
97+ if ( parsed ) {
98+ if ( parsed . message_type === "DATA" ) {
99+ this . handleDataMessage ( parsed ) ;
100+ } else if ( parsed . message_type === "START" ) {
101+ this . meta = getNormalizedMeta ( parsed . result_columns ) ;
102+ this . emit ( "meta" , this . meta ) ;
103+ } else if ( parsed . message_type === "FINISH_SUCCESSFULLY" ) {
104+ this . finished = true ;
105+ this . tryPushPendingData ( ) ;
106+ } else if ( parsed . message_type === "FINISH_WITH_ERRORS" ) {
107+ // Ensure source stream is resumed before destroying to prevent hanging
108+ if ( this . sourceStream && this . inputPaused ) {
109+ this . sourceStream . resume ( ) ;
110+ this . inputPaused = false ;
111+ }
112+ this . destroy (
113+ new Error (
114+ `Result encountered an error: ${ parsed . errors
115+ . map ( ( error : { description : string } ) => error . description )
116+ . join ( "\n" ) } `
117+ )
118+ ) ;
119+ }
120+ } else {
121+ this . destroy ( new Error ( `Result row could not be parsed: ${ line } ` ) ) ;
122+ }
123+ } catch ( err ) {
124+ this . destroy ( err ) ;
125+ }
126+ }
127+
83128 private handleDataMessage ( parsed : { data : unknown [ ] } ) {
84129 if ( parsed . data ) {
85130 // Process rows one by one to handle backpressure properly
@@ -92,18 +137,6 @@ export class ServerSideStream extends Readable {
92137 // Add to pending rows buffer
93138 this . pendingRows . push ( ...normalizedData ) ;
94139
95- // If we have too many pending rows, pause the readline interface to apply backpressure
96- // Only pause if we're not already processing and have significantly exceeded the limit
97- if (
98- this . pendingRows . length > this . maxPendingRows &&
99- this . readlineInterface &&
100- ! this . readlineInterfacePaused &&
101- ! this . processingData
102- ) {
103- this . readlineInterface . pause ( ) ;
104- this . readlineInterfacePaused = true ;
105- }
106-
107140 // Try to push data immediately if not already processing
108141 if ( ! this . processingData ) {
109142 this . tryPushPendingData ( ) ;
@@ -122,14 +155,14 @@ export class ServerSideStream extends Readable {
122155 const row = this . pendingRows . shift ( ) ;
123156 const canContinue = this . push ( row ) ;
124157
125- // If pending rows dropped below threshold, resume the readline interface
158+ // If pending rows dropped below threshold, resume the source stream
126159 if (
127160 this . pendingRows . length <= this . maxPendingRows / 4 &&
128- this . readlineInterface &&
129- this . readlineInterfacePaused
161+ this . sourceStream &&
162+ this . inputPaused
130163 ) {
131- this . readlineInterface . resume ( ) ;
132- this . readlineInterfacePaused = false ;
164+ this . sourceStream . resume ( ) ;
165+ this . inputPaused = false ;
133166 }
134167
135168 // If push returns false, stop pushing and wait for _read to be called
@@ -155,26 +188,33 @@ export class ServerSideStream extends Readable {
155188 this . tryPushPendingData ( ) ;
156189 }
157190
158- // Also resume readline interface if it was paused and we have capacity
191+ // Also resume source stream if it was paused and we have capacity
159192 if (
160- this . readlineInterface &&
161- this . readlineInterfacePaused &&
193+ this . sourceStream &&
194+ this . inputPaused &&
162195 this . pendingRows . length < this . maxPendingRows / 2
163196 ) {
164- this . readlineInterface . resume ( ) ;
165- this . readlineInterfacePaused = false ;
197+ this . sourceStream . resume ( ) ;
198+ this . inputPaused = false ;
166199 }
167200 }
168201
169202 _destroy ( err : Error | null , callback : ( error ?: Error | null ) => void ) {
170- if ( this . readlineInterface ) {
171- // Resume interface if paused to ensure proper cleanup
172- if ( this . readlineInterfacePaused ) {
173- this . readlineInterface . resume ( ) ;
174- this . readlineInterfacePaused = false ;
203+ if ( this . sourceStream ) {
204+ // Resume stream if paused to ensure proper cleanup
205+ if ( this . inputPaused ) {
206+ this . sourceStream . resume ( ) ;
207+ this . inputPaused = false ;
208+ }
209+
210+ // Only call destroy if it exists (for Node.js streams)
211+ const destroyableStream = this . sourceStream as unknown as {
212+ destroy ?: ( ) => void ;
213+ } ;
214+ if ( typeof destroyableStream . destroy === "function" ) {
215+ destroyableStream . destroy ( ) ;
175216 }
176- this . readlineInterface . close ( ) ;
177- this . readlineInterface = null ;
217+ this . sourceStream = null ;
178218 }
179219 callback ( err ) ;
180220 }
0 commit comments