Skip to content

Commit f453926

Browse files
committed
refactor out readline
1 parent f5df9a2 commit f453926

File tree

1 file changed

+118
-78
lines changed

1 file changed

+118
-78
lines changed

src/statement/stream/serverSideStream.ts

Lines changed: 118 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Readable } from "stream";
22
import JSONbig from "json-bigint";
3-
import readline from "readline";
43
import {
54
getNormalizedMeta,
65
normalizeResponseRowStreaming
@@ -11,75 +10,121 @@ import { Meta } from "../../meta";
1110

1211
export class ServerSideStream extends Readable {
1312
private meta: Meta[] = [];
14-
private readlineInterface: readline.Interface | null = null;
1513
private pendingRows: Row[] = [];
1614
private finished = false;
1715
private processingData = false;
18-
private readlineInterfacePaused = false;
16+
private inputPaused = false;
1917
private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup
18+
private lineBuffer = "";
19+
private sourceStream: NodeJS.ReadableStream | null = null;
2020

2121
constructor(
2222
private readonly response: Response,
2323
private readonly executeQueryOptions: ExecuteQueryOptions
2424
) {
2525
super({ objectMode: true });
26-
this.setupReadline();
26+
this.setupInputStream();
2727
}
2828

29-
private setupReadline() {
30-
this.readlineInterface = readline.createInterface({
31-
input: this.response.body,
32-
crlfDelay: Infinity
33-
});
29+
private setupInputStream() {
30+
this.sourceStream = this.response.body;
3431

35-
const lineParser = (line: string) => {
36-
try {
37-
if (line.trim()) {
38-
const parsed = JSONbig.parse(line);
39-
if (parsed) {
40-
if (parsed.message_type === "DATA") {
41-
this.handleDataMessage(parsed);
42-
} else if (parsed.message_type === "START") {
43-
this.meta = getNormalizedMeta(parsed.result_columns);
44-
this.emit("meta", this.meta);
45-
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
46-
this.finished = true;
47-
this.tryPushPendingData();
48-
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
49-
// Ensure readline interface is resumed before destroying to prevent hanging
50-
if (this.readlineInterface && this.readlineInterfacePaused) {
51-
this.readlineInterface.resume();
52-
this.readlineInterfacePaused = false;
53-
}
54-
this.destroy(
55-
new Error(
56-
`Result encountered an error: ${parsed.errors
57-
.map((error: { description: string }) => error.description)
58-
.join("\n")}`
59-
)
60-
);
61-
}
62-
} else {
63-
this.destroy(new Error(`Result row could not be parsed: ${line}`));
64-
}
65-
}
66-
} catch (err) {
67-
this.destroy(err);
68-
}
69-
};
32+
if (!this.sourceStream) {
33+
this.destroy(new Error("Response body is null or undefined"));
34+
return;
35+
}
7036

71-
this.readlineInterface.on("line", lineParser);
37+
this.sourceStream.on("data", (chunk: Buffer) => {
38+
this.handleData(chunk);
39+
});
7240

73-
this.readlineInterface.on("close", () => {
74-
this.finished = true;
75-
this.tryPushPendingData();
41+
this.sourceStream.on("end", () => {
42+
this.handleInputEnd();
7643
});
7744

78-
this.readlineInterface.on("error", err => {
45+
this.sourceStream.on("error", (err: Error) => {
7946
this.destroy(err);
8047
});
8148
}
8249

50+
private handleData(chunk: Buffer) {
51+
// Convert chunk to string and add to line buffer
52+
this.lineBuffer += chunk.toString();
53+
54+
// Process complete lines
55+
let lineStart = 0;
56+
let lineEnd = this.lineBuffer.indexOf("\n", lineStart);
57+
58+
while (lineEnd !== -1) {
59+
const line = this.lineBuffer.slice(lineStart, lineEnd);
60+
this.processLine(line.trim());
61+
62+
lineStart = lineEnd + 1;
63+
lineEnd = this.lineBuffer.indexOf("\n", lineStart);
64+
}
65+
66+
// Keep remaining partial line in buffer
67+
this.lineBuffer = this.lineBuffer.slice(lineStart);
68+
69+
// Apply backpressure if we have too many pending rows
70+
if (
71+
this.pendingRows.length > this.maxPendingRows &&
72+
this.sourceStream &&
73+
!this.inputPaused &&
74+
!this.processingData
75+
) {
76+
this.sourceStream.pause();
77+
this.inputPaused = true;
78+
}
79+
}
80+
81+
private handleInputEnd() {
82+
// Process any remaining line in buffer
83+
if (this.lineBuffer.trim()) {
84+
this.processLine(this.lineBuffer.trim());
85+
this.lineBuffer = "";
86+
}
87+
88+
this.finished = true;
89+
this.tryPushPendingData();
90+
}
91+
92+
private processLine(line: string) {
93+
if (!line) return;
94+
95+
try {
96+
const parsed = JSONbig.parse(line);
97+
if (parsed) {
98+
if (parsed.message_type === "DATA") {
99+
this.handleDataMessage(parsed);
100+
} else if (parsed.message_type === "START") {
101+
this.meta = getNormalizedMeta(parsed.result_columns);
102+
this.emit("meta", this.meta);
103+
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
104+
this.finished = true;
105+
this.tryPushPendingData();
106+
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
107+
// Ensure source stream is resumed before destroying to prevent hanging
108+
if (this.sourceStream && this.inputPaused) {
109+
this.sourceStream.resume();
110+
this.inputPaused = false;
111+
}
112+
this.destroy(
113+
new Error(
114+
`Result encountered an error: ${parsed.errors
115+
.map((error: { description: string }) => error.description)
116+
.join("\n")}`
117+
)
118+
);
119+
}
120+
} else {
121+
this.destroy(new Error(`Result row could not be parsed: ${line}`));
122+
}
123+
} catch (err) {
124+
this.destroy(err);
125+
}
126+
}
127+
83128
private handleDataMessage(parsed: { data: unknown[] }) {
84129
if (parsed.data) {
85130
// Process rows one by one to handle backpressure properly
@@ -92,18 +137,6 @@ export class ServerSideStream extends Readable {
92137
// Add to pending rows buffer
93138
this.pendingRows.push(...normalizedData);
94139

95-
// If we have too many pending rows, pause the readline interface to apply backpressure
96-
// Only pause if we're not already processing and have significantly exceeded the limit
97-
if (
98-
this.pendingRows.length > this.maxPendingRows &&
99-
this.readlineInterface &&
100-
!this.readlineInterfacePaused &&
101-
!this.processingData
102-
) {
103-
this.readlineInterface.pause();
104-
this.readlineInterfacePaused = true;
105-
}
106-
107140
// Try to push data immediately if not already processing
108141
if (!this.processingData) {
109142
this.tryPushPendingData();
@@ -122,14 +155,14 @@ export class ServerSideStream extends Readable {
122155
const row = this.pendingRows.shift();
123156
const canContinue = this.push(row);
124157

125-
// If pending rows dropped below threshold, resume the readline interface
158+
// If pending rows dropped below threshold, resume the source stream
126159
if (
127160
this.pendingRows.length <= this.maxPendingRows / 4 &&
128-
this.readlineInterface &&
129-
this.readlineInterfacePaused
161+
this.sourceStream &&
162+
this.inputPaused
130163
) {
131-
this.readlineInterface.resume();
132-
this.readlineInterfacePaused = false;
164+
this.sourceStream.resume();
165+
this.inputPaused = false;
133166
}
134167

135168
// If push returns false, stop pushing and wait for _read to be called
@@ -155,26 +188,33 @@ export class ServerSideStream extends Readable {
155188
this.tryPushPendingData();
156189
}
157190

158-
// Also resume readline interface if it was paused and we have capacity
191+
// Also resume source stream if it was paused and we have capacity
159192
if (
160-
this.readlineInterface &&
161-
this.readlineInterfacePaused &&
193+
this.sourceStream &&
194+
this.inputPaused &&
162195
this.pendingRows.length < this.maxPendingRows / 2
163196
) {
164-
this.readlineInterface.resume();
165-
this.readlineInterfacePaused = false;
197+
this.sourceStream.resume();
198+
this.inputPaused = false;
166199
}
167200
}
168201

169202
_destroy(err: Error | null, callback: (error?: Error | null) => void) {
170-
if (this.readlineInterface) {
171-
// Resume interface if paused to ensure proper cleanup
172-
if (this.readlineInterfacePaused) {
173-
this.readlineInterface.resume();
174-
this.readlineInterfacePaused = false;
203+
if (this.sourceStream) {
204+
// Resume stream if paused to ensure proper cleanup
205+
if (this.inputPaused) {
206+
this.sourceStream.resume();
207+
this.inputPaused = false;
208+
}
209+
210+
// Only call destroy if it exists (for Node.js streams)
211+
const destroyableStream = this.sourceStream as unknown as {
212+
destroy?: () => void;
213+
};
214+
if (typeof destroyableStream.destroy === "function") {
215+
destroyableStream.destroy();
175216
}
176-
this.readlineInterface.close();
177-
this.readlineInterface = null;
217+
this.sourceStream = null;
178218
}
179219
callback(err);
180220
}

0 commit comments

Comments
 (0)