Skip to content

Commit a499607

Browse files
committed
rename threshold
1 parent 38a92c9 commit a499607

File tree

2 files changed

+72
-64
lines changed

2 files changed

+72
-64
lines changed

src/statement/stream/serverSideStream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export class ServerSideStream extends Readable {
1414
private finished = false;
1515
private processingData = false;
1616
private inputPaused = false;
17-
private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup
17+
private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in
1818
private lineBuffer = "";
1919
private sourceStream: NodeJS.ReadableStream | null = null;
2020

@@ -68,7 +68,7 @@ export class ServerSideStream extends Readable {
6868

6969
// Apply backpressure if we have too many pending rows
7070
if (
71-
this.pendingRows.length > this.maxPendingRows &&
71+
this.pendingRows.length > this.bufferGrowthThreshold &&
7272
this.sourceStream &&
7373
!this.inputPaused &&
7474
!this.processingData
@@ -182,7 +182,7 @@ export class ServerSideStream extends Readable {
182182
if (
183183
this.sourceStream &&
184184
this.inputPaused &&
185-
this.pendingRows.length < this.maxPendingRows / 4
185+
this.pendingRows.length < this.bufferGrowthThreshold
186186
) {
187187
this.sourceStream.resume();
188188
this.inputPaused = false;

test/unit/v2/serverSideStream.test.ts

Lines changed: 69 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@ describe("ServerSideStream", () => {
1717
});
1818

1919
describe("backpressure and pause/resume functionality", () => {
20-
it("should pause source stream when pending rows exceed maxPendingRows threshold", done => {
20+
it("should pause source stream when pending rows exceed bufferGrowthThreshold", done => {
2121
const sourceStream = new PassThrough();
2222
mockResponse.body = sourceStream;
2323

2424
let pauseCalled = false;
25-
let pauseCallCount = 0;
2625

27-
// Mock the pause method on the response body (this is what ServerSideStream calls)
26+
// Mock the pause method
2827
sourceStream.pause = jest.fn(() => {
2928
pauseCalled = true;
30-
pauseCallCount++;
3129
return sourceStream;
3230
});
3331

@@ -36,41 +34,42 @@ describe("ServerSideStream", () => {
3634
executeQueryOptions
3735
);
3836

39-
// Mock push to return false immediately to create backpressure from the start
40-
// This should cause rows to accumulate in pendingRows
41-
serverSideStream.push = jest.fn((_chunk: unknown) => {
42-
// Always return false to simulate constant backpressure
43-
return false;
44-
});
45-
4637
serverSideStream.on("error", done);
4738

48-
// First send the START message
39+
// Send START message first
4940
sourceStream.write(
5041
JSON.stringify({
5142
message_type: "START",
5243
result_columns: [{ name: "id", type: "integer" }]
5344
}) + "\n"
5445
);
5546

56-
// Send fewer data messages but with multiple rows each
57-
// Each data message can contain multiple rows in the data array
58-
let dataMessages = "";
59-
for (let i = 0; i < 3; i++) {
60-
dataMessages +=
47+
// Wait for start message to be processed, then setup backpressure
48+
setTimeout(() => {
49+
// Mock push to always return false to create backpressure
50+
const originalPush = serverSideStream.push.bind(serverSideStream);
51+
serverSideStream.push = jest.fn(chunk => {
52+
if (chunk !== null) {
53+
return false; // Always return false for data chunks
54+
}
55+
return originalPush(chunk); // Allow null (end) to pass through
56+
});
57+
58+
// Send a large single data message with many rows to ensure we exceed threshold
59+
const largeDataMessage =
6160
JSON.stringify({
6261
message_type: "DATA",
63-
data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]]
62+
data: Array.from({ length: 15 }, (_, i) => [i]) // 15 rows, exceeds threshold of 10
6463
}) + "\n";
65-
}
66-
sourceStream.write(dataMessages);
6764

68-
// Give the stream time to process and check if pause was called
69-
setTimeout(() => {
70-
expect(pauseCalled).toBe(true);
71-
expect(pauseCallCount).toBeGreaterThan(0);
72-
done();
73-
}, 100);
65+
sourceStream.write(largeDataMessage);
66+
67+
// Check after a delay
68+
setTimeout(() => {
69+
expect(pauseCalled).toBe(true);
70+
done();
71+
}, 50);
72+
}, 10);
7473
});
7574

7675
it("should resume source stream when pending rows drop below threshold", done => {
@@ -98,44 +97,53 @@ describe("ServerSideStream", () => {
9897

9998
serverSideStream.on("error", done);
10099

101-
// Mock push to always return false to accumulate rows and trigger pause
102-
serverSideStream.push = jest.fn(() => {
103-
return false;
104-
});
105-
106-
// Send START message
100+
// Send START message first
107101
sourceStream.write(
108102
JSON.stringify({
109103
message_type: "START",
110104
result_columns: [{ name: "id", type: "integer" }]
111105
}) + "\n"
112106
);
113107

114-
// Send multiple DATA messages to exceed maxPendingRows threshold
115-
let dataMessages = "";
116-
for (let i = 0; i < 3; i++) {
117-
dataMessages +=
108+
setTimeout(() => {
109+
// Mock push to always return false to accumulate rows and trigger pause
110+
const originalPush = serverSideStream.push.bind(serverSideStream);
111+
serverSideStream.push = jest.fn(chunk => {
112+
if (chunk !== null) {
113+
return false; // Always return false for data chunks to accumulate
114+
}
115+
return originalPush(chunk);
116+
});
117+
118+
// Send large data message to exceed threshold and trigger pause
119+
const largeDataMessage =
118120
JSON.stringify({
119121
message_type: "DATA",
120-
data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]]
122+
data: Array.from({ length: 15 }, (_, i) => [i])
121123
}) + "\n";
122-
}
123-
sourceStream.write(dataMessages);
124124

125-
setTimeout(() => {
126-
expect(pauseCalled).toBe(true);
127-
128-
// Now test resume: change push to return true (allow processing)
129-
serverSideStream.push = jest.fn(() => true);
130-
131-
// Call _read to trigger the resume condition check
132-
serverSideStream._read();
125+
sourceStream.write(largeDataMessage);
133126

134127
setTimeout(() => {
135-
expect(resumeCalled).toBe(true);
136-
done();
128+
expect(pauseCalled).toBe(true);
129+
130+
// Now test resume: change push to return true (allow processing)
131+
serverSideStream.push = jest.fn(chunk => {
132+
if (chunk !== null) {
133+
return true; // Allow processing to drain buffer
134+
}
135+
return originalPush(chunk);
136+
});
137+
138+
// Call _read to trigger the resume condition check
139+
serverSideStream._read();
140+
141+
setTimeout(() => {
142+
expect(resumeCalled).toBe(true);
143+
done();
144+
}, 50);
137145
}, 50);
138-
}, 100);
146+
}, 10);
139147
});
140148

141149
it("should properly clean up on stream destruction", done => {
@@ -159,7 +167,7 @@ describe("ServerSideStream", () => {
159167
);
160168

161169
// First pause the stream by creating backpressure
162-
serverSideStream.push = jest.fn((_chunk: unknown) => {
170+
serverSideStream.push = jest.fn(() => {
163171
return false; // Always return false to simulate backpressure
164172
});
165173

@@ -172,15 +180,15 @@ describe("ServerSideStream", () => {
172180
);
173181

174182
// Send data to trigger pause
175-
let dataMessages = "";
176-
for (let i = 0; i < 3; i++) {
177-
dataMessages +=
183+
// Send messages one at a time to trigger backpressure check between messages
184+
for (let i = 0; i < 6; i++) {
185+
const dataMessage =
178186
JSON.stringify({
179187
message_type: "DATA",
180-
data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]]
188+
data: [[i * 2 + 0], [i * 2 + 1]]
181189
}) + "\n";
190+
sourceStream.write(dataMessage);
182191
}
183-
sourceStream.write(dataMessages);
184192

185193
setTimeout(() => {
186194
// Destroy the stream
@@ -220,18 +228,18 @@ describe("ServerSideStream", () => {
220228
}) + "\n"
221229
);
222230

223-
let dataMessages = "";
224-
for (let i = 0; i < 3; i++) {
225-
dataMessages +=
231+
// Send data one message at a time to trigger pause
232+
for (let i = 0; i < 12; i++) {
233+
const dataMessage =
226234
JSON.stringify({
227235
message_type: "DATA",
228-
data: [[i * 3 + 0]]
236+
data: [[i]]
229237
}) + "\n";
238+
sourceStream.write(dataMessage);
230239
}
231240

232241
// Send error message
233242
setTimeout(() => {
234-
sourceStream.write(dataMessages);
235243
sourceStream.write(
236244
JSON.stringify({
237245
message_type: "FINISH_WITH_ERRORS",

0 commit comments

Comments
 (0)