Skip to content

Commit 9f3eb53

Browse files
authored
fix(FIR-50188): memory leak in streaming (#148)
1 parent 49cb9e5 commit 9f3eb53

File tree

7 files changed

+1349
-49
lines changed

7 files changed

+1349
-49
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"build": "rm -fr ./build && tsc -p tsconfig.lib.json",
1212
"release": "standard-version",
1313
"test": "jest",
14-
"test:ci": "jest --ci --bail",
14+
"test:ci": "node --expose-gc ./node_modules/.bin/jest --ci --bail",
1515
"type-check": "tsc -p tsconfig.lib.json"
1616
},
1717
"prettier": {

src/statement/normalizeResponse.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,14 @@ export const normalizeResponseRowStreaming = (
105105
const { response: { normalizeData = false } = {} } = executeQueryOptions;
106106

107107
const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;
108-
109-
return data.map((row: Row) => {
110-
const hydratedRow = hydrate(row, meta, executeQueryOptions);
108+
const result: Row[] = new Array(data.length);
109+
for (let i = 0; i < data.length; i++) {
110+
const hydratedRow = hydrate(data[i], meta, executeQueryOptions);
111111
if (normalizeData) {
112-
return normalizeRow(hydratedRow, meta, executeQueryOptions);
112+
result[i] = normalizeRow(hydratedRow, meta, executeQueryOptions);
113+
} else {
114+
result[i] = hydratedRow;
113115
}
114-
return hydratedRow;
115-
});
116+
}
117+
return result;
116118
};
Lines changed: 175 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,211 @@
11
import { Readable } from "stream";
22
import JSONbig from "json-bigint";
3-
import readline from "readline";
43
import {
54
getNormalizedMeta,
65
normalizeResponseRowStreaming
76
} from "../normalizeResponse";
87
import { Response } from "node-fetch";
9-
import { ExecuteQueryOptions } from "../../types";
8+
import { ExecuteQueryOptions, Row } from "../../types";
109
import { Meta } from "../../meta";
1110

1211
export class ServerSideStream extends Readable {
1312
private meta: Meta[] = [];
13+
private readonly pendingRows: Row[] = [];
14+
private finished = false;
15+
private processingData = false;
16+
private inputPaused = false;
17+
private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in
18+
private lineBuffer = "";
19+
private sourceStream: NodeJS.ReadableStream | null = null;
20+
1421
constructor(
1522
private readonly response: Response,
1623
private readonly executeQueryOptions: ExecuteQueryOptions
1724
) {
1825
super({ objectMode: true });
19-
const readLine = readline.createInterface({
20-
input: response.body,
21-
crlfDelay: Infinity
26+
this.setupInputStream();
27+
}
28+
29+
private setupInputStream() {
30+
this.sourceStream = this.response.body;
31+
32+
if (!this.sourceStream) {
33+
this.destroy(new Error("Response body is null or undefined"));
34+
return;
35+
}
36+
37+
this.sourceStream.on("data", (chunk: Buffer) => {
38+
this.handleData(chunk);
2239
});
2340

24-
const lineParser = (line: string) => {
25-
try {
26-
if (line.trim()) {
27-
const parsed = JSONbig.parse(line);
28-
if (parsed) {
29-
if (parsed.message_type === "DATA") {
30-
this.processData(parsed);
31-
} else if (parsed.message_type === "START") {
32-
this.meta = getNormalizedMeta(parsed.result_columns);
33-
this.emit("meta", this.meta);
34-
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
35-
this.push(null);
36-
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
37-
this.destroy(
38-
new Error(
39-
`Result encountered an error: ${parsed.errors
40-
.map((error: { description: string }) => error.description)
41-
.join("\n")}`
42-
)
43-
);
44-
}
45-
} else {
46-
this.destroy(new Error(`Result row could not be parsed: ${line}`));
41+
this.sourceStream.on("end", () => {
42+
this.handleInputEnd();
43+
});
44+
45+
this.sourceStream.on("error", (err: Error) => {
46+
this.destroy(err);
47+
});
48+
}
49+
50+
private handleData(chunk: Buffer) {
51+
// Convert chunk to string and add to line buffer
52+
this.lineBuffer += chunk.toString();
53+
54+
// Process complete lines
55+
let lineStart = 0;
56+
let lineEnd = this.lineBuffer.indexOf("\n", lineStart);
57+
58+
while (lineEnd !== -1) {
59+
const line = this.lineBuffer.slice(lineStart, lineEnd);
60+
this.processLine(line.trim());
61+
62+
lineStart = lineEnd + 1;
63+
lineEnd = this.lineBuffer.indexOf("\n", lineStart);
64+
}
65+
66+
// Keep remaining partial line in buffer
67+
this.lineBuffer = this.lineBuffer.slice(lineStart);
68+
69+
// Apply backpressure if we have too many pending rows
70+
if (
71+
this.pendingRows.length > this.bufferGrowthThreshold &&
72+
this.sourceStream &&
73+
!this.inputPaused &&
74+
!this.processingData
75+
) {
76+
this.sourceStream.pause();
77+
this.inputPaused = true;
78+
}
79+
}
80+
81+
private handleInputEnd() {
82+
// Process any remaining line in buffer
83+
if (this.lineBuffer.trim()) {
84+
this.processLine(this.lineBuffer.trim());
85+
this.lineBuffer = "";
86+
}
87+
88+
this.finished = true;
89+
this.tryPushPendingData();
90+
}
91+
92+
private processLine(line: string) {
93+
if (!line) return;
94+
95+
try {
96+
const parsed = JSONbig.parse(line);
97+
if (parsed) {
98+
if (parsed.message_type === "DATA") {
99+
this.handleDataMessage(parsed);
100+
} else if (parsed.message_type === "START") {
101+
this.meta = getNormalizedMeta(parsed.result_columns);
102+
this.emit("meta", this.meta);
103+
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
104+
this.finished = true;
105+
this.tryPushPendingData();
106+
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
107+
// Ensure source stream is resumed before destroying to prevent hanging
108+
if (this.sourceStream && this.inputPaused) {
109+
this.sourceStream.resume();
110+
this.inputPaused = false;
47111
}
112+
this.destroy(
113+
new Error(
114+
`Result encountered an error: ${parsed.errors
115+
.map((error: { description: string }) => error.description)
116+
.join("\n")}`
117+
)
118+
);
48119
}
49-
} catch (err) {
50-
this.destroy(err);
120+
} else {
121+
this.destroy(new Error(`Result row could not be parsed: ${line}`));
51122
}
52-
};
53-
readLine.on("line", lineParser);
54-
55-
readLine.on("close", () => {
56-
this.push(null);
57-
});
123+
} catch (err) {
124+
this.destroy(err);
125+
}
58126
}
59127

60-
private processData(parsed: { data: any[] }) {
128+
private handleDataMessage(parsed: { data: unknown[] }) {
61129
if (parsed.data) {
130+
// Process rows one by one to handle backpressure properly
62131
const normalizedData = normalizeResponseRowStreaming(
63132
parsed.data,
64133
this.executeQueryOptions,
65134
this.meta
66135
);
67-
for (const data of normalizedData) {
68-
this.emit("data", data);
136+
137+
// Add to pending rows buffer
138+
this.pendingRows.push(...normalizedData);
139+
140+
// Try to push data immediately if not already processing
141+
if (!this.processingData) {
142+
this.tryPushPendingData();
143+
}
144+
}
145+
}
146+
147+
private tryPushPendingData() {
148+
if (this.processingData || this.destroyed) {
149+
return;
150+
}
151+
152+
this.processingData = true;
153+
154+
while (this.pendingRows.length > 0) {
155+
const row = this.pendingRows.shift();
156+
const canContinue = this.push(row);
157+
158+
// If push returns false, stop pushing and wait for _read to be called
159+
if (!canContinue) {
160+
this.processingData = false;
161+
return;
69162
}
70163
}
164+
165+
// If we've finished processing all data and the server indicated completion
166+
if (this.finished && this.pendingRows.length === 0) {
167+
this.push(null);
168+
this.processingData = false;
169+
return;
170+
}
171+
172+
this.processingData = false;
71173
}
72174

73175
_read() {
74-
/* _read method requires implementation, even if data comes from other sources */
176+
// Called when the stream is ready for more data
177+
if (!this.processingData && this.pendingRows.length > 0) {
178+
this.tryPushPendingData();
179+
}
180+
181+
// Also resume source stream if it was paused and we have capacity
182+
if (
183+
this.sourceStream &&
184+
this.inputPaused &&
185+
this.pendingRows.length < this.bufferGrowthThreshold
186+
) {
187+
this.sourceStream.resume();
188+
this.inputPaused = false;
189+
}
190+
}
191+
192+
_destroy(err: Error | null, callback: (error?: Error | null) => void) {
193+
if (this.sourceStream) {
194+
// Resume stream if paused to ensure proper cleanup
195+
if (this.inputPaused) {
196+
this.sourceStream.resume();
197+
this.inputPaused = false;
198+
}
199+
200+
// Only call destroy if it exists (for Node.js streams)
201+
const destroyableStream = this.sourceStream as unknown as {
202+
destroy?: () => void;
203+
};
204+
if (typeof destroyableStream.destroy === "function") {
205+
destroyableStream.destroy();
206+
}
207+
this.sourceStream = null;
208+
}
209+
callback(err);
75210
}
76211
}

0 commit comments

Comments
 (0)